Develop a Java producer application
Learn how to develop a Java producer application that authenticates with a Managed Service for Apache Kafka cluster by using Application Default Credentials (ADC). ADC lets applications running on Google Cloud automatically find and use the right credentials for authenticating to Google Cloud services.
Before you begin
Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.
If you already have a Managed Service for Apache Kafka cluster, you can skip this step.
Set up a client VM
A producer application must run on a machine with network access to the cluster. This tutorial uses a Compute Engine virtual machine (VM) instance. The VM must be in the same region as the Kafka cluster, and in the VPC that contains the subnet used in the cluster configuration.
To create the client VM, run the following command:
gcloud compute instances create test-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
    --zone=REGION-f
Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator):

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/iam.serviceAccountOpenIdTokenCreator

Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID.
Set up an Apache Maven project
Connect to the client VM using SSH. One way to do this is to run the following command:
gcloud compute ssh --project=PROJECT_ID \
    --zone=REGION-f test-instance
For more information about connecting using SSH, see About SSH connections.
Install Java and Maven with the command:
sudo apt-get install maven openjdk-17-jdk
Set up an Apache Maven project. The following command creates a package com.google.example in a directory called demo:

mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DarchetypeVersion=1.5 -DinteractiveMode=false
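If the archetype runs successfully, the generated project typically has the following layout (maven-archetype-quickstart also generates a sample test class; exact contents can vary by archetype version):

```
demo/
├── pom.xml
└── src/
    ├── main/java/com/google/example/App.java
    └── test/java/com/google/example/AppTest.java
```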
Change into the project directory with cd demo.
Create a Java producer application
This section guides you through creating a Java application that produces messages to a Kafka topic. You write and compile the Java code with Maven, configure the required parameters in a kafka-client.properties file, and then run the application to send messages.
Write the Producer code
Replace the code in src/main/java/com/google/example/App.java with the following:
package com.google.example;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata m, Exception e) {
        if (e == null) {
            System.out.println("Produced a message successfully.");
        } else {
            System.out.println(e.getMessage());
        }
    }
}

public class App {
    public static void main(String[] args) throws Exception {
        // Load connection and authentication settings from the properties file.
        Properties p = new Properties();
        p.load(new java.io.FileReader("kafka-client.properties"));

        KafkaProducer<String, String> producer = new KafkaProducer<>(p);
        ProducerRecord<String, String> message =
            new ProducerRecord<>("topicName", "key", "value");
        producer.send(message, new SendCallback());
        producer.close();
    }
}
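Note that producer.send() is asynchronous: it returns immediately, and the callback is invoked from a background thread once the broker acknowledges the record or an error occurs. The following stdlib-only sketch illustrates that completion-callback pattern with a hypothetical send() stand-in (no Kafka cluster required); the names AsyncSendSketch and send are illustrative, not part of the Kafka API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class AsyncSendSketch {

    // Hypothetical stand-in for KafkaProducer.send(record, callback):
    // the work runs on another thread, and the callback receives either
    // a result ("metadata") or an exception, never both.
    static CompletableFuture<Void> send(String value, BiConsumer<String, Exception> callback) {
        return CompletableFuture.runAsync(() -> {
            try {
                String metadata = "acked:" + value; // pretend broker acknowledgment
                callback.accept(metadata, null);
            } catch (Exception e) {
                callback.accept(null, e);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> inFlight = send("value", (metadata, e) -> {
            if (e == null) {
                System.out.println("Produced a message successfully.");
            } else {
                System.out.println(e.getMessage());
            }
        });
        inFlight.join(); // like producer.close(): wait for in-flight sends to finish
    }
}
```

The SendCallback class in App.java plays the same role as the lambda here: it is invoked after the fact with either metadata or an exception.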
Compile the application
To compile this application, you need the Kafka client library as well as the authentication handler specific to Google Cloud.
In the demo project directory, you find pom.xml with the Maven configuration for this project. Add the following lines to the <dependencies> section of pom.xml:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.7.2</version>
</dependency>
<dependency>
  <groupId>com.google.cloud.hosted.kafka</groupId>
  <artifactId>managed-kafka-auth-login-handler</artifactId>
  <version>1.0.5</version>
</dependency>
Compile the application with mvn compile.
Configure and run the application
The producer expects client configuration parameters in a file called kafka-client.properties. Create this file in the demo project directory (the directory containing pom.xml) with the following contents:

bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
You are now ready to run the application:

mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet

If the message is delivered, the application prints "Produced a message successfully."
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
To delete the cluster, run the gcloud managed-kafka clusters delete command:
gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION
To delete the VM, run the gcloud compute instances delete command:

gcloud compute instances delete test-instance \
    --zone=REGION-f