Update a Kafka consumer group

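Update a Managed Service for Apache Kafka consumer group by setting the offsets of a topic's partitions, using an update mask that contains only the `topics` field.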

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

// updateConsumerGroup sends an UpdateConsumerGroup request that sets, for the
// topic identified by topicPath, the offset of every partition listed in
// partitionOffsets, and writes the returned consumer group to w.
//
// Example argument values:
//
//	projectID := "my-project-id"
//	region := "us-central1"
//	clusterID := "my-cluster"
//	consumerGroupID := "my-consumer-group"
//	topicPath := "my-topic-path"
//	partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
//
// Errors from creating the client or from the update call are returned wrapped.
func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	// The consumer group name is the cluster name plus a consumerGroups segment.
	parent := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	groupName := fmt.Sprintf("%s/consumerGroups/%s", parent, consumerGroupID)

	// One metadata entry per partition, each carrying the requested offset.
	perPartition := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata, len(partitionOffsets))
	for id, off := range partitionOffsets {
		perPartition[id] = &managedkafkapb.ConsumerPartitionMetadata{Offset: off}
	}

	// The update mask names "topics" as the only consumer group field in this request.
	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"topics"}},
		ConsumerGroup: &managedkafkapb.ConsumerGroup{
			Name: groupName,
			Topics: map[string]*managedkafkapb.ConsumerTopicMetadata{
				topicPath: {Partitions: perPartition},
			},
		},
	}

	updated, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", updated)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Sample that updates a Managed Service for Apache Kafka consumer group by setting the offsets of
 * a topic's partitions.
 */
public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    // Partition -> offset. A plain HashMap is used instead of double-brace initialization,
    // which would create an anonymous HashMap subclass just to hold three entries.
    Map<Integer, Integer> partitionOffsets = new HashMap<>();
    partitionOffsets.put(1, 10);
    partitionOffsets.put(2, 20);
    partitionOffsets.put(3, 30);
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  /**
   * Sends an update request for the consumer group whose update mask contains only {@code
   * topics}, carrying one partition entry per element of {@code partitionOffsets} for the given
   * topic.
   *
   * <p>On success the name of the returned consumer group is printed to standard output. An
   * {@link IOException} or {@link ApiException} raised while creating the client or sending the
   * request is caught and its message is printed to standard error; it is not rethrown.
   *
   * @param projectId project that owns the cluster
   * @param region location of the cluster, e.g. {@code us-east1}
   * @param clusterId ID of the cluster
   * @param topicId ID of the topic whose partition offsets are being set
   * @param consumerGroupId ID of the consumer group to update
   * @param partitionOffsets offset to set for each partition, keyed by partition
   * @throws Exception declared by the original signature; {@link IOException} and {@link
   *     ApiException} are handled inside the method
   */
  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    // Convert each requested offset into the per-partition metadata message.
    Map<Integer, ConsumerPartitionMetadata> partitions = new HashMap<>();
    for (Map.Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
      ConsumerPartitionMetadata partitionMetadata =
          ConsumerPartitionMetadata.newBuilder().setOffset(partitionOffset.getValue()).build();
      partitions.put(partitionOffset.getKey(), partitionMetadata);
    }

    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    // The update mask names "topics" as the only consumer group field in this request.
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer): Uncomment and set these values before running the sample.
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

# Full resource name of the consumer group to update.
consumer_group_name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

# One ConsumerPartitionMetadata per entry of partition_offsets.
topic_metadata = managedkafka_v1.ConsumerTopicMetadata(
    partitions={
        partition: managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        for partition, offset in partition_offsets.items()
    }
)

consumer_group = managedkafka_v1.ConsumerGroup(
    name=consumer_group_name,
    topics={topic_path: topic_metadata},
)

# The update mask names "topics" as the only consumer group field in this request.
update_mask = field_mask_pb2.FieldMask(paths=["topics"])

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
else:
    print("Updated consumer group:", response)

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.