Update a Google Cloud Managed Service for Apache Kafka cluster

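You can edit a Google Cloud Managed Service for Apache Kafka cluster to update properties such as the number of vCPUs, memory, subnets, or labels. You can also configure automatic rebalancing of topic partitions. You can't change the cluster name, location, or encryption type after the cluster is created.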

To edit a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, or the Managed Kafka API. You can't use the open source Apache Kafka API to update a cluster.

Before you begin

Review the properties of the cluster before making any changes.

Required roles and permissions to edit a cluster

To get the permissions that you need to update a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to update a cluster. The exact permissions are listed under Required permissions:

Required permissions

The following permissions are required to update a cluster:

  • Edit a cluster: managedkafka.clusters.update

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role does not let you create, delete, or modify topics and consumer groups on Managed Service for Apache Kafka clusters. Nor does it allow data plane access to publish or consume messages within clusters. For more information about this role, see Managed Service for Apache Kafka predefined roles.

Editable properties of a cluster

Not all properties can be edited in the Google Cloud console. Use the gcloud CLI or the client libraries to edit properties that aren't available in the Google Cloud console.

Edit a cluster

Before you edit a cluster, review the editable properties of a cluster. Updating certain properties, such as CPU and memory, might require a cluster restart. When required, clusters are restarted one broker at a time. This leads to temporary failures of requests to individual brokers. These failures are transient. Commonly used client libraries handle such errors automatically.
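The automatic handling of transient broker failures can be pictured with a minimal sketch of retry with exponential backoff. This is an illustration only, not how any particular Kafka client is implemented: `TransientBrokerError`, `call_with_retries`, and `make_flaky_send` are hypothetical names, and the attempt count and delays are assumed values.

```python
import time


class TransientBrokerError(Exception):
    """Hypothetical error standing in for a request that hit a restarting broker."""


def call_with_retries(func, max_attempts=5, base_delay=0.01, sleep=time.sleep):
    """Call func(), retrying with exponential backoff on transient errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except TransientBrokerError:
            if attempt == max_attempts:
                # Out of attempts: surface the error to the caller.
                raise
            # Double the wait after each failed attempt.
            sleep(base_delay * 2 ** (attempt - 1))


def make_flaky_send(failures):
    """Return a fake send function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def send():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientBrokerError("broker restarting")
        return f"delivered after {state['calls']} attempts"

    return send


if __name__ == "__main__":
    # Two simulated failures, then success on the third attempt.
    print(call_with_retries(make_flaky_send(failures=2)))
```

If your application talks to brokers through its own wrapper rather than a standard client library, a bounded retry loop of this shape is one way to ride out a one-broker-at-a-time restart.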

To edit a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

  2. From the list of clusters, click the cluster whose properties you want to edit.

    The cluster details page is displayed.

  3. On the cluster details page, click Edit.
  4. Edit the properties as required. The following properties of a cluster are editable from the console:
    • Memory
    • vCPUs
    • Subnet
    • Labels

    You cannot edit the cluster name, the cluster location, or the encryption type.

  5. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud beta managed-kafka clusters update command:

    gcloud beta managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster. You can't update this value.

    • LOCATION: The location of the cluster. You can't update this value.

    • CPU: The number of virtual CPUs for the cluster.

    • MEMORY: The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".

    • SUBNETS: The list of subnets to connect to. Use commas to separate multiple subnet values.

    • --auto-rebalance: A flag rather than a placeholder value. It enables automatic rebalancing of topic partitions among brokers when the number of vCPUs in the cluster changes. This is enabled by default.

    • LABELS: Labels to associate with the cluster.

By default, the command waits for the update operation to complete before it returns a response. If you add the --async flag, the command sends the update request and returns immediately, so you can continue with other tasks while the cluster update happens in the background.
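When updates are scripted, it can help to assemble the command from only the properties you intend to change. The following is a minimal sketch under stated assumptions: `build_update_command` is a hypothetical helper, it uses only the flags shown in this section, and it assumes that labels follow the usual gcloud KEY=VALUE,KEY=VALUE form.

```python
import shlex


def build_update_command(cluster_id, location, cpu=None, memory=None,
                         subnets=None, auto_rebalance=False, labels=None,
                         run_async=False):
    """Build the argument list for `gcloud beta managed-kafka clusters update`.

    Only the properties that are passed in become flags; everything else is
    left out of the command.
    """
    args = ["gcloud", "beta", "managed-kafka", "clusters", "update",
            cluster_id, f"--location={location}"]
    if cpu is not None:
        args.append(f"--cpu={cpu}")
    if memory is not None:
        args.append(f"--memory={memory}")
    if subnets:
        # Multiple subnets are separated with commas.
        args.append("--subnets=" + ",".join(subnets))
    if auto_rebalance:
        args.append("--auto-rebalance")
    if labels:
        # Assumption: labels use the KEY=VALUE,KEY=VALUE form.
        args.append("--labels=" + ",".join(f"{k}={v}" for k, v in labels.items()))
    if run_async:
        # Return immediately instead of waiting for the operation to finish.
        args.append("--async")
    return args


if __name__ == "__main__":
    command = build_update_command(
        "my-cluster", "us-central1", memory="10GiB", run_async=True
    )
    print(shlex.join(command))
```

The returned list can be passed to `subprocess.run(command, check=True)` on a machine where the gcloud CLI is installed and authenticated.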

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memory := 4294967296 // 4 GiB, in bytes
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 4294967296L; // 4 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);
      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s\n", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2


def update_cluster(
    project_id: str, region: str, cluster_id: str, memory_bytes: int
) -> None:
    """
    Update a Kafka cluster. For a list of editable fields, see https://cloud.google.com/managed-kafka/docs/create-cluster#properties.

    Args:
        project_id: Google Cloud project ID.
        region: Cloud region.
        cluster_id: ID of the Kafka cluster.
        memory_bytes: The memory to provision for the cluster in bytes.

    Raises:
        This method will raise the exception if the operation errors or
        the timeout before the operation completes is reached.
    """

    client = managedkafka_v1.ManagedKafkaClient()

    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")

    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )

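    try:
        operation = client.update_cluster(request=request)
        # Block until the long-running update operation completes.
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        # Report the caught error directly; `operation` is unbound if
        # update_cluster() itself raised.
        print(f"The operation failed with error: {e}")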
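Each client library sample above sends an update mask containing the path capacity_config.memory_bytes along with the cluster. The following is a simplified model of what a field mask expresses, using plain dictionaries: only the listed paths are copied from the request into the stored resource. It is a sketch, not the service's implementation, and `apply_update_mask` is a hypothetical helper.

```python
import copy


def apply_update_mask(current, patch, paths):
    """Return a copy of `current` with only the masked paths taken from `patch`."""
    updated = copy.deepcopy(current)
    for path in paths:
        keys = path.split(".")
        source, target = patch, updated
        # Walk down to the parent of the field named by the path.
        for key in keys[:-1]:
            source = source[key]
            target = target.setdefault(key, {})
        # Copy just the one masked field.
        target[keys[-1]] = source[keys[-1]]
    return updated


if __name__ == "__main__":
    gib = 1024 ** 3  # bytes in one GiB
    current = {"capacity_config": {"vcpu_count": 3, "memory_bytes": 3 * gib}}
    # The patch also carries a vcpu_count, but the mask does not list it.
    patch = {"capacity_config": {"vcpu_count": 99, "memory_bytes": 4 * gib}}
    result = apply_update_mask(current, patch, ["capacity_config.memory_bytes"])
    print(result)
```

In this model the memory value changes to 4 GiB (4 * 1024**3 = 4294967296 bytes) while vcpu_count keeps its stored value, because only the masked path is applied.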

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.