Update a Connect cluster

You can edit a Connect cluster to update properties like the number of vCPUs, memory, network, and labels.

To edit a Connect cluster, you can use the Google Cloud console, the gcloud CLI, the client libraries, or the Managed Kafka API. You can't use the open source Apache Kafka API to update a Connect cluster.

Before you begin

Not all properties of a Connect cluster are editable. Review the properties of a Connect cluster before you update it.

Required roles and permissions to edit a Connect cluster

To get the permissions that you need to edit a Connect cluster, ask your administrator to grant you the Managed Kafka Connect Cluster Editor (roles/managedkafka.connectClusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to edit a Connect cluster. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to edit a Connect cluster:

  • Update a Connect cluster on the specified location: managedkafka.connectClusters.update
  • View a Connect cluster on the specified location (required only when you update a Connect cluster by using the Google Cloud console): managedkafka.connectors.list

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about this role, see Managed Service for Apache Kafka predefined roles.

Edit a Connect cluster

Updating certain properties, such as CPU and memory, requires a cluster restart.

Cluster restarts preserve data but might increase latency. The initial number of workers in the cluster determines the restart duration.

You can update the following Connect cluster properties:

Property                 Editable
vCPUs                    Yes
Memory                   Yes
Network                  Yes
Primary Subnet           Yes
Additional Subnets       Yes (Add/Delete)
Resolvable DNS domains   Yes (Add/Delete)
Connect cluster name     No
Kafka cluster            No
Location                 No
Labels                   Yes (Add/Edit/Delete)
Secrets                  Yes (Add/Delete)
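
In the API, each editable property corresponds to a field on the ConnectCluster resource, and you select the fields to change with an update mask. The following minimal Python sketch uses the same client library as the Python sample later on this page; the project, region, cluster ID, and label values are placeholders. It updates vCPUs, memory, and labels in a single request:

from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.protobuf import field_mask_pb2

# Placeholder identifiers for illustration.
client = ManagedKafkaConnectClient()

cluster = managedkafka_v1.ConnectCluster()
cluster.name = client.connect_cluster_path(
    "my-project-id", "us-central1", "my-connect-cluster"
)
cluster.capacity_config.vcpu_count = 12  # Editable.
cluster.capacity_config.memory_bytes = 24 * 1024**3  # Editable: 2 GiB per vCPU.
cluster.labels["environment"] = "staging"  # Editable (add/edit/delete).

# Only the fields named in the update mask are changed. Immutable
# properties, such as the cluster name and location, can't be included.
update_mask = field_mask_pb2.FieldMask(
    paths=["capacity_config.vcpu_count", "capacity_config.memory_bytes", "labels"]
)

operation = client.update_connect_cluster(
    request=managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=cluster,
    )
)
print(operation.result())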

Console

  1. In the Google Cloud console, go to the Connect Clusters page.

    Go to Connect Clusters

  2. Click the Connect cluster that you want to update.

    The Connect cluster details page is displayed.

  3. Click Edit.

    The Edit Kafka Connect cluster page is displayed.

  4. Make the necessary changes to the editable properties.

  5. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud alpha managed-kafka connect-clusters update command (a complete example with sample values follows the flag descriptions):

    gcloud alpha managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU] [--memory=MEMORY] \
        [--clear-dns-names | --dns-name=DNS_NAME] \
        [--clear-labels | --labels=LABELS] \
        [--clear-secrets | --secret=SECRET] \
        [--primary-subnet=PRIMARY_SUBNET] \
        [--additional-subnet=ADDITIONAL_SUBNET] \
        [--async]
    

    Replace the following:

    • CONNECT_CLUSTER_ID: The ID or name of the Connect cluster. The name of a Connect cluster is immutable.
    • LOCATION: The location of the Connect cluster. The location of a Connect cluster is immutable.
    • CPU: The number of vCPUs for the Connect cluster. The minimum value is 3 vCPUs.
    • MEMORY: The amount of memory for the Connect cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB". You must provision between 1 GiB and 8 GiB per vCPU.

    • DNS_NAME: A DNS domain name from the subnet's network to make visible to the Connect cluster.
    • LABELS: (Optional) A list of label KEY=VALUE pairs to associate with the cluster. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format for labels, see Labels.
    • SECRET: (Optional) Secrets to load into workers. You must provide exact secret versions from Secret Manager; aliases are not supported. Up to 32 secrets can be loaded into one cluster. Format: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • PRIMARY_SUBNET: The primary subnet for the Connect cluster.

      The format of the subnet is projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

      The primary subnet must be in the same region as the Connect cluster. See Connected subnets.

    • ADDITIONAL_SUBNET: (Optional) Additional subnets for the Connect cluster. The other subnets can be in a different region than the Connect cluster, but must be in the same VPC network. See Connected subnets.
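
    For example, the following command (all values are placeholders) resizes a Connect cluster to 12 vCPUs and 24 GiB of memory, which stays within the required range of 1 GiB to 8 GiB per vCPU:

    gcloud alpha managed-kafka connect-clusters update my-connect-cluster \
        --location=us-central1 \
        --cpu=12 \
        --memory=24GiB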

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-connect-cluster"
	// memoryBytes := 25769803776 // 24 GiB in bytes
	// labels := map[string]string{"environment": "production"}
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)

	// Capacity configuration update
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memoryBytes,
	}

	connectCluster := &managedkafkapb.ConnectCluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
		Labels:         labels,
	}
	paths := []string{"capacity_config.memory_bytes", "labels"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConnectClusterRequest{
		UpdateMask:     updateMask,
		ConnectCluster: connectCluster,
	}
	op, err := client.UpdateConnectCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateConnectCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateConnectCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    ConnectCluster connectCluster = ConnectCluster.newBuilder()
        .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
        .setCapacityConfig(capacityConfig)
        .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateConnectClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
        settingsBuilder.build())) {
      UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
          .setUpdateMask(updateMask)
          .setConnectCluster(connectCluster).build();
      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
          .updateConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateConnectCluster contains sample
      // code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      ConnectCluster response = future.get();
      System.out.printf("Updated connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
          e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import ConnectCluster
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# memory_bytes = 4295000000

connect_client = ManagedKafkaConnectClient()

connect_cluster = ConnectCluster()
connect_cluster.name = connect_client.connect_cluster_path(
    project_id, region, connect_cluster_id
)
connect_cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, see https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
request = managedkafka_v1.UpdateConnectClusterRequest(
    update_mask=update_mask,
    connect_cluster=connect_cluster,
)

try:
    operation = connect_client.update_connect_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated Connect cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.