Managed Service for Apache Kafka client libraries

This page shows how to get started with the Cloud Client Libraries for the Managed Kafka API. Client libraries make it easier to access Google Cloud APIs from a supported language. Although you can use Google Cloud APIs directly by making raw requests to the server, client libraries provide simplifications that significantly reduce the amount of code you need to write.

Read more about the Cloud Client Libraries and the older Google API Client Libraries in Client libraries explained.

Install the client library

Go

go get cloud.google.com/go/managedkafka

For more information, see Setting Up a Go Development Environment.

Java

If you are using Maven, add this to your pom.xml file:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-managedkafka</artifactId>
    <version>0.9.0</version>
</dependency>

If you are using Gradle, add this to your dependencies:

compile group: 'com.google.cloud', name: 'google-cloud-managedkafka', version:'0.9.0'

For more information, see Setting Up a Java Development Environment.

Python

pip install google-cloud-managedkafka

For more information, see Setting Up a Python Development Environment.

Set up authentication

To authenticate calls to Google Cloud APIs, client libraries support Application Default Credentials (ADC); the libraries look for credentials in a set of defined locations and use those credentials to authenticate requests to the API. With ADC, you can make credentials available to your application in a variety of environments, such as local development or production, without needing to modify your application code.

For production environments, the way you set up ADC depends on the service and context. For more information, see Set up Application Default Credentials.

For a local development environment, you can set up ADC with the credentials that are associated with your Google Account:

  1. Install the Google Cloud CLI, then initialize it by running the following command:

    gcloud init
  2. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    A sign-in screen appears. After you sign in, your credentials are stored in the local credential file used by ADC.

Use the client library

The client library samples for Google Cloud Managed Service for Apache Kafka show you how to do cluster administration using the Google Cloud Managed Kafka API. For example, the following code sample shows how to use the client library to create a Managed Service for Apache Kafka cluster.

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {

      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();

      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Additional resources

Go

The following list contains links to more resources related to the client library for Go:

Java

The following list contains links to more resources related to the client library for Java:

Python

The following list contains links to more resources related to the client library for Python: