Create a Google Cloud Managed Service for Apache Kafka topic

In Managed Service for Apache Kafka, messages are organized in topics. A topic is made up of partitions. A partition is an ordered, immutable sequence of records that is owned by a single broker within a Kafka cluster. You must create a topic to publish or consume messages.

To create a topic, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, the Managed Kafka API, or the open source Apache Kafka APIs.

Before you begin

You must create a cluster before you create a topic. Ensure that you have set up the following:

Required roles and permissions to create a topic

To get the permissions that you need to create a topic, ask your administrator to grant you the Managed Kafka Topic Editor (roles/managedkafka.topicEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a topic. The exact permissions that are required are listed in the following Required permissions section:

Required permissions

The following permissions are required to create a topic:

  • Create a topic: managedkafka.topics.create

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Topic Editor role also contains the Managed Kafka Viewer role. For more information about this role, see Managed Service for Apache Kafka predefined roles.

Properties of a Managed Service for Apache Kafka topic

When you create or update a Managed Service for Apache Kafka topic, you must specify the following properties.

Topic name

The name of the Managed Service for Apache Kafka topic that you are creating. For guidelines on how to name a topic, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a topic is immutable.

Partition count

The number of partitions in the topic. You can edit a topic to increase its partition count, but you cannot decrease it. Increasing the number of partitions for a topic whose messages use keys might change how messages are distributed across partitions.
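To see why adding partitions can change where keyed messages land, consider the following minimal sketch. It assumes that a producer picks the partition as a hash of the key modulo the partition count, which is how Kafka's default partitioner handles keyed records. The function name partition_for is hypothetical, and zlib.crc32 is only a stand-in for the hash that a real Kafka producer uses, so the partition numbers here won't match a real cluster.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    """Map a key to a partition as hash(key) mod partition_count.

    zlib.crc32 is a stand-in for the hash a real Kafka producer uses.
    """
    return zlib.crc32(key.encode("utf-8")) % partition_count


keys = [f"user-{i}" for i in range(20)]
before = {key: partition_for(key, 4) for key in keys}
after = {key: partition_for(key, 6) for key in keys}

# Keys whose partition differs after growing the topic from 4 to 6 partitions.
moved = [key for key in keys if before[key] != after[key]]
print(f"{len(moved)} of {len(keys)} keys map to a different partition")
```

Records that were written before the change stay in their original partitions, so per-key ordering is only preserved within each partition, not across the resize.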

Replication factor

The number of replicas for each partition. If you don't specify the value, the cluster's default replication factor is used.

A higher replication factor can improve data consistency in the event of broker failures, as data is replicated to multiple brokers. For production environments, a replication factor of 3 or higher is recommended. Higher replica counts increase local storage and data transfer costs for the topic. However, they don't increase the persistent storage costs. The replication factor cannot exceed the number of available brokers.

Other parameters

You can also set other Apache Kafka topic-level configuration parameters. These are specified as key=value pairs that override the cluster defaults.

Configurations related to topics have a server default and an optional per-topic override. The format is a comma-separated list of KEY=VALUE pairs, where KEY is the name of the Kafka topic configuration property and VALUE is the required setting. Examples include flush.ms=10 and compression.type=producer.

For a list of all supported topic-level configurations, see Apache Kafka topic-level configs.
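The client library samples on this page pass topic configurations as a map rather than as a comma-separated string. The following minimal sketch converts the KEY=VALUE list format into such a map. The helper name parse_topic_configs is hypothetical and isn't part of any Google Cloud library, and the sketch assumes that values don't themselves contain commas.

```python
def parse_topic_configs(configs: str) -> dict[str, str]:
    """Split "k1=v1,k2=v2" into {"k1": "v1", "k2": "v2"}."""
    parsed: dict[str, str] = {}
    for pair in configs.split(","):
        pair = pair.strip()
        if not pair:
            continue  # Tolerate empty segments such as a trailing comma.
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        parsed[key.strip()] = value.strip()
    return parsed


print(parse_topic_configs("flush.ms=10,compression.type=producer"))
# → {'flush.ms': '10', 'compression.type': 'producer'}
```

The sketch only checks the shape of each pair. It doesn't check that a key is a supported topic-level configuration.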

Create a topic

Before you create a topic, review the topic properties.

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. Click the cluster for which you want to create a topic.

    The Cluster details page opens.

  3. On the Cluster details page, click Create Topic.

    The Create Kafka topic page opens.

  4. For Topic name, enter a name for the topic.
  5. For Partition count, enter the number of partitions you want or keep the default value.
  6. For Replication factor, enter the replication factor you want or keep the default value.
  7. (Optional) To alter any topic configurations, add them as comma-separated key-value pairs in the Configurations field.
  8. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka topics create command:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS_NUM \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Replace the following:

    • TOPIC_ID: The name of the topic.

    • CLUSTER_ID: The name of the cluster where you want to create the topic.

    • LOCATION_ID: The region of the cluster.

      For more information about supported locations, see Supported Managed Kafka locations.

    • PARTITIONS_NUM: The number of partitions for the topic.

    • REPLICATION_FACTOR: The replication factor for the topic.

    • CONFIGS: Optional topic-level parameters. Specify as comma-separated key-value pairs. For example, compression.type=producer.
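If you create topics from a script, you might assemble the same command programmatically. The following is a minimal sketch that uses only the flags shown in the preceding command. The function name build_create_topic_command and the sample values are hypothetical. The sketch only builds and prints the command; it doesn't run it.

```python
import shlex
from typing import Optional


def build_create_topic_command(
    topic_id: str,
    cluster: str,
    location: str,
    partitions: int,
    replication_factor: int,
    configs: Optional[dict[str, str]] = None,
) -> list[str]:
    """Return the gcloud argument list for creating a topic."""
    command = [
        "gcloud", "managed-kafka", "topics", "create", topic_id,
        f"--cluster={cluster}",
        f"--location={location}",
        f"--partitions={partitions}",
        f"--replication-factor={replication_factor}",
    ]
    if configs:
        # Join the map into the comma-separated KEY=VALUE form.
        pairs = ",".join(f"{key}={value}" for key, value in configs.items())
        command.append(f"--configs={pairs}")
    return command


command = build_create_topic_command(
    "my-topic", "my-cluster", "us-central1", 10, 3,
    {"compression.type": "producer"},
)
print(shlex.join(command))
```

To run the command, you could pass the list to subprocess.run(command, check=True) in an environment where the Google Cloud CLI is installed and authenticated.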

Terraform

You can use a Terraform resource to create a topic.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s%n", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer): Uncomment and set these variables before running the sample.
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

What's next?