Create a Google Cloud Managed Service for Apache Kafka cluster

A Managed Service for Apache Kafka cluster provides an environment for storing and processing streams of messages organized into topics.

To create a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client library, or the Managed Kafka API. You can't use the open source Apache Kafka API to create a cluster.

Before you begin

Ensure that you are familiar with the following:

Required roles and permissions to create a cluster

To get the permissions that you need to create a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a cluster.

The following permissions are required to create a cluster:

  • Create a cluster: managedkafka.clusters.create

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role does not let you create, delete, or modify topics and consumer groups on Managed Service for Apache Kafka clusters. Nor does it allow data plane access to publish or consume messages within clusters. For more information about this role, see Managed Service for Apache Kafka predefined roles.

Properties of a Managed Service for Apache Kafka cluster

When you create or update a Managed Service for Apache Kafka cluster, you must specify the following properties.

Cluster name

The name or ID of the Managed Service for Apache Kafka cluster that you are creating. For guidelines on how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a cluster is immutable.

Location

The location where you are creating the cluster. The location must be one of the supported Google Cloud regions. The location of a cluster cannot be changed later. For a list of available locations, see Managed Service for Apache Kafka locations.

Capacity configuration

Capacity configuration requires you to configure the number of vCPUs and the amount of memory for your Kafka setup. For more information on how to configure the capacity of a cluster, see Estimate vCPUs and memory for your Managed Service for Apache Kafka cluster.

The following are the properties for capacity configuration:

  • vCPUs: The number of vCPUs assigned to a cluster. The minimum value is 3 vCPUs. The number must also be a multiple of 3.

  • Memory: The amount of memory that is assigned to the cluster. You must provision between 1 GiB and 8 GiB per vCPU. The amount of memory can be increased or decreased within these limits after the cluster is created.

    For example, if you create a cluster with 6 vCPUs, the minimum memory you can allocate to the cluster is 6 GiB (1 GiB per vCPU), and the maximum is 48 GiB (8 GiB per vCPU).
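
The capacity rules above can be checked before a create request is sent. The following is a minimal sketch, not part of any Google Cloud library; the function name `validate_capacity` is hypothetical, and it encodes only the limits stated in this section (at least 3 vCPUs, a multiple of 3, and 1 GiB to 8 GiB of memory per vCPU).

```python
def validate_capacity(vcpus: int, memory_gib: float) -> list[str]:
    """Return a list of problems; an empty list means the values pass."""
    problems = []
    if vcpus < 3:
        problems.append("vCPU count must be at least 3")
    if vcpus % 3 != 0:
        problems.append("vCPU count must be a multiple of 3")
    # Memory limits scale with the vCPU count: 1 GiB to 8 GiB per vCPU.
    min_gib, max_gib = 1 * vcpus, 8 * vcpus
    if not min_gib <= memory_gib <= max_gib:
        problems.append(
            f"memory must be between {min_gib} and {max_gib} GiB for {vcpus} vCPUs"
        )
    return problems


# A 6-vCPU cluster accepts 6 GiB to 48 GiB of memory.
print(validate_capacity(6, 6))
print(validate_capacity(6, 49))
```

An empty list means the combination satisfies the documented limits; the service might still reject a request for other reasons, such as quota.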

Network configuration

Network configuration is a list of subnets in the VPCs where the cluster is accessible. The IP addresses of the brokers and the bootstrap server are automatically allocated in each subnet. DNS entries for these IP addresses are also created in the corresponding VPCs.

The following are some guidelines for your network configuration:

  • A minimum of 1 subnet is required for a cluster. The maximum is 10.

  • Each subnet must be in the same region as the cluster. The project or VPC can be different.
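
As an illustration of these guidelines, the following sketch checks a list of subnet paths before cluster creation. It assumes the subnet path format that this page uses elsewhere (projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID); the function name `validate_subnets` is hypothetical and not part of any Google Cloud library.

```python
import re

# Subnet path format: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID
SUBNET_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/regions/(?P<region>[^/]+)"
    r"/subnetworks/(?P<subnet>[^/]+)$"
)


def validate_subnets(subnets: list[str], cluster_region: str) -> list[str]:
    """Return a list of problems; an empty list means the subnets pass."""
    problems = []
    # A cluster needs at least 1 subnet and supports at most 10.
    if not 1 <= len(subnets) <= 10:
        problems.append(f"expected 1 to 10 subnets, got {len(subnets)}")
    for path in subnets:
        match = SUBNET_RE.match(path)
        if match is None:
            problems.append(f"not a valid subnet path: {path}")
        elif match["region"] != cluster_region:
            # The subnet's project may differ, but its region may not.
            problems.append(
                f"subnet {path} is in {match['region']}, not {cluster_region}"
            )
    return problems


print(validate_subnets(
    ["projects/my-project/regions/us-central1/subnetworks/default"],
    "us-central1",
))
```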

Labels

Labels are key-value pairs that help you with organization and identification. Labels enable categorizing resources based on environment. Examples are "env:production" and "owner:data-engineering".

You can filter and search for resources based on their labels. For example, assume you have multiple Managed Service for Apache Kafka clusters for different departments. You can configure and search for clusters with the label "department:marketing" to find the relevant one quickly.
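
The idea of filtering by label can be sketched in a few lines. The example below works on plain dictionaries rather than on API responses; the cluster names and the `clusters_with_label` helper are made up for illustration.

```python
def clusters_with_label(clusters: list[dict], key: str, value: str) -> list[str]:
    """Return the names of clusters whose labels contain key=value."""
    return [
        cluster["name"]
        for cluster in clusters
        if cluster.get("labels", {}).get(key) == value
    ]


# Hypothetical inventory of clusters and their labels.
inventory = [
    {"name": "orders", "labels": {"department": "marketing", "env": "production"}},
    {"name": "billing", "labels": {"department": "finance", "env": "production"}},
    {"name": "scratch", "labels": {}},
]

print(clusters_with_label(inventory, "department", "marketing"))
```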

Encryption

Managed Service for Apache Kafka can encrypt messages with Google-managed encryption keys (default) or Customer-managed encryption keys (CMEK). Every message is encrypted at rest and in transit. The encryption type for a cluster is immutable.

Google-managed encryption keys are used by default. These keys are created, managed, and stored entirely by Google Cloud within its infrastructure.

CMEKs are encryption keys that you manage using Cloud Key Management Service. This feature gives you greater control over the keys that are used to encrypt data at rest within supported Google Cloud services. Using CMEK incurs additional costs related to Cloud Key Management Service. To use CMEK, your key ring must be in the same location as the resources that you use it with. For more information, see Configure message encryption.
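
The location requirement can be checked from the key's resource name alone. The following sketch assumes the key format that this page gives for the gcloud --encryption-key flag (projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY); the helper name `key_matches_location` is hypothetical.

```python
import re

# Key format: projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY
KEY_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/keyRings/(?P<key_ring>[^/]+)/cryptoKeys/(?P<crypto_key>[^/]+)$"
)


def key_matches_location(encryption_key: str, cluster_location: str) -> bool:
    """Return True if the key ring is in the same location as the cluster."""
    match = KEY_RE.match(encryption_key)
    if match is None:
        raise ValueError(f"not a valid Cloud KMS key name: {encryption_key}")
    return match["location"] == cluster_location


key = "projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key"
print(key_matches_location(key, "us-central1"))
```

This only compares names. It does not verify that the key exists or that the service agent has access to it.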

Estimate vCPUs and memory for your cluster

Your goal is to pick the right capacity configuration. To do that, you must understand the throughput your cluster can handle. This section discusses how to estimate the number of vCPUs and size of memory required for your cluster.

Perform the following steps:

  1. Calculate your write-equivalent data rate.

    As a rule of thumb, read traffic is 3-4 times more efficient to process than write traffic. The write-equivalent data rate accounts for this difference and can be calculated as follows:

    Write-equivalent rate = (publish rate) + (read rate / 3)

    Assume a sample estimate that uses a publish rate of 50MBps and a read rate of 100MBps.

    Write-equivalent rate = 50 + (100 / 3) = 83.33MBps

  2. Determine the target vCPU utilization.

    Start with an average utilization target of 50% over a 30-minute time period. If you need to account for spiky traffic, decrease your target utilization to 30% or 40% and provision more CPUs. Higher utilization is cheaper but riskier if traffic exceeds estimates. You might encounter high latencies, producer back-offs, and potential out-of-memory issues if the traffic exceeds your estimation.

    Target utilization = 50% or 0.5

  3. Calculate the number of vCPUs required.

    1. Divide your write-equivalent data rate by 10MBps, which is the estimated capacity for a single vCPU in a single zone.

    2. Divide the result by your target utilization rate that you determined in step 2.

    3. Multiply by 3 to account for replication across availability zones.

      Number of vCPUs = ceiling (83.33 / 10 / 0.5 ) * 3 = 17 * 3 = 51 vCPUs

  4. Multiply your vCPU count by 4GB to estimate the required RAM.

    4GB of RAM is recommended for each vCPU.

    Amount of memory = 51 * 4GB = 204GB RAM
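
The four steps above can be sketched as a single function. This is an illustration only; the function name `estimate_capacity` and its parameter names are hypothetical, and the default values are the rule-of-thumb figures from the steps (a read efficiency factor of 3, 10MBps per vCPU per zone, 3 zones, and 4GB of RAM per vCPU).

```python
import math


def estimate_capacity(
    publish_mbps: float,
    read_mbps: float,
    target_utilization: float = 0.5,
    read_efficiency: float = 3.0,
    mbps_per_vcpu: float = 10.0,
    zones: int = 3,
    gb_per_vcpu: int = 4,
) -> tuple[int, int]:
    """Return (vCPU count, memory in GB) for the given traffic estimate."""
    # Step 1: write-equivalent rate = publish rate + read rate / efficiency.
    write_equivalent = publish_mbps + read_mbps / read_efficiency
    # Steps 2 and 3: divide by per-vCPU capacity and by target utilization,
    # round up, then multiply by the number of zones for replication.
    vcpus_per_zone = math.ceil(write_equivalent / mbps_per_vcpu / target_utilization)
    vcpus = vcpus_per_zone * zones
    # Step 4: recommended RAM per vCPU.
    return vcpus, vcpus * gb_per_vcpu


# Sample estimate from the steps: 50MBps publish rate, 100MBps read rate.
vcpus, memory_gb = estimate_capacity(50, 100)
print(f"{vcpus} vCPUs, {memory_gb}GB RAM")
```

Because the per-zone count is rounded up before it is multiplied by the number of zones, the result is always a multiple of 3, which matches the vCPU rule for clusters. Lowering `target_utilization` to 0.3 or 0.4 models the extra headroom suggested for spiky traffic.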

These calculations assume message sizes between 1KB and 100KB. Very large messages affect the calculations, and significant traffic spikes require additional resources. While you cannot decrease vCPUs below the number of brokers in your cluster, you can adjust memory size.

Here's why vCPU reduction is limited: Managed Service for Apache Kafka doesn't allow decreasing the number of brokers. Each broker requires a minimum of 1 CPU. For example, a cluster configured with 45 CPUs (with a maximum of 15 CPUs per broker) results in 3 brokers. In this scenario, you can decrease CPUs to as low as 3 (1 CPU per broker), but no further.
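
The scale-down floor in this example can be expressed as a short calculation. The sketch below is an assumption-heavy illustration: it extrapolates the broker count from the single 45-vCPU example (one broker per 15 vCPUs, rounded up), and it combines the 1-vCPU-per-broker floor with the cluster-wide rules that the vCPU count is at least 3 and a multiple of 3. The function names are hypothetical, and the service might assign brokers differently.

```python
import math

MAX_VCPUS_PER_BROKER = 15  # taken from the 45-vCPU example in the text
MIN_CLUSTER_VCPUS = 3      # documented minimum vCPU count for a cluster


def broker_count(peak_vcpus: int) -> int:
    """Assumed rule: enough brokers to keep each at or below the maximum."""
    return math.ceil(peak_vcpus / MAX_VCPUS_PER_BROKER)


def lowest_vcpus(peak_vcpus: int) -> int:
    """Smallest vCPU count the cluster could be reduced to."""
    # Brokers are never removed, and each broker needs at least 1 vCPU.
    floor = max(MIN_CLUSTER_VCPUS, broker_count(peak_vcpus))
    # The cluster vCPU count must stay a multiple of 3.
    return math.ceil(floor / 3) * 3


print(broker_count(45), lowest_vcpus(45))
```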

Test with your real workload for the most accurate sizing. Watch your cluster's resource usage and scale up if needed.

Create a cluster

Before you create a cluster, review the documentation of cluster properties.

Creating a cluster usually takes 20-30 minutes.

To create a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

  2. Select Create.

    The Create Kafka cluster page opens.

  3. For the Cluster name, enter a string.

    For more information about how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource.

  4. For Location, enter a supported location.

    For more information about supported locations, see Supported Managed Service for Apache Kafka locations.

  5. For Capacity configuration, enter values for Memory and vCPUs.

    For more information about how to size a Managed Service for Apache Kafka cluster, see Estimate vCPUs and memory for your Managed Service for Apache Kafka cluster.

  6. For Network configuration, enter the following details:
    1. Project: The project where the subnetwork is located. The subnet must be located in the same region as the cluster, but the project might be different.
    2. Network: The network to which the subnet is connected.
    3. Subnetwork: The name of the subnet.
    4. Subnet URI path: This field is automatically populated. Alternatively, you can enter the subnet path here. The path must be in the format: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.
    5. Click Done.
  7. (Optional) Add additional subnets by clicking Add a connected subnet.

    You can add additional subnets, up to a maximum value of ten.

  8. Retain the other default values.
  9. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka clusters create command:

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster.

      For more information about how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource.

    • LOCATION: The location of the cluster.

      For more information about supported locations, see Supported Managed Service for Apache Kafka locations.

    • CPU: The number of vCPUs for the cluster.

      For more information about how to size a Managed Service for Apache Kafka cluster, see Estimate vCPUs and memory for your Managed Service for Apache Kafka cluster.

    • MEMORY: The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".

    • SUBNETS: The list of subnets to connect to. Use commas to separate multiple subnet values.

      The format of the subnet is projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • --auto-rebalance: Enables automatic rebalancing of topic partitions among brokers when the number of CPUs in the cluster changes. This is enabled by default.

    • ENCRYPTION_KEY: ID of the customer-managed encryption key to use for the cluster.

      The format is projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async: Sends the create request and returns a response immediately, without waiting for the operation to complete. With the --async flag, you can continue with other tasks while the cluster is created in the background. Without the flag, the command waits for the operation to complete before returning a response, so you must wait until the cluster is fully created before you can continue with other tasks.

    • LABELS: Labels to associate with the cluster.

      For more information about the format for labels, see Labels.

    You get a response similar to the following:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    Store the OPERATION_ID to track progress.

Terraform

You can use a Terraform resource to create a cluster.

resource "google_managed_kafka_cluster" "default" {
  project    = data.google_project.default.project_id # Replace this with your project ID in quotes
  cluster_id = "my-cluster-id"
  location   = "us-central1"
  capacity_config {
    vcpu_count   = 3
    memory_bytes = 3221225472
  }
  gcp_config {
    access_config {
      network_configs {
        subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.
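
The memory_bytes value in the configuration above is expressed in bytes. As a small illustration, the following snippet converts a size in GiB to bytes; the helper name `gib_to_bytes` is hypothetical.

```python
GIB = 1024 ** 3  # bytes in one GiB


def gib_to_bytes(gib: int) -> int:
    """Convert a memory size in GiB to bytes."""
    return gib * GIB


# 3 GiB, the value used for memory_bytes in the sample configuration.
print(gib_to_bytes(3))  # prints 3221225472
```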

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {

      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();

      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks until the operation is complete (isDone = true)
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Monitor the cluster creation operation

You can run the following command only if you ran the gcloud CLI for creating the cluster.

  • Creating a cluster usually takes 20-30 minutes. To track progress of the cluster creation, the gcloud managed-kafka clusters create command uses a long running operation (LRO), which you can monitor using the following command:

    curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID"
    

    Replace the following:

    • OPERATION_ID with the value of the operation ID from the previous section.
    • LOCATION with the value of the location from the previous section.
    • PROJECT_ID with the project for your Kafka cluster.
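
The same request can be made from Python with only the standard library. This is a sketch that mirrors the curl command above: it obtains the access token by running gcloud auth print-access-token and reads the `done` field of the returned long-running operation. The function names are hypothetical.

```python
import json
import subprocess
import urllib.request


def operation_url(project_id: str, location: str, operation_id: str) -> str:
    """Build the same URL that the curl command uses."""
    return (
        "https://managedkafka.googleapis.com/v1/"
        f"projects/{project_id}/locations/{location}/operations/{operation_id}"
    )


def get_operation(project_id: str, location: str, operation_id: str) -> dict:
    """Fetch the operation resource as a dictionary."""
    # Equivalent to $(gcloud auth print-access-token) in the curl command.
    token = subprocess.run(
        ["gcloud", "auth", "print-access-token"],
        check=True, capture_output=True, text=True,
    ).stdout.strip()
    request = urllib.request.Request(
        operation_url(project_id, location, operation_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example usage (requires the gcloud CLI and network access):
# operation = get_operation("my-project-id", "us-central1", "OPERATION_ID")
# print("done:", operation.get("done", False))
```

The `done` field is typically absent while the operation is still running, which is why the usage example reads it with a default of False.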

Troubleshooting

The following are some errors you may encounter when creating clusters.

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

The Managed Service for Apache Kafka service agent is missing the required permission to access the Cloud KMS key. See the documentation for required roles for configuring CMEK.

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

The Managed Service for Apache Kafka service agent is missing the required role to configure networking in the VPC network that the Kafka clients run in. See the documentation for required permissions for configuring networking.

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.