Apache Kafka to BigQuery template

The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. If the errors table does not exist prior to execution, then it is created.

Pipeline requirements

  • The output BigQuery table must exist.
  • The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
  • The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format.

Template parameters

Parameter Description
outputTableSpec The BigQuery output table location to write the Apache Kafka messages to, in the format of my-project:dataset.table
inputTopics The Apache Kafka input topics to read from in a comma-separated list. For example: messages
bootstrapServers The host address of the running Apache Kafka broker servers in a comma-separated list, each host address in the format of 35.70.252.199:9092
javascriptTextTransformGcsPath (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
outputDeadletterTable (Optional) The BigQuery table for messages that failed to reach the output table, in the format of my-project:dataset.my-deadletter-table. If it doesn't exist, the table is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF) in JavaScript. The template calls the UDF for each input element. Element payloads are serialized as JSON strings.

To use a UDF, upload the JavaScript file to Cloud Storage and set the following template parameters:

ParameterDescription
javascriptTextTransformGcsPath The Cloud Storage location of the JavaScript file.
javascriptTextTransformFunctionName The name of the JavaScript function.

For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Kafka record value, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Kafka to BigQuery template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BIGQUERY_TABLE: your BigQuery table name
  • KAFKA_TOPICS: the Apache Kakfa topic list. If multiple topics are provided, please follow instructions on how to escape commas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address should have with it the port number the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, follow instructions on how to escape commas.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BIGQUERY_TABLE: your BigQuery table name
  • KAFKA_TOPICS: the Apache Kakfa topic list. If multiple topics are provided, please follow instructions on how to escape commas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address should have with it the port number the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, follow instructions on how to escape commas.

For more information, see Write data from Kafka to BigQuery with Dataflow.