Write data from Kafka to BigQuery with Dataflow

This document provides high-level guidance on creating and deploying a Dataflow pipeline that streams from Apache Kafka to BigQuery.

Apache Kafka is an open source platform for streaming events. Kafka is commonly used in distributed architectures to enable communication between loosely coupled components. You can use Dataflow to read events from Kafka, process them, and write the results to a BigQuery table for further analysis.

Reading Kafka events into BigQuery

Google provides a Dataflow template that configures a Kafka-to-BigQuery pipeline. The template uses the BigQueryIO connector provided in the Apache Beam SDK.

To use this template, you perform the following steps:

  1. Deploy Kafka, either in Google Cloud or elsewhere.
  2. Configure networking.
  3. Set Identity and Access Management (IAM) permissions.
  4. Write a function to transform the event data.
  5. Create the BigQuery output table.
  6. Deploy the Dataflow template.

Deploy Kafka

Within Google Cloud, you can deploy a Kafka cluster on Compute Engine virtual machine (VM) instances or use a third-party managed Kafka service. For more information about deployment options on Google Cloud, see What is Apache Kafka?. You can find third-party Kafka solutions on the Google Cloud Marketplace.

Alternatively, you might have an existing Kafka cluster that resides outside of Google Cloud. For example, you might have an existing workload that is deployed on-premises or in another public cloud.

Configure networking

By default, Dataflow launches instances within your default Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you might need to configure a different network and subnet for Dataflow. For more information, see Specify a network and subnetwork in the Dataflow documentation. When configuring your network, create firewall rules that allow the Dataflow worker machines to reach the Kafka brokers.

If you are using VPC Service Controls, place the Kafka cluster within the VPC Service Controls perimeter, or else extend the perimeter to an authorized VPN or Cloud Interconnect.

Connect to an external cluster

If your Kafka cluster is deployed outside of Google Cloud, you must create a network connection between Dataflow and the Kafka cluster. There are two main options, with different tradeoffs:

  • Shared RFC 1918 address space, by using Dedicated Interconnect or IPsec VPN. Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits.
  • Public IP address space, over the internet. With a public IP–based topology, you can get started quickly because little networking work needs to be done.

The next two sections describe these options in more detail.

Shared RFC 1918 address space

Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your Virtual Private Cloud (VPC), which can simplify your Kafka configuration. If you're using a VPN–based topology, consider setting up a high-throughput VPN.

By default, Dataflow launches instances on your default VPC network. If you use a private network topology with routes explicitly defined in Cloud Router that connect subnetworks in Google Cloud to the Kafka cluster, you need more control over where to locate your Dataflow instances. Use the network and subnetwork execution parameters to control where Dataflow launches them.

Make sure that the corresponding subnetwork has enough IP addresses available for Dataflow to launch instances on when it scales out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.
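
As an illustration, the following sketch creates such a rule with the Google Cloud CLI. The rule and network names are placeholders, and the dataflow network tag and TCP port range 12345-12346 follow general Dataflow firewall guidance; confirm the values against the Dataflow networking documentation for your environment.

# Sketch: allow TCP traffic between Dataflow worker VMs on a custom network.
# NETWORK_NAME is a placeholder; the "dataflow" tag and port range are assumptions
# based on standard Dataflow firewall guidance.
gcloud compute firewall-rules create allow-dataflow-worker-traffic \
    --network=NETWORK_NAME \
    --direction=INGRESS \
    --allow=tcp:12345-12346 \
    --source-tags=dataflow \
    --target-tags=dataflow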

Public IP address space

This architecture uses Transport Layer Security (TLS) to secure traffic between external clients and Kafka, and uses plaintext for inter-broker communication. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the advertised.listeners property:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify externally visible address
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

External clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under advertised.listeners, use DNS names (kafkabroker-n.mydomain.com, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work because the addresses might fail to resolve for internal traffic.

Set IAM permissions

Dataflow jobs use two IAM service accounts:

  • The Dataflow service uses a Dataflow service account to manipulate Google Cloud resources, such as creating VMs.
  • The Dataflow worker VMs use a worker service account to access your pipeline's files and other resources. This service account needs write access to the BigQuery output table. It also needs access to any other resources that the pipeline job references.

Ensure that these two service accounts have appropriate roles. For more information, see Dataflow security and permissions.
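
For example, the following sketch grants the worker service account the BigQuery Data Editor role at the project level. The project ID and service account email are placeholders; you might instead grant access only on the output dataset.

# Sketch: grant the Dataflow worker service account write access to BigQuery.
# PROJECT_ID and WORKER_SA_EMAIL are placeholders.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:WORKER_SA_EMAIL" \
    --role="roles/bigquery.dataEditor"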

Transform the data for BigQuery

The Kafka-to-BigQuery template creates a pipeline that reads events from one or more Kafka topics and writes them into a BigQuery table. Optionally, you can provide a JavaScript user-defined function (UDF) that transforms the event data before it is written to BigQuery.

The output from the pipeline must be JSON-formatted data that matches the schema of the output table. If the Kafka event data is already in JSON format, then you can create a BigQuery table with a matching schema and pass the events directly to BigQuery. Otherwise, author a UDF that takes the event data as input and returns JSON data that matches your BigQuery table.

For example, suppose the event data contains two fields:

  • name (string)
  • customer_id (integer)

The output from the Dataflow pipeline might look like the following:

{ "name": "Alice", "customer_id": 1234 }

Assuming the event data is not already in JSON format, you would write a UDF that transforms the data, as follows:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}
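
As a concrete sketch, suppose each Kafka event is a comma-delimited string such as Alice,1234. The delimiter and field order here are assumptions about your event format, not part of the template:

// Sketch: parse a comma-delimited event such as "Alice,1234".
// The delimiter and field order are assumptions about the event format.
function process(eventData) {
  var fields = eventData.split(',');
  var name = fields[0];
  var customer_id = parseInt(fields[1], 10);

  // Return a JSON payload that matches the BigQuery table schema.
  return JSON.stringify({ name: name, customer_id: customer_id });
}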

The UDF can perform additional processing on the event data, such as filtering events, removing personally identifiable information (PII), or enriching the data with additional fields.

For more information on writing a UDF for the template, see Extend your Dataflow template with UDFs. After you write the UDF, upload the JavaScript file to Cloud Storage so that the template can load it.
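
For example, you can upload the file with the Google Cloud CLI. The bucket and file names in this sketch are placeholders:

# Sketch: upload the UDF script to Cloud Storage. BUCKET_NAME is a placeholder.
gcloud storage cp process.js gs://BUCKET_NAME/udf/process.js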

Create the BigQuery output table

Create the BigQuery output table before you run the template. The table schema must be compatible with the JSON output from the pipeline. For each property in the JSON payload, the pipeline writes the value to the BigQuery table column of the same name. Any missing properties in the JSON are interpreted as NULL values.

Using the previous example, the BigQuery table would have the following columns:

+--------------+-----------+
| Column name  | Data type |
+--------------+-----------+
| name         | STRING    |
| customer_id  | INTEGER   |
+--------------+-----------+

You can use the CREATE TABLE SQL statement to create the table:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Alternatively, you can specify the table schema by using a JSON definition file. For more information, see Specifying a schema in the BigQuery documentation.
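
For example, the following sketch describes the same two columns in a schema file named schema.json (the file name is a placeholder):

[
  {"name": "name", "type": "STRING", "mode": "NULLABLE"},
  {"name": "customer_id", "type": "INTEGER", "mode": "NULLABLE"}
]

You can then create the table with the bq command-line tool by referencing the schema file:

bq mk --table my_dataset.kafka_events ./schema.json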

Run the Dataflow job

After you create the BigQuery table, run the Dataflow template.

Console

To create the Dataflow job by using the Google Cloud console, perform the following steps:

  1. Go to the Dataflow page in the Google Cloud console.
  2. Click Create job from template.
  3. In the Job Name field, enter a job name.
  4. For Regional endpoint, select a region.
  5. Select the "Kafka to BigQuery" template.
  6. Under Required parameters, enter the name of the BigQuery output table. The table must already exist and have a valid schema.
  7. Click Show optional parameters and enter values for at least the following parameters:

    • The Kafka topic to read the input from.
    • The list of Kafka bootstrap servers, separated by commas.
    • A service account email.

    Enter additional parameters as needed. In particular, you might need to specify the following:

    • Networking: To use a VPC network other than the default network, specify the network and subnet.
    • UDF: To use a JavaScript UDF, specify the Cloud Storage location of the script and the name of the JavaScript function to invoke.

gcloud

To create the Dataflow job by using the Google Cloud CLI, run the following command:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --region LOCATION \
    --parameters inputTopics=KAFKA_TOPICS \
    --parameters bootstrapServers=BOOTSTRAP_SERVERS \
    --parameters outputTableSpec=OUTPUT_TABLE \
    --parameters serviceAccount=IAM_SERVICE_ACCOUNT \
    --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
    --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
    --network VPC_NETWORK_NAME \
    --subnetwork SUBNET_NAME

Replace the following variables:

  • JOB_NAME. A job name of your choice.
  • LOCATION. The region in which to run the job. For more information about regions and locations, see Dataflow locations.
  • KAFKA_TOPICS. A comma-separated list of Kafka topics to read.
  • BOOTSTRAP_SERVERS. A comma-separated list of Kafka bootstrap servers. Example: 127.0.0.1:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. The BigQuery output table, specified as PROJECT_ID:DATASET_NAME.TABLE_NAME. Example: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Optional. The email address of the service account to run the job as.
  • UDF_SCRIPT_PATH. Optional. The Cloud Storage path to a JavaScript file that contains a UDF. Example: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Optional. The name of the JavaScript function to call as the UDF.
  • VPC_NETWORK_NAME. Optional. The network to which workers will be assigned.
  • SUBNET_NAME. Optional. The subnetwork to which workers will be assigned.
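
For example, a complete invocation might look like the following sketch. Every value shown is a placeholder for your own project, region, bucket, topic, and cluster:

gcloud dataflow flex-template run kafka-to-bq-events \
    --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --region us-central1 \
    --parameters inputTopics=events \
    --parameters bootstrapServers=kafkabroker-1.mydomain.com:9093 \
    --parameters outputTableSpec=my_project:my_dataset.kafka_events \
    --parameters javascriptTextTransformGcsPath=gs://my-bucket/udf/process.js \
    --parameters javascriptTextTransformFunctionName=process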

Data types

This section describes how to handle various data types in the BigQuery table schema.

Internally, the JSON messages are converted to TableRow objects, and the TableRow field values are translated to BigQuery types.

Scalar types

The following example creates a BigQuery table with different scalar data types, including string, numeric, Boolean, date/time, interval, and geography types:

CREATE TABLE my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Here is a JSON payload with compatible fields:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Notes:

  • For a TIMESTAMP column, you can use the JavaScript Date.toJSON method to format the value, as shown in the sketch after these notes.
  • For the GEOGRAPHY column, you can specify the geography using well-known text (WKT) or GeoJSON, formatted as a string. For more information, see Loading geospatial data.
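
For example, a UDF could produce a compatible TIMESTAMP value with a minimal JavaScript sketch like this:

// Format a timestamp as an ISO 8601 string that BigQuery accepts for TIMESTAMP columns.
var ts = new Date().toJSON();  // For example, "2022-07-01T12:00:00.000Z"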

For more information about data types in BigQuery, see Data types.

Arrays

You can store an array in BigQuery by using the ARRAY data type. In the following example, the JSON payload contains a property named scores whose value is a JSON array:

{"name":"Emily","scores":[10,7,10,9]}

The following CREATE TABLE SQL statement creates a BigQuery table with a compatible schema:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

The resulting table looks like the following:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
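
After ingestion, you can flatten the array with UNNEST. For example, the following query returns one row per score:

SELECT name, score
FROM my_dataset.kafka_events, UNNEST(scores) AS score;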

Structures

The STRUCT data type in BigQuery contains an ordered list of named fields. You can use a STRUCT to hold JSON objects that follow a consistent schema.

In the following example, the JSON payload contains a property named val whose value is a JSON object:

{"name":"Emily","val":{"a":"yes","b":"no"}}

The following CREATE TABLE SQL statement creates a BigQuery table with a compatible schema:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

The resulting table looks like the following:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
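
You can then reference individual struct fields with dot notation. For example, the following query returns the value of the a field:

SELECT name, val.a FROM my_dataset.kafka_events;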

Semi-structured event data

If the Kafka event data does not follow a strict schema, consider storing it in BigQuery as a JSON data type (Preview). By storing JSON data as a JSON data type, you don't need to define the event schema upfront. After data ingestion, you can query the output table by using the field access (dot notation) and array access operators.

First, create a table with a JSON column:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Then define a JavaScript UDF that wraps the event payload inside a JSON object:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}
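
As a concrete sketch, if the Kafka payload is already a JSON-formatted string, the UDF can simply parse it and wrap it in the event_data property:

// Sketch: assumes the incoming event data is already a JSON-formatted string.
function process(eventData) {
  var json = JSON.parse(eventData);

  // Wrap the parsed object so that it maps to the event_data column.
  return JSON.stringify({ "event_data": json });
}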

After the data is written to BigQuery, you can query the individual fields by using the field access operator. For example, the following query returns the value of the name field for each record:

SELECT event_data.name FROM my_dataset.kafka_events;

For more information about using JSON in BigQuery, see Working with JSON data in Google Standard SQL.

Errors and logging

You might experience errors from running the pipeline, or errors while handling individual Kafka events.

For more information about handling pipeline errors, see Pipeline troubleshooting and debugging.

If the job runs successfully but an error occurs when processing an individual Kafka event, the pipeline job writes an error record to a table in BigQuery. The job itself doesn't fail, and the event-level error does not appear as an error in the Dataflow job log.

The pipeline job automatically creates the table to hold error records. By default, the name of this table is the name of the output table with _error_records appended. For example, if the output table is named kafka_events, the error table is named kafka_events_error_records. You can specify a different name by setting the outputDeadletterTable template parameter:

outputDeadletterTable=my_project:dataset1.errors_table

Possible errors include:

  • Serialization errors, including badly-formatted JSON.
  • Type conversion errors, caused by a mismatch in the table schema and the JSON data.
  • Extra fields in the JSON data that are not present in the table schema.

Example error messages:

  • Serialization error
    Event data: "Hello world"
    errorMessage: Failed to serialize json to table row: "Hello world"

  • Type conversion error
    Event data: {"name":"Emily","customer_id":"abc"}
    errorMessage: { "errors" : [ { "debugInfo" : "", "location" : "customer_id", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }

  • Unknown field
    Event data: {"name":"Zoe","age":34}
    errorMessage: { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: age.", "reason" : "invalid" } ], "index" : 0 }
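
To inspect event-level failures, query the error records table. The following example query assumes the default table name from the earlier kafka_events example; check the schema of the table that the job creates for the exact column names:

SELECT errorMessage FROM my_dataset.kafka_events_error_records;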

Next steps