Publish and consume messages with the CLI

Learn how to create a Google Cloud Managed Service for Apache Kafka cluster using the CLI and configure producer and consumer clients to connect to the cluster. The following guide creates a relatively small cluster designed only for testing.

Before you begin

Complete the following steps before you start this guide:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:

    gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com

Create a cluster

A Managed Service for Apache Kafka cluster is located in a specific Google Cloud project and region. It can be accessed using a set of IP addresses within one or more subnets in any Virtual Private Cloud (VPC). The cluster's size is determined by the number of CPUs and total RAM that you allocate to it. Disk and broker configuration are automatic and cannot be adjusted.

  1. Get a subnet identifier by using the gcloud compute networks subnets describe command.

    The following command retrieves the self link of the default subnet of your Google Cloud project in the us-central1 region.

    gcloud compute networks subnets describe default --region=us-central1 \
        --format='value(selfLink)' | sed 's|.*/compute/v1/||'
    

    The output of the command is similar to the following: projects/managed-kafka-experimental/regions/us-central1/subnetworks/default

    Here, managed-kafka-experimental is the project ID and us-central1 is the location.

  2. Store the following values in environment variables. You need them in later steps.

    export PROJECT_ID=PROJECT_ID
    export SUBNET_PATH=SUBNET_PATH
    export CLUSTER_ID=CLUSTER_ID
    

    Replace the following:

    • PROJECT_ID with the name of the project.
    • SUBNET_PATH with the full path of the subnet obtained from step 1. The value of the subnet must be in the format: projects/PROJECT_ID/regions/us-central1/subnetworks/default
    • CLUSTER_ID with the name of the new cluster.
  3. To create a cluster, run the gcloud managed-kafka clusters create command.

    gcloud managed-kafka clusters create $CLUSTER_ID \
        --location=us-central1 \
        --cpu=3 \
        --memory=3GiB \
        --subnets=$SUBNET_PATH \
        --async
    

    You get a response similar to the following:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID] for status.
    
  4. Store the operation ID in the OPERATION_ID environment variable so that you can track progress.

    export OPERATION_ID=OPERATION_ID
    

    Replace OPERATION_ID with the value of the operation ID obtained from the previous step.

  5. Creating a cluster usually takes 20-30 minutes. The gcloud managed-kafka clusters create command starts a long-running operation (LRO). To track the progress of the cluster creation, monitor the operation using the following command:

    curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/$PROJECT_ID/locations/us-central1/operations/$OPERATION_ID"
    

    You get a response similar to the following:

    {
     "name": "projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID",
     "metadata": {
       "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
       "createTime": "2024-00-00T19:09:59.080076667Z",
       "target": "projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID",
       "verb": "create",
       "requestedCancellation": false,
       "apiVersion": "v1"
     },
     "done": false
     }
    

    When the operation completes, the done field in the response is true.
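
The tracking steps above can be sketched as a small set of shell helpers. This is a minimal sketch, not part of the gcloud CLI: the function names operation_id_from_response, operation_done, and wait_for_operation, and the POLL_SECONDS variable, are hypothetical names chosen for this example. It assumes the response text and JSON look like the samples shown above, and that PROJECT_ID and OPERATION_ID are already exported.

```shell
# Extract the operation ID from the "Check operation [...]" line of the
# create response (hypothetical helper, assumes the format shown above).
operation_id_from_response() {
  printf '%s\n' "$1" | sed -n 's|.*operations/\([^]]*\)].*|\1|p'
}

# Succeed when the JSON on stdin contains "done": true.
operation_done() {
  grep -Eq '"done"[[:space:]]*:[[:space:]]*true'
}

# Poll the operation until it is done. Requires curl and an authenticated
# gcloud CLI. Note that "done": true can also mean the operation failed,
# so inspect the final response for an error field.
wait_for_operation() {
  url="https://managedkafka.googleapis.com/v1/projects/$PROJECT_ID/locations/us-central1/operations/$OPERATION_ID"
  until curl -s -H "Authorization: Bearer $(gcloud auth print-access-token)" "$url" | operation_done; do
    echo "Operation still running; checking again in ${POLL_SECONDS:-60}s"
    sleep "${POLL_SECONDS:-60}"
  done
  echo "Operation complete"
}

# Demonstration on sample text only; operation-123 is a made-up ID.
sample='Check operation [projects/PROJECT_ID/locations/us-central1/operations/operation-123] for status.'
operation_id_from_response "$sample"
```

After defining these helpers, running wait_for_operation in the same shell would block until the done field becomes true, which removes the need to rerun the curl command by hand.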

Describe a cluster

  1. After the cluster is created, to get the details of the cluster, run the gcloud managed-kafka clusters describe command.

    gcloud managed-kafka clusters describe $CLUSTER_ID \
        --location=us-central1
    

    The output of the command is similar to the following:

    bootstrapAddress: bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
    capacityConfig:
      memoryBytes: '3221225472'
      vcpuCount: '3'
    createTime: '2024-05-28T04:32:08.671168869Z'
    gcpConfig:
      accessConfig:
        networkConfigs:
        - subnet: projects/PROJECT_ID/regions/us-central1/subnetworks/default
    name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID
    rebalanceConfig:
      mode: AUTO_REBALANCE_ON_SCALE_UP
    state: CREATING
    updateTime: '2024-05-28T04:32:08.671168869Z'
    
  2. The bootstrap address is the primary way in which Kafka clients identify a cluster. To retrieve the bootstrap address, run the following command:

    gcloud managed-kafka clusters describe $CLUSTER_ID \
        --location=us-central1 \
        --format="value(bootstrapAddress)"
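
The sample describe output above shows state: CREATING, so it can help to check the state before moving on. The following is a minimal sketch that reads the state field from describe output. The cluster_state helper is a hypothetical name for this example, and the ACTIVE value is an assumption about what a ready cluster reports; confirm it against your own describe output.

```shell
# Print the value of the top-level "state:" line from describe output on stdin.
cluster_state() {
  awk '$1 == "state:" { print $2 }'
}

# Sample text in the same shape as the describe output shown above.
sample_output='name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID
state: CREATING'

state=$(printf '%s\n' "$sample_output" | cluster_state)

# ACTIVE is assumed here to be the state of a ready cluster.
if [ "$state" = "ACTIVE" ]; then
  echo "cluster is ready"
else
  echo "cluster is not ready yet (state: $state)"
fi

# With the gcloud CLI configured, the live value could be read with:
#   gcloud managed-kafka clusters describe "$CLUSTER_ID" \
#       --location=us-central1 | cluster_state
```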
    

Create a topic

  • Create a topic in the cluster by running the command gcloud managed-kafka topics create.

    Here, the name of the topic is t1.

    gcloud managed-kafka topics create t1 \
        --cluster=$CLUSTER_ID --location=us-central1 --partitions=10 \
        --replication-factor=3
    

    You get an output similar to the following:

    Created topic [t1]
    

Describe a topic

  • After the topic is created, to get more details about the topic, run the gcloud managed-kafka topics describe command.

    gcloud managed-kafka topics describe t1 \
        --cluster=$CLUSTER_ID --location=us-central1
    

    You get an output similar to the following:

    name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID/topics/t1
    partitionCount: 10
    replicationFactor: 3
    

Set up a client machine

Set up a client on a Compute Engine instance that can access the VPC containing the default subnet where the Kafka cluster is reachable.

For this section, you need the project ID and the project number of the project where the Kafka cluster is located.

To find the project ID and project number for your project, see Find the project name, number, and ID.

  1. Create a Compute Engine instance in a zone that is in the same region as the Kafka cluster. The instance must also be in a VPC that contains the subnet that you used in the cluster configuration. For example, the following command creates a Compute Engine instance called test-instance:

    gcloud compute instances create test-instance \
        --scopes=https://www.googleapis.com/auth/cloud-platform \
        --subnet=$SUBNET_PATH \
        --zone=us-central1-f
    

    For more information about creating a VM, see Create a VM instance in a specific subnet.

  2. Give the Compute Engine default service account the permissions to use Managed Service for Apache Kafka.

    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
        --role=roles/managedkafka.client
    

    Replace the following:

    • PROJECT_NUMBER: The automatically generated unique identifier for your project.
  3. Use SSH to connect to the VM that you created in step 1, for example, by using the Google Cloud CLI:

     gcloud compute ssh test-instance --project=$PROJECT_ID --zone=us-central1-f
    

    Additional configuration might be required for first time SSH usage. For more information about connecting using SSH, see About SSH connections.

  4. Install Java to run Kafka command line tools and wget to help download dependencies. The following commands assume you are using a Debian Linux environment.

    sudo apt-get install default-jre wget
    
  5. Install the Kafka command line tools on the VM.

    wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    

    This code downloads and extracts the Apache Kafka distribution, sets the KAFKA_HOME environment variable for convenience, and adds the Kafka bin directory to the PATH variable.

  6. Set up the Managed Service for Apache Kafka authentication library.

    1. Download the dependencies and install them locally. The Kafka command line tools look for Java dependencies in the libs directory of the Kafka installation directory, so add these dependencies there.

      wget https://github.com/googleapis/managedkafka/releases/download/v1.0.1/release-and-dependencies.zip
      sudo apt-get install unzip
      unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
      
    2. Set up the client machine configuration properties.

      cat <<EOF> client.properties
      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
      EOF
      

      These properties configure the Kafka client to do the following:

    • Use SASL_SSL for secure communication with the Kafka cluster.

    • Employ OAuth 2.0 bearer tokens for authentication.

    • Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.
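
A quick way to confirm that the properties file was written as intended is to check that each key is present. The following is a minimal sketch of that check. It writes the same four properties to a scratch directory (the workdir and props variables are names chosen for this example) so that it does not overwrite an existing client.properties file.

```shell
# Write the four client properties to a scratch file.
workdir=$(mktemp -d)
props="$workdir/client.properties"

cat > "$props" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

# Confirm that each expected key starts a line in the file.
missing=0
for key in security.protocol sasl.mechanism \
           sasl.login.callback.handler.class sasl.jaas.config; do
  if grep -q "^${key}=" "$props"; then
    echo "ok: $key"
  else
    echo "missing: $key"
    missing=1
  fi
done
```

To check the real file on the client machine, point props at client.properties instead of the scratch path. A common cause of a missing key is leading whitespace copied into the heredoc lines.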

Use the Kafka command line tools

Run these commands on the client machine.

  1. Set the project ID and the cluster ID as environment variables.

    export PROJECT_ID=PROJECT_ID
    export CLUSTER_ID=CLUSTER_ID
    

    Replace the following:

    • PROJECT_ID with the name of the project.
    • CLUSTER_ID with the name of the new cluster.
  2. Set the bootstrap address as the BOOTSTRAP environment variable. You can get this address by describing the cluster that you created.

    export BOOTSTRAP=bootstrap.$CLUSTER_ID.us-central1.managedkafka.$PROJECT_ID.cloud.goog:9092
  3. List the topics in the cluster.

    kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP \
    --command-config client.properties
    
  4. Write a message to the topic t1.

    echo "hello world" | kafka-console-producer.sh --topic t1 \
    --bootstrap-server $BOOTSTRAP --producer.config client.properties
    
  5. Consume the message from topic t1.

     kafka-console-consumer.sh --topic t1 --from-beginning \
     --bootstrap-server $BOOTSTRAP --consumer.config client.properties
    
  6. Run a simple producer performance test.

    kafka-producer-perf-test.sh --topic t1 --num-records 1000000 \
    --throughput -1 --print-metrics --record-size 1024 \
    --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties
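
The commands in this section can be wrapped in two small helpers. This is a minimal sketch: bootstrap_address and consume_one are hypothetical names for this example. The first builds the bootstrap address from the same pattern used in step 2. The second assumes the --max-messages and --timeout-ms options of the Kafka console consumer, so that the consumer exits after one message instead of running until you interrupt it.

```shell
# Build the bootstrap address from cluster ID, region, and project ID,
# following the pattern used for the BOOTSTRAP variable above.
bootstrap_address() {
  printf 'bootstrap.%s.%s.managedkafka.%s.cloud.goog:9092\n' "$1" "$2" "$3"
}

# Read one message from a topic and exit. Needs the Kafka tools on PATH,
# BOOTSTRAP exported, and client.properties in the current directory.
# Defined here but not called, because it requires a live cluster.
consume_one() {
  kafka-console-consumer.sh --topic "$1" --from-beginning \
    --max-messages 1 --timeout-ms 10000 \
    --bootstrap-server "$BOOTSTRAP" --consumer.config client.properties
}

# Demonstration with made-up IDs.
bootstrap_address my-cluster us-central1 my-project
```

On the client machine, export BOOTSTRAP=$(bootstrap_address "$CLUSTER_ID" us-central1 "$PROJECT_ID") followed by consume_one t1 would read back the message produced in step 4.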
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the resources that you created.

  1. To delete the cluster, run the gcloud managed-kafka clusters delete command:

    gcloud managed-kafka clusters delete $CLUSTER_ID --location=us-central1
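
The Compute Engine instance created earlier on this page also continues to exist until you delete it. The following is a minimal sketch of a clean-up helper that covers both the VM and the cluster. The run and cleanup functions and the DRY_RUN variable are hypothetical names for this example. By default the sketch only prints the commands; it assumes the instance name test-instance and zone us-central1-f used above, and uses the gcloud --quiet flag to skip confirmation prompts.

```shell
# Print each command by default; set DRY_RUN=0 to actually run them.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

cleanup() {
  # Delete the client VM created in the "Set up a client machine" section.
  run gcloud compute instances delete test-instance \
      --zone=us-central1-f --quiet
  # Delete the Kafka cluster.
  run gcloud managed-kafka clusters delete "$CLUSTER_ID" \
      --location=us-central1 --quiet
}

# Fall back to the literal placeholder when CLUSTER_ID is not set.
CLUSTER_ID="${CLUSTER_ID:-CLUSTER_ID}"
cleanup
```

Review the printed commands first, then rerun with DRY_RUN=0 to delete the resources.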

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.