Update a Google Cloud Managed Service for Apache Kafka consumer group

You can update a Google Cloud Managed Service for Apache Kafka consumer group to modify the offsets for a list of topic partitions.

To update a consumer group, you can use the Google Cloud CLI, the client library, the Managed Kafka API, or the open source Apache Kafka APIs. The Google Cloud is not supported for editing a consumer group.

Required roles and permissions to update a consumer group

To get the permissions that you need to edit your consumer groups, ask your administrator to grant you the Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to edit your consumer groups. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to edit your consumer groups:

  • Update consumer groups: managedkafka.consumerGroups.update

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Consumer Group Editor role, see Managed Service for Apache Kafka predefined roles.

Update a consumer group

To update a consumer group, follow these steps:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud beta managed-kafka consumer-groups update command:

    gcloud beta managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE
    

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster.

    • LOCATION: The location of the cluster.

    • CONSUMER_GROUP_ID: The ID or name of the consumer group.

    • TOPICS_FILE: This setting specifies the location of the file containing the configuration of topics to be updated for the consumer group. The file can be in JSON or YAML format. It can be a file path or directly include the JSON or YAML content.

      The topic file uses a JSON structure to represent a ConsumerGroup topics map, in the form { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. For each topic, ConsumerPartitionMetadata provides the offset and metadata for each partition.

      To set the offset for a single partition (partition 0) in a topic named topic1 to 10, the JSON configuration would look like:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      The following is an example of the contents of a topics.json file:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }
      

    • TOPIC_PATH: When specifying topics in JSON or YAML file, include the full topic path which can be obtained from running the gcloud beta managed-kafak topics describe command and of the format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := {1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2


def update_consumer_group(
    project_id: str,
    region: str,
    cluster_id: str,
    consumer_group_id: str,
    topic_path: str,
    partition_offsets: dict[int, int],
) -> None:
    """
    Update a single partition's offset in a Kafka consumer group.

    Args:
        project_id: Google Cloud project ID.
        region: Cloud region.
        cluster_id: ID of the Kafka cluster.
        consumer_group_id: ID of the Kafka consumer group.
        topic_path: Name of the Kafka topic.
        partition_offsets: Configuration of the topic, represented as a map of partition indexes to their offset value.

    Raises:
        This method will raise the exception if the consumer group is not found.
    """

    client = managedkafka_v1.ManagedKafkaClient()

    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )

    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }

    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")

    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )

    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound:
        print(f"Consumer group {consumer_group.name} not found")

What's next?

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.