Publish and consume messages with the CLI
Learn how to create a Google Cloud Managed Service for Apache Kafka cluster using the CLI and configure producer and consumer clients to connect to the cluster. The following guide creates a relatively small cluster designed only for testing.
Before you begin
Follow these guidelines for the guide:
- Create a Google Cloud project.
Perform these steps:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:
gcloud services enable managedkafka.googleapis.com
compute.googleapis.com dns.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:
gcloud services enable managedkafka.googleapis.com
compute.googleapis.com dns.googleapis.com
Create a cluster
A Managed Service for Apache Kafka cluster is located in a specific Google Cloud project and region. It can be accessed using a set of IP addresses within one or more subnets in any Virtual Private Cloud (VPC). The cluster's size is determined by the number of CPUs and total RAM that you allocate to it. Disk and broker configuration are automatic and cannot be adjusted.
Get a subnet identifier by using the
gcloud compute networks subnets describe
command.The following command retrieves the self link of the
default
subnet of your Google Cloud project in theus-central1
region.gcloud compute networks subnets describe default --region=us-central1 \ --format='value(selfLink)' | sed 's|.*/compute/v1/||'
The output of the command is similar to the following:
projects/managed-kafka-experimental/regions/us-central1/subnetworks/default
Here,
managed-kafka-experimental
is the project ID andus-central1
is the location.Record these values from your command output as you require them later.
export PROJECT_ID=PROJECT_ID export SUBNET_PATH=SUBNET_PATH export CLUSTER_ID=CLUSTER_ID
Replace the following:
PROJECT_ID
with the name of the project.SUBNET_PATH
with the full path of the subnet obtained from step 1. The value of the subnet must be in the format:projects/PROJECT_ID/regions/us-central1/subnetworks/default
CLUSTER_ID
with the name of the new cluster.
To create a cluster, run the
gcloud beta managed-kafka clusters create
command.gcloud beta managed-kafka clusters create $CLUSTER_ID \ --location=us-central1 \ --cpu=3 \ --memory=3GiB \ --subnets=$SUBNET_PATH \ --async
You get a response similar to the following:
Create request issued for: [CLUSTER_ID] Check operation [projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID] for status.
Store the
$OPERATION_ID
environmental variable to track progress.export OPERATION_ID=OPERATION_ID
Replace
OPERATION_ID
with the value of the operation ID obtained from the previous step.Creating a cluster usually takes 20-30 minutes. To track progress of the cluster creation, the
gcloud beta managed-kafka clusters create
command uses a long running operation (LRO), which you can monitor using the following command:curl -X GET \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ "https://managedkafka.googleapis.com/v1/projects/$PROJECT_ID/locations/us-central1/operations/$OPERATION_ID"
You get a response similar to the following:
{ "name": "projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata", "createTime": "2024-00-00T19:09:59.080076667Z", "target": "projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID", "verb": "create", "requestedCancellation": false, "apiVersion": "v1" }, "done": false }
When the operation completes, the response
done
field will be equal totrue
.
Describe a cluster
After the cluster is created, to get the details of the cluster, run the
gcloud beta managed-kafka clusters describe
command.gcloud beta managed-kafka clusters describe $CLUSTER_ID \ --location=us-central1
The output of the command is similar to the following:
bootstrapAddress: bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092 capacityConfig: memoryBytes: '3221225472' vcpuCount: '3' createTime: '2024-05-28T04:32:08.671168869Z' gcpConfig: accessConfig: networkConfigs: - subnet: projects/PROJECT_ID/regions/us-central1/subnetworks/default name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID rebalanceConfig: mode: AUTO_REBALANCE_ON_SCALE_UP state: CREATING updateTime: '2024-05-28T04:32:08.671168869Z'
The bootstrap URL is the primary way in which Kafka clients identify a server. You can retrieve the bootstrap URL using the following command:
gcloud beta managed-kafka clusters describe $CLUSTER_ID \ --location=us-central1 \ --format="value(bootstrapAddress)"
Create a topic
Create a topic in the cluster by running the command
gcloud beta managed-kafka topics create
.Here, the name of the topic is
t1
.gcloud beta managed-kafka topics create t1 \ --cluster=$CLUSTER_ID --location=us-central1 --partitions=10 \ --replication-factor=3
You get an output similar to the following:
Created topic [t1]
Describe a topic
After the topic is created, to get more details about the topic, run the
gcloud beta managed-kafka topics describe
command.gcloud beta managed-kafka topics describe t1 \ --cluster=$CLUSTER_ID --location=us-central1
You get an output similar to the following:
name: projects/PROJECT_ID/locations/ us-central1/clusters/CLUSTER_ID/topics/t1 partitionCount: 10 replicationFactor: 3
Set up a client machine
Set up a client on a Compute Engine instance that can access the VPC
containing the default
subnet where the Kafka cluster is reachable.
For this section, you require the project number and the project ID of the project where the Kafka cluster is located.
To find the project name and project number for your project, see Find the project name, number, and ID.
Create a Compute Engine instance in a zone which is in the same region as the Kafka cluster. The instance must also be in a VPC containing the subnet that you've used in the cluster configuration. For example, the following command creates a Compute Engine instance called
test-instance
:gcloud compute instances create test-instance \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --subnet=$SUBNET_PATH \ --zone=us-central1-f
For more information about creating a VM, see Create a VM instance in a specific subnet.
Give the Compute Engine default service account the permissions to use Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \ --role=roles/managedkafka.client
Replace the following:
PROJECT_NUMBER
: The automatically generated unique identifier for your project.
Use SSH to connect to the VM that you just created in the previous step, for example, using Google Cloud CLI:
gcloud compute ssh test-instance --project=$PROJECT_ID --zone=us-central1-f
Additional configuration might be required for first time SSH usage. For more information about connecting using SSH, see About SSH connections.
Install Java to run Kafka command line tools and wget to help download dependencies. The following commands assume you are using a Debian Linux environment.
sudo apt-get install default-jre wget
Install the Kafka command line tools on the VM.
wget -O kafka_2.13-3.7.1.tgz https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar xfz kafka_2.13-3.7.1.tgz export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.1 export PATH=$PATH:$KAFKA_HOME/bin
This code downloads and extracts the Apache Kafka distribution and sets the
KAFKA_HOME
environment variable for convenience, and adds the Kafkabin
directory to thePATH
variable.Set up the Managed Service for Apache Kafka authentication library.
Download the dependencies and install them locally. Since the Kafka command line tools look for Java dependencies in the
lib
directory of the Kafka installation directory, we add these dependencies there.wget https://github.com/googleapis/managedkafka/releases/download/v1.0.1/release-and-dependencies.zip sudo apt-get install unzip unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
Set up the client machine configuration properties.
cat <<EOF> client.properties security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; EOF
This code configures a Kafka client for the following settings:
Use SASL_SSL for secure communication with the Kafka cluster.
Employ OAuth 2.0 bearer tokens for authentication.
Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.
Use the Kafka command line tools
Run these commands on the client machine.
Set up the project-ID address as an environment variable.
export PROJECT_ID=PROJECT_ID export CLUSTER_ID=CLUSTER_ID
Replace the following:
PROJECT_ID
with the name of the project.CLUSTER_ID
with the name of the new cluster.
Set up the BOOTSTRAP address as an environment variable. This can be fetched from describing the cluster that was created.
export BOOTSTRAP=bootstrap.$CLUSTER_ID.us-central1.managedkafka.$PROJECT_ID.cloud.goog:9092
List the topics in the cluster.
kafka-topics.sh --list \ --bootstrap-server $BOOTSTRAP \ --command-config client.properties
Write a message to the topic
t1
and consume it.echo "hello world" | kafka-console-producer.sh --topic t1 \ --bootstrap-server $BOOTSTRAP --producer.config client.properties
Consume the message from topic
t1
.kafka-console-consumer.sh --topic t1 --from-beginning \ --bootstrap-server $BOOTSTRAP --consumer.config client.properties
Run a simpler producer performance test.
kafka-producer-perf-test.sh --topic t1 --num-records 1000000 \ --throughput -1 --print-metrics --record-size 1024 \ --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
To delete the cluster, run the
gcloud beta managed-kafka clusters delete
command:gcloud beta managed-kafka clusters delete $CLUSTER_ID --location=us-central1