The Apache Kafka to Cloud Storage template is a streaming pipeline that ingests text data from Google Cloud Managed Service for Apache Kafka and outputs the records to Cloud Storage.
You can also use the Apache Kafka to Cloud Storage template with self-managed or external Kafka.
Pipeline requirements
- The output Cloud Storage bucket must exist.
- The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
- The Apache Kafka topics must exist.
Kafka message format
The Apache Kafka to Cloud Storage template supports reading messages from Kafka in the following formats: AVRO_CONFLUENT_WIRE_FORMAT, AVRO_BINARY_ENCODING, and JSON.
Output file format
The output file format is the same format as the input Kafka message. For example, if you select JSON for the Kafka message format, JSON files are written to the output Cloud Storage bucket.
Authentication
The Apache Kafka to Cloud Storage template supports SASL/PLAIN authentication to Kafka brokers.
Template parameters
Required parameters
- readBootstrapServerAndTopic : The Kafka bootstrap server and topic to read the input from.
- kafkaReadAuthenticationMode : The mode of authentication to use with the Kafka cluster. Use NONE for no authentication and SASL_PLAIN for SASL/PLAIN username and password. Google Cloud Managed Service for Apache Kafka (formerly Apache Kafka for BigQuery) only supports the SASL_PLAIN authentication mode. Defaults to: SASL_PLAIN.
- outputDirectory : The path and filename prefix for writing output files. Must end with a slash. (Example: gs://your-bucket/your-path/).
- messageFormat : The format of the Kafka messages to read. The supported values are AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry encoded Avro), AVRO_BINARY_ENCODING (Plain binary Avro), and JSON. Defaults to: AVRO_CONFLUENT_WIRE_FORMAT.
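The constraints on the required parameters can be checked locally before a job is submitted. The following is a minimal sketch, not part of the template: `check_required_params` is a hypothetical helper, and the `gs://` prefix check is an assumption based on the example path above.

```shell
# Hypothetical pre-launch check for the required template parameters.
# Usage: check_required_params AUTH_MODE OUTPUT_DIRECTORY MESSAGE_FORMAT
check_required_params() {
  # kafkaReadAuthenticationMode accepts NONE or SASL_PLAIN.
  case "$1" in
    NONE|SASL_PLAIN) ;;
    *) echo "unsupported kafkaReadAuthenticationMode: $1" >&2; return 1 ;;
  esac

  # outputDirectory must end with a slash. The gs:// prefix check is an
  # assumption based on the documented example path.
  case "$2" in
    gs://*/) ;;
    *) echo "outputDirectory must look like gs://bucket/path/: $2" >&2; return 1 ;;
  esac

  # messageFormat accepts one of the three documented values.
  case "$3" in
    AVRO_CONFLUENT_WIRE_FORMAT|AVRO_BINARY_ENCODING|JSON) ;;
    *) echo "unsupported messageFormat: $3" >&2; return 1 ;;
  esac
}

# Example call with the documented example path.
check_required_params SASL_PLAIN gs://your-bucket/your-path/ JSON \
  && echo "required parameters look valid"
```

The check only covers values that the parameter descriptions state explicitly; it does not verify that the bucket or the Kafka topic exists.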
Optional parameters
- windowDuration : The window duration/size in which data will be written to Cloud Storage. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). (Example: 5m). Defaults to: 5m.
- outputFilenamePrefix : The prefix to place on each windowed file. (Example: output-). Defaults to: output.
- numShards : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Default value is decided by Dataflow.
- enableCommitOffsets : Commit offsets of processed messages to Kafka. If enabled, this will minimize the gaps or duplicate processing of messages when restarting the pipeline. Requires specifying the Consumer Group ID. Defaults to: false.
- consumerGroupId : The unique identifier for the consumer group that this pipeline belongs to. Required if Commit Offsets to Kafka is enabled. Defaults to empty.
- kafkaReadOffset : The starting point for reading messages when no committed offsets exist. Use earliest to start from the beginning of the topic, or latest to start from the newest message. Defaults to: latest.
- kafkaReadUsernameSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka username to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
- kafkaReadPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka password to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
- schemaFormat : The Kafka schema format. Can be provided as SINGLE_SCHEMA_FILE or SCHEMA_REGISTRY. If SINGLE_SCHEMA_FILE is specified, all messages must use the schema defined in the Avro schema file. If SCHEMA_REGISTRY is specified, the messages can have either a single schema or multiple schemas. Defaults to: SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : The Google Cloud Storage path to the single Avro schema file used to decode all of the messages in a topic. Defaults to empty.
- schemaRegistryConnectionUrl : The URL for the Confluent Schema Registry instance used to manage Avro schemas for message decoding. Defaults to empty.
- binaryAvroSchemaPath : The Google Cloud Storage path to the Avro schema file used to decode binary-encoded Avro messages. Defaults to empty.
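Several optional parameters have format rules or depend on each other: windowDuration accepts only Ns, Nm, or Nh; kafkaReadOffset accepts earliest or latest; and enableCommitOffsets requires consumerGroupId. A minimal sketch of those checks follows. The function names are hypothetical, and the sketch assumes that enableCommitOffsets is passed as the literal string `true` or `false`.

```shell
# Returns 0 if the argument is a whole number followed by s, m, or h.
is_valid_window_duration() {
  case "$1" in
    ''|[smh]) return 1 ;;          # empty, or a unit with no number
    *[!0-9]*[smh]) return 1 ;;     # a non-digit appears before the unit
    *[0-9][smh]) return 0 ;;       # digits followed by exactly one unit
    *) return 1 ;;                 # no recognized unit suffix
  esac
}

# Returns 0 if the argument is a documented kafkaReadOffset value.
is_valid_read_offset() {
  case "$1" in
    earliest|latest) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage: check_commit_offsets ENABLE_COMMIT_OFFSETS CONSUMER_GROUP_ID
# Fails when offsets are committed but no consumer group ID is given.
check_commit_offsets() {
  if [ "$1" = "true" ] && [ -z "$2" ]; then
    echo "consumerGroupId is required when enableCommitOffsets is true" >&2
    return 1
  fi
  return 0
}

# Example: the documented default window and offset, with commits disabled.
is_valid_window_duration 5m \
  && is_valid_read_offset latest \
  && check_commit_offsets false "" \
  && echo "optional parameters look valid"
```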
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Kafka to Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=AUTHENTICATION_MODE,\
outputDirectory=OUTPUT_DIRECTORY,\
messageFormat=MESSAGE_FORMAT
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- BOOTSTRAP_SERVER_AND_TOPIC: the Apache Kafka bootstrap server and topic to read the input from
- AUTHENTICATION_MODE: the authentication mode to use with the Kafka cluster, either NONE or SASL_PLAIN
- OUTPUT_DIRECTORY: the Cloud Storage path and filename prefix for writing output files. The path must end with a slash, for example, gs://your-bucket/your-path/
- MESSAGE_FORMAT: the format of the Kafka messages to read: AVRO_CONFLUENT_WIRE_FORMAT, AVRO_BINARY_ENCODING, or JSON
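The `--parameters` flag takes a comma-separated list, so a parameter value that itself contains a comma has to be escaped. `gcloud topic escaping` describes an alternative: prefixing the flag value with `^DELIM^` switches the list delimiter to `DELIM`. The following sketch joins key=value pairs with `~` instead of a comma. `join_parameters` is a hypothetical helper, and `exampleKey` is an illustrative name, not a template parameter.

```shell
# Joins key=value arguments into one --parameters value that uses "~" as
# the list delimiter, so commas inside values are passed through unchanged.
join_parameters() {
  joined=""
  for pair in "$@"; do
    # A value that contains the delimiter itself cannot be represented.
    case "$pair" in
      *"~"*) echo "value contains the delimiter '~': $pair" >&2; return 1 ;;
    esac
    if [ -z "$joined" ]; then
      joined=$pair
    else
      joined="$joined~$pair"
    fi
  done
  # The ^~^ prefix tells gcloud to split the list on "~".
  printf '^~^%s\n' "$joined"
}

# exampleKey is illustrative only; windowDuration is a documented parameter.
join_parameters "exampleKey=first,second" "windowDuration=5m"
# prints ^~^exampleKey=first,second~windowDuration=5m

# Intended use (not executed here):
#   gcloud dataflow flex-template run JOB_NAME ... \
#     --parameters="$(join_parameters "key1=value1" "key2=value2")"
```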
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "AUTHENTICATION_MODE",
          "outputDirectory": "OUTPUT_DIRECTORY",
          "messageFormat": "MESSAGE_FORMAT"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex"
   }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- BOOTSTRAP_SERVER_AND_TOPIC: the Apache Kafka bootstrap server and topic to read the input from
- AUTHENTICATION_MODE: the authentication mode to use with the Kafka cluster, either NONE or SASL_PLAIN
- OUTPUT_DIRECTORY: the Cloud Storage path and filename prefix for writing output files. The path must end with a slash, for example, gs://your-bucket/your-path/
- MESSAGE_FORMAT: the format of the Kafka messages to read: AVRO_CONFLUENT_WIRE_FORMAT, AVRO_BINARY_ENCODING, or JSON
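The request URL and body can also be assembled in a script and sent with a tool such as curl. The following is a rough sketch under stated assumptions: `launch_url` and `launch_body` are hypothetical helpers, the template parameters are passed in as an already valid JSON object, and none of the values contain characters that need JSON escaping.

```shell
# Usage: launch_url PROJECT_ID LOCATION
# Prints the flexTemplates:launch endpoint for the project and region.
launch_url() {
  printf 'https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/flexTemplates:launch\n' "$1" "$2"
}

# Usage: launch_body JOB_NAME CONTAINER_SPEC_GCS_PATH PARAMETERS_JSON
# Prints the request body. PARAMETERS_JSON must already be a JSON object;
# no escaping is applied to any argument.
launch_body() {
  printf '{"launch_parameter":{"jobName":"%s","parameters":%s,"containerSpecGcsPath":"%s"}}\n' "$1" "$3" "$2"
}

# Example with placeholder values.
launch_url PROJECT_ID LOCATION
launch_body JOB_NAME CONTAINER_SPEC_GCS_PATH '{"outputDirectory":"gs://your-bucket/your-path/"}'

# Intended use (not executed here; assumes gcloud and curl are installed):
#   curl -X POST \
#     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
#     -H "Content-Type: application/json" \
#     -d "$(launch_body JOB_NAME CONTAINER_SPEC_GCS_PATH PARAMETERS_JSON)" \
#     "$(launch_url PROJECT_ID LOCATION)"
```

Building JSON with printf is only reasonable for simple values; a JSON-aware tool is a safer choice when values can contain quotes or backslashes.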
For more information, see Write data from Kafka to Cloud Storage with Dataflow.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.