Get a Kafka consumer group

Get the information about a Kafka consumer group

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func getConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
	req := &managedkafkapb.GetConsumerGroupRequest{
		Name: consumerGroupPath,
	}
	consumerGroup, err := client.GetConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.GetConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Got consumer group: %#v\n", consumerGroup)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import java.io.IOException;

public class GetConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String consumerGroupId = "my-consumer-group";
    getConsumerGroup(projectId, region, clusterId, consumerGroupId);
  }

  public static void getConsumerGroup(
      String projectId, String region, String clusterId, String consumerGroupId) throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      // This operation is being handled synchronously.
      ConsumerGroup consumerGroup =
          managedKafkaClient.getConsumerGroup(
              ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId));
      System.out.println(consumerGroup.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.getConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"

client = managedkafka_v1.ManagedKafkaClient()

consumer_group_path = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)
request = managedkafka_v1.GetConsumerGroupRequest(
    name=consumer_group_path,
)

try:
    consumer_group = client.get_consumer_group(request=request)
    print("Got consumer group:", consumer_group)
except NotFound as e:
    print(f"Failed to get consumer group {consumer_group_id} with error: {e.message}")

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.