Produce and consume messages with the Kafka command-line tools
Learn how to use the Kafka command-line tools to connect to a Managed Service for Apache Kafka cluster, produce messages, and consume messages.
Before you begin
Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.
If you already have a Managed Service for Apache Kafka cluster, you can skip this step.
Set up a client machine
Set up a client on a Compute Engine instance that can access the VPC containing the default subnet where the Kafka cluster is reachable.
For this section, you require the project number and the project ID of the project where the Kafka cluster is located.
To find the project ID and project number for your project, see Find the project name, number, and ID.
Create a Compute Engine instance in a zone that is in the same region as the Kafka cluster. The instance must also be in a VPC containing the subnet that you used in the cluster configuration. For example, the following command creates a Compute Engine instance called test-instance:
gcloud compute instances create test-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
    --zone=REGION-c
For more information about creating a VM, see Create a VM instance in a specific subnet.
Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator).
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/iam.serviceAccountOpenIdTokenCreator
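The three bindings differ only in the role, so they can also be generated in a loop. The following is a minimal sketch, not part of the original steps: the values my-project and 123456789012 are hypothetical placeholders, and the loop only prints the commands so that you can review them before running anything.

```shell
# Hypothetical example values; substitute your own project ID and number.
PROJECT_ID="my-project"
PROJECT_NUMBER="123456789012"

# The Compute Engine default service account, built once and reused.
MEMBER="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

# Dry run: print one binding command per role for review.
# Remove the leading "echo" to run the commands for real.
for ROLE in roles/managedkafka.client \
            roles/iam.serviceAccountTokenCreator \
            roles/iam.serviceAccountOpenIdTokenCreator; do
  echo gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="$MEMBER" --role="$ROLE"
done
```

Building the member string once keeps the three commands consistent and avoids a typo in one of the bindings.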
Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID.
Use SSH to connect to the VM that you created in the previous step, for example, using the Google Cloud CLI:
gcloud compute ssh test-instance --project=PROJECT_ID --zone=REGION-c
Additional configuration might be required for first-time SSH usage. For more information about connecting using SSH, see About SSH connections.
Install Java to run the Kafka command-line tools, and wget to help download dependencies. The following commands assume you are using a Debian Linux environment.
sudo apt-get install default-jre wget
Install the Kafka command-line tools on the VM.
wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
This code downloads and extracts the Apache Kafka distribution, sets the KAFKA_HOME environment variable for convenience, adds the Kafka bin directory to the PATH variable, and explicitly sets the CLASSPATH to include the downloaded files.
Set up the Managed Service for Apache Kafka authentication library.
Download the dependencies and install them locally. Because the Kafka command-line tools look for Java dependencies in the libs directory of the Kafka installation directory, we add these dependencies there.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
Set up the client machine configuration properties.
cat <<EOF > client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
This code configures a Kafka client with the following settings:
Use SASL_SSL for secure communication with the Kafka cluster.
Employ OAuth 2.0 bearer tokens for authentication.
Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.
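Because a missing line in client.properties tends to surface later as an authentication failure, it can help to confirm that the file defines every key listed above. The following is a minimal sketch; check_client_properties is a hypothetical helper name, not part of Kafka or the authentication library, and it checks only that the keys are present, not that their values are correct.

```shell
# Hypothetical helper: confirm that a properties file defines every key
# that the client configuration above relies on.
check_client_properties() {
  file="$1"
  status=0
  for key in security.protocol \
             sasl.mechanism \
             sasl.login.callback.handler.class \
             sasl.jaas.config; do
    # Take the text before the first "=" on each line and look for an
    # exact, literal match of the key.
    if ! cut -d= -f1 "$file" | grep -qxF -- "$key"; then
      echo "missing: ${key}"
      status=1
    fi
  done
  return "$status"
}
```

Calling check_client_properties client.properties prints one line per missing key and returns a nonzero status if any key is absent.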
Send and consume messages
Run these commands on the client machine.
Set the project ID and cluster ID as environment variables.
export PROJECT_ID=PROJECT_ID
export CLUSTER_ID=CLUSTER_ID
Replace the following:
PROJECT_ID with the name of the project.
CLUSTER_ID with the name of the new cluster.
Set the bootstrap address as an environment variable.
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
For more information, see Get the bootstrap address.
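The bootstrap address follows a fixed pattern, so it can also be composed from variables instead of typed by hand. The following is a minimal sketch with hypothetical example values (my-project, my-cluster, us-central1); the REGION variable is an assumption introduced here for illustration and is not set elsewhere in this tutorial.

```shell
# Hypothetical example values; substitute your own.
PROJECT_ID="my-project"
CLUSTER_ID="my-cluster"
REGION="us-central1"

# The host pattern and port 9092 are taken from the bootstrap address above.
BOOTSTRAP="bootstrap.${CLUSTER_ID}.${REGION}.managedkafka.${PROJECT_ID}.cloud.goog:9092"
export BOOTSTRAP

# Print the result so that it can be checked before use.
echo "$BOOTSTRAP"
```

With these example values, the command prints bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092.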
List the topics in the cluster.
kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP \
    --command-config client.properties
Write a message to a topic.
echo "hello world" | kafka-console-producer.sh --topic KAFKA_TOPIC_NAME \
    --bootstrap-server $BOOTSTRAP --producer.config client.properties
Replace KAFKA_TOPIC_NAME with the name of the topic.
Consume messages from the topic.
kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME --from-beginning \
    --bootstrap-server $BOOTSTRAP --consumer.config client.properties
To stop consuming messages, press Ctrl+C.
Run a producer performance test.
kafka-producer-perf-test.sh --topic KAFKA_TOPIC_NAME --num-records 1000000 \
    --throughput -1 --print-metrics --record-size 1024 \
    --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties
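Before running the test, it can be useful to estimate how much data it writes to the topic. The following is a rough sketch that multiplies the --num-records and --record-size values from the command above; it counts payload bytes only and ignores protocol overhead, compression, and replication.

```shell
# Values copied from the performance test command above.
num_records=1000000
record_size=1024   # bytes per record

# Total payload size in bytes, and the same figure in whole mebibytes.
total_bytes=$((num_records * record_size))
total_mib=$((total_bytes / 1024 / 1024))

echo "${total_bytes} bytes (about ${total_mib} MiB)"
```

For the values shown, this prints 1024000000 bytes (about 976 MiB), so the test writes close to 1 GB of payload to the topic.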
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
To delete the cluster, run the gcloud managed-kafka clusters delete command:
gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION