Publish and consume messages with the CLI

Learn how to create an Apache Kafka for BigQuery cluster by using the Google Cloud CLI and configure producer and consumer clients to connect to the cluster. This quickstart creates a relatively small cluster that is intended only for testing.

Before you begin

Follow these guidelines for the quickstart:

  • Create all the resources for this quickstart in the us-central1 region.

Perform these steps:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:

    gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com

Create a cluster

An Apache Kafka for BigQuery cluster is located in a specific Google Cloud project and region. It can be accessed using a set of IP addresses within one or more subnets in any Virtual Private Cloud (VPC). The cluster's size is determined by the number of CPUs and total RAM that you allocate to it. Disk and broker configuration are automatic and cannot be adjusted.

  1. Get a subnet identifier by using the gcloud compute networks subnets describe command.

    The following command retrieves the self link of the default subnet of your Google Cloud project in the us-central1 region.

    gcloud compute networks subnets describe default --region=us-central1 \
    --format='value(selfLink)' | sed 's|.*/compute/v1/||'
    

    The output of the command is similar to the following: projects/managed-kafka-experimental/regions/us-central1/subnetworks/default

    Here, managed-kafka-experimental is the project ID and us-central1 is the location.

    Record the full subnet path from the command output, because you need it as SUBNET_ID in later steps.

  2. To create a cluster, run the gcloud beta managed-kafka clusters create command.

    gcloud beta managed-kafka clusters create test-cluster \
    --location=us-central1 \
    --cpu=3 \
    --memory=3GiB \
    --subnets=SUBNET_ID \
    --async
    

    Replace SUBNET_ID with the value of the subnet ID obtained from step 1.

    You get a response similar to the following:

    Create request issued for: [test-cluster]
    Check operation [projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID] for status.
    

    Store the OPERATION_ID to track progress.

  3. Creating a cluster usually takes 20-30 minutes. The gcloud beta managed-kafka clusters create command starts a long running operation (LRO), which you can use to track the progress of the cluster creation with the following command:

    curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID"
    

    Replace the following:

    • OPERATION_ID with the value of the operation ID from step 2.
    • PROJECT_ID with the project for your Kafka cluster.
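
    If you prefer to wait on the command line, you can poll the same endpoint until the response reports "done": true. The following loop is a minimal sketch that reuses the curl command above and assumes a bash shell; adjust the sleep interval as needed.

    until curl -s \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID" \
    | grep -q '"done": true'; do
      echo "Waiting for cluster creation to finish..."
      sleep 60
    done

    Replace PROJECT_ID and OPERATION_ID as described in the previous step.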

Describe a cluster

  1. After the cluster is created, to get the details of the cluster, run the gcloud beta managed-kafka clusters describe command.

    gcloud beta managed-kafka clusters describe test-cluster \
      --location=us-central1
    

    The output of the command is similar to the following:

    bootstrapAddress: bootstrap.test-cluster.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
    capacityConfig:
      memoryBytes: '3221225472'
      vcpuCount: '3'
    createTime: '2024-05-28T04:32:08.671168869Z'
    gcpConfig:
      accessConfig:
        networkConfigs:
        - subnet: projects/PROJECT_NUMBER/regions/us-central1/subnetworks/default
    name: projects/PROJECT_ID/locations/us-central1/clusters/test-cluster
    rebalanceConfig:
      mode: AUTO_REBALANCE_ON_SCALE_UP
    state: CREATING
    updateTime: '2024-05-28T04:32:08.671168869Z'
    
  2. The bootstrap URL is the primary way in which Kafka clients identify a server. You can retrieve the bootstrap URL using the following command:

    gcloud beta managed-kafka clusters describe \
    test-cluster --location=us-central1 \
    --format="value(bootstrapAddress)"
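
    If you plan to use the bootstrap address in later commands, you can capture it in a shell variable instead of copying it manually. This is an optional convenience that assumes a bash shell:

    export BOOTSTRAP=$(gcloud beta managed-kafka clusters describe \
    test-cluster --location=us-central1 \
    --format="value(bootstrapAddress)")
    echo $BOOTSTRAP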
    

Create a topic

  • Create a topic in the cluster by running the gcloud beta managed-kafka topics create command.

    Here, the name of the topic is t1.

    gcloud beta managed-kafka topics create t1 \
    --cluster=test-cluster --location=us-central1 --partitions=10 \
    --replication-factor=3
    

    You get an output similar to the following:

    Created topic [t1]
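
    To confirm that the topic exists, you can list the topics in the cluster. The following command is a sketch that assumes the gcloud beta managed-kafka topics list command is available in your version of the gcloud CLI; you can also list topics with the Kafka command line tools later in this guide.

    gcloud beta managed-kafka topics list \
    --cluster=test-cluster --location=us-central1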
    

Describe a topic

  • After the topic is created, to get more details about the topic, run the gcloud beta managed-kafka topics describe command.

    gcloud beta managed-kafka topics describe t1 \
    --cluster=test-cluster --location=us-central1
    

    You get an output similar to the following:

    name: projects/PROJECT_ID/locations/us-central1/clusters/test-cluster/topics/t1
    partitionCount: 10
    replicationFactor: 3
    

Set up a client machine

Set up a client on a Compute Engine instance that can access the VPC containing the default subnet where the Kafka cluster is reachable.

  1. Create a Compute Engine instance in a zone that is in the same region as the Kafka cluster. The instance must also be in a VPC that contains the subnet that you used in the cluster configuration. For example:

    gcloud compute instances create test-instance \
     --scopes=https://www.googleapis.com/auth/cloud-platform \
     --subnet=SUBNET_ID \
     --zone=us-central1-f
    

    Replace SUBNET_ID with the value of the subnet ID obtained from step 1 of Create a cluster.

    For more information about creating a VM, see Create a VM instance in a specific subnet.

  2. Use SSH to connect to the VM that you just created in the previous step. For more information about connecting using SSH, see About SSH connections.
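
    For example, the following command connects to the instance that you created in the previous step:

    gcloud compute ssh test-instance --zone=us-central1-f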

  3. Install the Kafka command line tools on the VM.

    Here is an example for installing Kafka version 3.6.2 in a Debian environment.

    sudo apt-get install default-jdk jq
    mkdir ~/Downloads
    wget -O ~/Downloads/kafka_2.13-3.6.2.tgz  https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
    mkdir ~/kafka_home
    tar xfz ~/Downloads/kafka_2.13-3.6.2.tgz -C ~/kafka_home
    
  4. Set up authentication on your VM with a service account.

    a. Authenticate to Google Cloud CLI on your VM.

      gcloud auth login
    

    b. Create a service account from either your VM or another machine.

      gcloud iam service-accounts create quickstart-sa --project=SERVICE_ACCOUNT_PROJECT_ID
    

    Replace the SERVICE_ACCOUNT_PROJECT_ID with the project ID where you want to create the service account. This does not need to be the same project where your cluster is located.

    c. In your cluster project, grant the roles/managedkafka.client IAM role to the service account. You can run this command from either your VM or another machine.

      gcloud projects add-iam-policy-binding \
      CLUSTER_PROJECT_ID \
      --member="serviceAccount:quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com" --role=roles/managedkafka.client
    

    Replace the following:

    • CLUSTER_PROJECT_ID: the project ID where the cluster is located.
    • SERVICE_ACCOUNT_PROJECT_ID: the project ID where the service account is located.

    d. Grant the required role to the principal that attaches the service account to other resources. You can run this command from either your VM or another machine.

      gcloud iam service-accounts add-iam-policy-binding \
      --project=SERVICE_ACCOUNT_PROJECT_ID \
      quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
    

    Replace the following:

    • SERVICE_ACCOUNT_PROJECT_ID: the project ID where the service account is located.
    • USER_EMAIL: the email address of the principal that attaches the service account to resources.

    e. Generate a service account key on your VM. This will save the key to a file called sa.key.json in your current directory.

      gcloud iam service-accounts keys create sa.key.json \
      --iam-account=quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com
    

    Replace SERVICE_ACCOUNT_PROJECT_ID with the project ID where the service account is located.

  5. Set up Kafka client credentials. The SASL_SSL security protocol with the PLAIN authentication mechanism (username/password) is supported.

    The username in this case is the service account email address and the password is the base64-encoded service account key file.

    Run this command to create a file called client.properties.

    cat <<EOF>> client.properties
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  \\
     username="$(jq -r '.client_email' sa.key.json)" \\
     password="$(cat sa.key.json | base64 -w 0)";
    EOF
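
    To confirm that the service account email and the base64-encoded key were substituted into the file, you can print it. This is an optional check:

    cat client.properties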
    
  6. Include the Kafka bin/ directory in the PATH environment variable.

    This configuration lets you run Kafka commands from your terminal.

   export PATH=$PATH:~/kafka_home/kafka_2.13-3.6.2/bin
  

Use the Kafka command line tools

Run these commands on the client machine.

  1. Set the bootstrap address as the BOOTSTRAP environment variable. You can retrieve this value by describing the cluster that you created.

    export BOOTSTRAP=bootstrap.test-cluster.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
    

    Replace PROJECT_ID with the value of the project for your Kafka cluster.

  2. List the topics in the cluster.

    kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP \
    --command-config client.properties
    
  3. Write a message to the topic t1.

    echo "hello world" | kafka-console-producer.sh --topic t1 \
    --bootstrap-server $BOOTSTRAP --producer.config client.properties
    
  4. Consume the message from topic t1.

     kafka-console-consumer.sh --topic t1 --from-beginning \
     --bootstrap-server $BOOTSTRAP --consumer.config client.properties
    
  5. Run a simple producer performance test.

    kafka-producer-perf-test.sh --topic t1 --num-records 1000000 \
    --throughput -1 --print-metrics --record-size 1024 \
    --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties
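
  6. Optionally, list the consumer groups in the cluster. The console consumer from step 4 registers a consumer group with an automatically generated name, so this is a quick way to see which clients have consumed from the cluster. This sketch reuses the same client.properties file:

    kafka-consumer-groups.sh --list \
    --bootstrap-server $BOOTSTRAP \
    --command-config client.properties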
    

View the monitoring charts

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

    The cluster that you created is listed.

  2. Click the cluster.

    The cluster details page is displayed. On the Resources tab of this page, the topic t1 is listed.

  3. Click the topic t1.

    The topic details page is displayed.

  4. Click the Monitoring tab.

    This tab provides visual charts that display key metrics related to the topic's activity and performance.

These time-series charts include the following:

  • Byte count: Rate at which bytes are produced to the topic. This indicates the volume of data written to the topic over time. The corresponding metric is managedkafka.googleapis.com/byte_in_count.

  • Request count: Rate of requests made to the topic. It reflects the overall activity and usage of the topic. The related metric is managedkafka.googleapis.com/topic_request_count.

  • Log segments by partition: Number of active log segments for each partition within the topic. Log segments are the physical files on disk where Kafka stores the topic data. The relevant metric is managedkafka.googleapis.com/log_segments.
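
You can also read these metrics programmatically through the Cloud Monitoring API. The following command is a minimal sketch that assumes the Monitoring API is enabled in your project; replace PROJECT_ID with your project, and START_TIME and END_TIME with RFC 3339 timestamps such as 2024-05-28T00:00:00Z.

  curl -G \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://monitoring.googleapis.com/v3/projects/PROJECT_ID/timeSeries" \
  --data-urlencode 'filter=metric.type="managedkafka.googleapis.com/byte_in_count"' \
  --data-urlencode "interval.startTime=START_TIME" \
  --data-urlencode "interval.endTime=END_TIME"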

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources, or keep the project and delete the individual resources.

  1. To delete the cluster, run the gcloud beta managed-kafka clusters delete command:

    gcloud beta managed-kafka clusters delete test-cluster --location=us-central1
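
  2. To delete the client VM, run the gcloud compute instances delete command:

    gcloud compute instances delete test-instance --zone=us-central1-f

  3. If you created a project only for this quickstart and no longer need it, you can delete the entire project instead:

    gcloud projects delete PROJECT_ID

    Replace PROJECT_ID with the project that you created for this quickstart.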
    

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.