Produce and consume messages with the Kafka command-line tools
Learn how to use the Kafka command-line tools to connect to a Managed Service for Apache Kafka cluster, produce messages, and consume messages.
Before you begin
Before you start this tutorial, create a new Managed Service for Apache Kafka cluster. If you already have a cluster, you can skip this step.
How to create a cluster
Console
- Go to the Managed Service for Apache Kafka > Clusters page.
 - Click Create.
 - In the Cluster name box, enter a name for the cluster.
 - In the Region list, select a location for the cluster.
 - 
        For Network configuration, configure the subnet where the cluster is accessible:
        
- For Project, select your project.
 - For Network, select the VPC network.
 - For Subnet, select the subnet.
 - Click Done.
 
 - Click Create.
 
After you click Create, the cluster state is Creating. When the cluster
      is ready, the state is Active.
    
gcloud
      To create a Kafka cluster, run the
      managed-kafka clusters
      create command.
    
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Replace the following:
KAFKA_CLUSTER: a name for the Kafka clusterREGION: the location of the clusterPROJECT_ID: your project IDSUBNET_NAME: the subnet where you want to create the cluster, for exampledefault
For information about supported locations, see Managed Service for Apache Kafka locations.
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
      To track the progress of the create operation, use the
      gcloud managed-kafka
      operations describe command:
    
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
      When the cluster is ready, the output from this command includes the entry state:
      ACTIVE. For more information, see
       Monitor the
      cluster creation operation.
    
Create a client VM
Create a Linux virtual machine (VM) instance in Compute Engine that can access the Kafka cluster. When you configure the VM, set the following options:
Region. Create the VM in the same region as your Kafka cluster.
Subnet. Create the VM in the same VPC network as the subnet that you used in your Kafka cluster configuration. For more information, see View a cluster's subnets.
Access scopes. Assign the
https://www.googleapis.com/auth/cloud-platformaccess scope to the VM. This scope authorizes the VM to send requests to the Managed Kafka API.
The following steps show how to set these options.
Console
In the Google Cloud console, go to the Create an instance page.
In the Machine configuration pane, do the following:
In the Name field, specify a name for your instance. For more information, see Resource naming convention.
In the Region list, select the same region as your Kafka cluster.
In the Zone list, select a zone.
In the navigation menu, click Networking. In the Networking pane that appears, do the following:
Go to the Network interfaces section.
To expand the default network interface, click the arrow.
In the Network field, choose the VPC network.
In the Subnetwork list, select the subnet.
Click Done.
In the navigation menu, click Security. In the Security pane that appears, do the following:
For Access scopes, select Set access for each API.
In the list of access scopes, find the Cloud Platform drop-down list and select Enabled.
Click Create to create the VM.
gcloud
To create the VM instance, use the
gcloud compute instances create
command.
gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE
Replace the following:
- VM_NAME: the name of the VM
 - PROJECT_ID: your project ID
 - REGION: the region where you created the Kafka
cluster, for example 
us-central1 - SUBNET: a subnet in the same VPC network as the subnet that you used in the cluster configuration
 - ZONE: a zone in the region where you created the
cluster, for example 
us-central1-c 
For more information about creating a VM, see Create a VM instance in a specific subnet.
Grant IAM roles
Grant the following Identity and Access Management (IAM) roles to the Compute Engine default service account:
- Managed Kafka Client (
roles/managedkafka.client) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
Console
In the Google Cloud console, go to the IAM page.
Find the row for Compute Engine default service account and click Edit principal.
Click Add another role and select the role Managed Kafka Client. Repeat this step for the Service Account Token Creator and Service Account OpenID Token Creator roles.
Click Save.
gcloud
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator
Replace the following:
PROJECT_ID: your project ID
PROJECT_NUMBER: your project number
To get the project number, run the
gcloud projects describe command:
gcloud projects describe PROJECT_ID
For more information, see Find the project name, number, and ID.
Connect to the VM
Use SSH to connect to the VM instance.
Console
Go to the VM instances page.
In the list of VM instances, find the VM name and click SSH.
gcloud
To connect to the VM, use the
gcloud compute ssh command.
gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE
Replace the following:
- VM_NAME: the name of the VM
 - PROJECT_ID: your project ID
 - ZONE: the zone where you created the VM
 
Additional configuration might be required for first time SSH usage. For more information, see About SSH connections.
Install the Kafka command-line tools
From your SSH session, run the following commands to install the Kafka command-line tools.
Install Java, which is needed to run the Kafka command-line tools, and
wgetto help download dependencies. The following commands assume you are using a Debian Linux environment.sudo apt-get install default-jre wgetInstall the Kafka command-line tools.
wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz tar xfz kafka_2.13-3.7.2.tgzSet the following environment variables:
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2 export PATH=$PATH:$KAFKA_HOME/bin export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
Set up authentication
From your SSH session, perform the following steps to set up the Managed Service for Apache Kafka authentication library.
Download the library and install it locally.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip sudo apt-get install unzip unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/This command installs the library in the
libdirectory of the Kafka installation directory. The Kafka command-line tools look in this directory for Java dependencies.Using a text editor, create a file named
client.propertiesand paste in the following:security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;Save the file. This file configures a Kafka client with the following settings:
Use SASL_SSL for secure communication with the Kafka cluster.
Use OAuth 2.0 bearer tokens for authentication.
Use the library-provided
GcpLoginCallbackHandlerclass as the login callback handler to obtain OAuth 2.0 tokens.
Produce and consume messages
From your SSH session, run the following commands to produce and consume Kafka messages.
Set the bootstrap address as an environment variable.
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092Replace the following:
CLUSTER_ID: the name of your clusterREGION: where you created the clusterPROJECT_ID: your project ID
For more information, see Get the bootstrap address.
List the topics in the cluster.
kafka-topics.sh --list \ --bootstrap-server $BOOTSTRAP \ --command-config client.propertiesWrite a message to a topic.
echo "hello world" | kafka-console-producer.sh \ --topic KAFKA_TOPIC_NAME \ --bootstrap-server $BOOTSTRAP \ --producer.config client.propertiesReplace KAFKA_TOPIC_NAME with the name of the topic.
Consume messages from the topic.
kafka-console-consumer.sh \ --topic KAFKA_TOPIC_NAME \ --from-beginning \ --bootstrap-server $BOOTSTRAP \ --consumer.config client.propertiesTo stop consuming messages, enter Ctrl+C.
Run a producer performance test.
kafka-producer-perf-test.sh \ --topic KAFKA_TOPIC_NAME \ --num-records 1000000 --throughput -1 --print-metrics --record-size 1024 \ --producer-props bootstrap.servers=$BOOTSTRAP \ --producer.config client.properties
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
Console
Delete the VM instance.
Go to the VM instances page.
Select the VM and click Delete.
Delete the Kafka cluster.
Go to the Managed Service for Apache Kafka > Clusters page.
Select the Kafka cluster and click Delete.
gcloud
To delete the VM, use the
gcloud compute instances deletecommand.gcloud compute instances delete VM_NAME --zone=ZONETo delete the Kafka cluster, use the
gcloud managed-kafka clusters deletecommand.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
What's next
- Overview of Managed Service for Apache Kafka
 - Authenticate to Managed Service for Apache Kafka
 - Configure networking