Create an Apache Kafka for BigQuery cluster

An Apache Kafka for BigQuery cluster provides an environment for storing and processing streams of messages organized into topics.

To create a cluster you can use the Google Cloud console, the Google Cloud CLI, the client library, or the Apache Kafka for BigQuery API. You can't use the open source Apache Kafka API to create a cluster.

Before you begin

Ensure that you are familiar with the following:

Required roles and permissions to create a cluster

To get the permissions that you need to create a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access.

This predefined role contains the permissions required to create a cluster. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create a cluster:

  • Create a cluster: managedkafka.clusters.create

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role does not let you create, delete, or modify topics and consumer groups on Apache Kafka for BigQuery clusters. Nor does it allow data plane access to publish or consume messages within clusters. For more information about this role, see Apache Kafka for BigQuery predefined roles.

Properties of an Apache Kafka for BigQuery cluster

When you create or update an Apache Kafka for BigQuery cluster, you must specify the following properties.

Cluster name

The name or ID of the Apache Kafka for BigQuery cluster that you are creating. For guidelines on how to name a cluster, see Guidelines to name an Apache Kafka for BigQuery resource. The name of a cluster is immutable.

Location

The location where you are creating the cluster. The location must be one of the supported Google Cloud regions. The location of a cluster cannot be changed later. For a list of available locations, see Apache Kafka for BigQuery locations.

Capacity configuration

Capacity configuration requires you to configure the number of vCPUs and the amount of memory for your Kafka setup. For more information on how to configure the capacity of a cluster, see Estimate vCPUs and memory for your Apache Kafka for BigQuery cluster.

The following are the properties for capacity configuration:

  • vCPUs: The number of vCPUs assigned to a cluster. The minimum value is 3 vCPUs. The number must also be a multiple of 3.

  • Memory: The amount of memory that is assigned to the cluster. You must provision between 1 GiB and 8 GiB per vCPU. The amount of memory can be increased or decreased within these limits after the cluster is created.

    For example, if you create a cluster with 6 vCPUs, the minimum memory you can allocate to the cluster is 6 GiB (1 GiB per vCPU), and the maximum is 48 GiB (8 GiB per vCPU).

Network configuration

Network configuration is a list of subnets in the VPCs where the cluster is accessible. The IP addresses of the broker and bootstrap server are automatically allocated in each subnet. In addition, DNS entries for these IP addresses are created for each in the corresponding VPCs.

The following are some guidelines for your network configuration:

  • A minimum of 1 subnet is required for a cluster. The maximum is 10.

  • Each subnet must be in the same region as the cluster. The project or VPC can be in a different region.

Labels

Labels are key-value pairs that help you with organization and identification. Labels enable categorizing resources based on environment. Examples are "env:production" and "owner:data-engineering".

You can filter and search for resources based on their labels. For example, assume you have multiple Apache Kafka for BigQuery clusters for different departments. You can configure and search for clusters with the label "department:marketing" to find the relevant one quickly.

Encryption

Apache Kafka for BigQuery can encrypt messages with Google-managed encryption keys (default) or Customer-managed encryption keys (CMEK). Every message is encrypted at rest and in transit. The encryption type for a cluster is immutable.

Google-managed encryption keys are used by default. These keys are created, managed, and stored entirely by Google Cloud within its infrastructure.

CMEK(s) are encryption keys that you manage using Cloud Key Management Service. This feature lets you have greater control over the keys that are used to encrypt data at rest within supported Google Cloud services. Using CMEK incurs additional costs related to Cloud Key Management Service. For CMEK usage, your key ring must be in the same location as the resources you use it with. For more information, see Configure message encryption.

Estimate vCPUs and memory for your cluster

Your goal is to pick the right capacity configuration. To do that, you must understand the throughput your cluster can handle. This section discusses how to estimate the number of vCPUs and size of memory required for your cluster.

Perform the following steps:

  1. Calculate your write-equivalent data rate.

    As a rule of thumb, read traffic is 3-4 times more efficient to process than write traffic. The write-equivalent data rate accounts for this difference and can be calculated as follows:

    Write-equivalent rate = (publish rate) + (read rate / 3)

    Assume a sample estimate that uses a publish rate of 50MBps and a read rate of 100MBps.

    Write-equivalent rate = 50 + (100 / 3) = 83.33MBps

  2. Determine the target vCPU utilization.

    Start with an average utilization target of 50% over a 30-minute time period. If you need to account for spiky traffic, decrease your target utilization to 30% or 40% and provision more CPUs. Higher utilization is cheaper but riskier if traffic exceeds estimates. You might encounter high latencies, producer back-offs, and potential out-of-memory issues if the traffic exceeds your estimation.

    Target utilization=50% or 0.5

  3. Calculate the number of vCPUs required.

    1. Divide your write-equivalent data rate by 10MBps which is the estimated capacity for a single vCPU in a single zone.

    2. Divide the result by your target utilization rate that you determined in step 2.

    3. Multiply by 3 to account for replication across availability zones.

      Number of vCPUs = ceiling (83.33 / 10 / 0.5 ) * 3 = 50 vCPUs

  4. Multiply your vCPU count by 4GB to estimate the required RAM.

    4GB of RAM is recommended for each vCPU.

    Amount of memory= 50 * 4GB = 200GB RAM

These calculations assume messages of size between 1 and 100KB. Very large messages impact calculations, and significant traffic spikes require additional resources. While you cannot decrease vCPUs below the number of brokers in your cluster, you can adjust memory size.

Here's why vCPU reduction is limited: Apache Kafka for BigQuery doesn't allow decreasing the number of brokers. Each broker requires a minimum of 1 CPU. For example, a cluster configured with 45 CPUs (with a maximum of 15 CPUs per broker) results in 3 brokers. In this scenario, you can decrease CPUs to as low as 3 (1 CPU per broker), but no further.

Test with your real workload for the most accurate sizing. Watch your cluster's resource usage and scale up if needed.

Create a cluster

Before you create a cluster, review the documentation of cluster properties.

To create a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. Select Create.

    The Create Kafka cluster page opens.

  3. For the Cluster name, enter a string.

    For more information about how to name a cluster, see Guidelines to name an Apache Kafka for BigQuery resource.

  4. For Location, enter a supported location.

    For more information about supported locations, see Supported Apache Kafka for BigQuery locations.

  5. For Capacity configuration, enter values for Memory and vCPUs.

    For more information about how to size an Apache Kafka for BigQuery cluster, see Estimate vCPUs and memory for your Apache Kafka for BigQuery cluster.

  6. For Network configuration, enter the following details:
    1. Project: The project where the subnetwork is located. The subnet must be located in the same region as the cluster, but the project might be different.
    2. Network: The network to which the subnet is connected.
    3. Subnetwork: The name of the subnet.
    4. Subnet URI path: This field is automatically populated. Or, you can enter the subnet path here. The name of the subnet must be in the format: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.
    5. Click Done.
  7. (Optional) Add additional subnets by clicking Add a connected subnet.

    You can add additional subnets, up to a maximum value of ten.

  8. Retain the other default values.
  9. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud beta managed-kafka clusters create command:

    gcloud beta managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster.

      For more information about how to name a cluster, see Guidelines to name an Apache Kafka for BigQuery resource.

    • LOCATION: The location of the cluster.

      For more information about supported locations, see Supported Apache Kafka for BigQuery locations.

    • CPU: The number of vCPUs for the cluster.

      For more information about how to size an Apache Kafka for BigQuery cluster, see Estimate vCPUs and memory for your Apache Kafka for BigQuery cluster.

    • MEMORY: The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".

    • SUBNETS: The list of subnets to connect to. Use commas to separate multiple subnet values.

      The format of the subnet is projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • auto-rebalance: Enables automatic rebalancing of topic partitions among brokers when the number of CPUs in the cluster changes. This is enabled by default.

    • ENCRYPTION_KEY: ID of the customer-managed encryption key to use for the cluster.

      The format is projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async: Lets the system send the create request and immediately returns a response, without waiting for the operation to complete. With the --async flag, you can continue with other tasks while the cluster creation happens in the background. If you don't use the flag, the system waits for the operation to complete before returning a response. You have to wait until the cluster is fully updated before you can continue with other tasks.

    • LABELS: Labels to associate with the cluster.

      For more information about the format for labels, see Labels.

    You get a response similar to the following:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    Store the OPERATION_ID to track progress.

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "my-subnet"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();
      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Monitor the cluster creation operation

You can run the following command only if you ran the gcloud CLI for creating the cluster.

  • Creating a cluster usually takes 20-30 minutes. To track progress of the cluster creation, the gcloud beta managed-kafka clusters create command uses a long running operation (LRO), which you can monitor using the following command:

    curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID"
    

    Replace the following:

    • OPERATION_ID with the value of the operation ID from the previous section.
    • LOCATION with the value of the location from the previous section.
    • PROJECT_ID with the project for your Kafka cluster.

What's next?

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.