Apache Kafka to Kafka template

The Apache Kafka to Apache Kafka template creates a streaming pipeline that ingests data as bytes from an Apache Kafka source, and then writes the bytes to an Apache Kafka sink.

Pipeline requirements

  • The Apache Kafka source topic must exist.
  • The Apache Kafka source and sink broker servers must be running and be reachable from the Dataflow worker machines.
  • If you are using Google Cloud Managed Service for Apache Kafka as either a source or a sink, the topic must exist before launching the template.

Kafka message format

The Apache Kafka source messages are read as bytes, and the bytes are written to the Apache Kafka sink.

Authentication

The Apache Kafka to Apache Kafka template supports SASL/PLAIN and TLS authentication to Kafka brokers.

Template parameters

Required parameters

  • readBootstrapServerAndTopic : Kafka Bootstrap server and topic to read the input from. (Example: localhost:9092;topic1,topic2).
  • kafkaReadAuthenticationMode : The mode of authentication to use with the Kafka cluster. Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, TLSfor certificate-based authentication. APPLICATION_DEFAULT_CREDENTIALS should be used only for Google Cloud Apache Kafka for BigQuery cluster since This allow you to authenticate with Google Cloud Apache Kafka for BigQuery using application default credentials.
  • writeBootstrapServerAndTopic : Kafka topic to write the output to.
  • kafkaWriteAuthenticationMethod : The mode of authentication to use with the Kafka cluster. Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, and TLS for certificate-based authentication. Defaults to: APPLICATION_DEFAULT_CREDENTIALS.

Optional parameters

  • enableCommitOffsets : Commit offsets of processed messages to Kafka. If enabled, this will minimize the gaps or duplicate processing of messages when restarting the pipeline. Requires specifying the Consumer Group ID. Defaults to: false.
  • consumerGroupId : The unique identifier for the consumer group that this pipeline belongs to. Required if Commit Offsets to Kafka is enabled. Defaults to empty.
  • kafkaReadOffset : The starting point for reading messages when no committed offsets exist. The earliest starts from the beginning, the latest from the newest message. Defaults to: latest.
  • kafkaReadUsernameSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka username to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
  • kafkaReadPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka password to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
  • kafkaReadKeystoreLocation : The Google Cloud Storage path to the Java KeyStore (JKS) file that contains the TLS certificate and private key to use when authenticating with the Kafka cluster. (Example: gs://your-bucket/keystore.jks).
  • kafkaReadTruststoreLocation : The Google Cloud Storage path to the Java TrustStore (JKS) file that contains the trusted certificates to use to verify the identity of the Kafka broker.
  • kafkaReadTruststorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file for Kafka TLS authentication (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the Java KeyStore (JKS) file for Kafka TLS authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeyPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the Java KeyStore (JKS) file for Kafka TLS authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteUsernameSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka username for SASL_PLAIN authentication with the destination Kafka cluster. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
  • kafkaWritePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka password to use for SASL_PLAIN authentication with the destination Kafka cluster. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
  • kafkaWriteKeystoreLocation : The Google Cloud Storage path to the Java KeyStore (JKS) file that contains the TLS certificate and private key for authenticating with the destination Kafka cluster. (Example: gs://
  • kafkaWriteTruststoreLocation : The Google Cloud Storage path to the Java TrustStore (JKS) file that contains the trusted certificates to use to verify the identity of the destination Kafka broker.
  • kafkaWriteTruststorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file for TLS authentication with the destination Kafka cluster. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteKeystorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to access the Java KeyStore (JKS) file to use for TLS authentication with the destination Kafka cluster. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteKeyPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the Java KeyStore (JKS) file for TLS authentication with the destination Kafka cluster. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Kafka to Cloud Storage template.
  6. In the provided parameter fields, enter your parameter values.
  7. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  8. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • BIGQUERY_TABLE: your Cloud Storage table name
  • KAFKA_TOPICS: the Apache Kakfa topic list. If multiple topics are provided, you need to escape commas. See gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address needs to have the port number that the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, you need to escape commas. See gcloud topic escaping.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • BIGQUERY_TABLE: your Cloud Storage table name
  • KAFKA_TOPICS: the Apache Kakfa topic list. If multiple topics are provided, you need to escape commas. See gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address needs to have the port number that the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, you need to escape commas. See gcloud topic escaping.

For more information, see Write data from Kafka to Cloud Storage with Dataflow.

What's next