This document provides high-level guidance on creating and deploying a Dataflow pipeline that streams from Apache Kafka to BigQuery.
Apache Kafka is an open source platform for streaming events. Kafka is commonly used in distributed architectures to enable communication between loosely coupled components. You can use Dataflow to read events from Kafka, process them, and write the results to a BigQuery table for further analysis.
Google provides a Dataflow template that configures a Kafka-to-BigQuery pipeline. The template uses the BigQueryIO connector provided in the Apache Beam SDK.
To use this template, you perform the following steps:
- Deploy Kafka, either in Google Cloud or elsewhere.
- Configure networking.
- Set Identity and Access Management (IAM) permissions.
- Write a function to transform the event data.
- Create the BigQuery output table.
- Deploy the Dataflow template.
Deploy Kafka
Within Google Cloud, you can deploy a Kafka cluster on Compute Engine virtual machine (VM) instances or use a third-party managed Kafka service. For more information about deployment options on Google Cloud, see What is Apache Kafka?. You can find third-party Kafka solutions on the Google Cloud Marketplace.
Alternatively, you might have an existing Kafka cluster that resides outside of Google Cloud. For example, you might have an existing workload that is deployed on-premises or in another public cloud.
Configure networking
By default, Dataflow launches instances within your default Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you might need to configure a different network and subnet for Dataflow. For more information, see Specify a network and subnetwork. When configuring your network, create firewall rules that allow the Dataflow worker machines to reach the Kafka brokers.
If you are using VPC Service Controls, place the Kafka cluster inside the VPC Service Controls perimeter, or else extend the perimeter to include the authorized VPN or Cloud Interconnect connection.
If your Kafka cluster is deployed outside of Google Cloud, you must create a network connection between Dataflow and the Kafka cluster. There are several networking options with different tradeoffs:
- Connect using a shared RFC 1918 address space, by using one of the following:
  - Dedicated Interconnect
  - IPsec VPN
- Reach your externally hosted Kafka cluster through public IP addresses, by using one of the following:
  - Public internet
  - Direct peering
  - Carrier peering
Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits. With a public IP–based topology, you can get started quickly because little networking work needs to be done.
The next two sections describe these options in more detail.
Shared RFC 1918 address space
Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your Virtual Private Cloud (VPC), which can simplify your Kafka configuration. If you're using a VPN–based topology, consider setting up a high-throughput VPN.
By default, Dataflow launches instances on your default VPC network. In a private network topology with routes explicitly defined in Cloud Router that connect subnetworks in Google Cloud to that Kafka cluster, you need more control over where to locate your Dataflow instances. Use the network and subnetwork execution parameters to control where Dataflow launches its worker instances.
Make sure that the corresponding subnetwork has enough IP addresses available for Dataflow to launch instances on when it attempts to scale out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.
Public IP address space
This architecture uses Transport Layer Security (TLS) to secure traffic between external clients and Kafka, and uses unencrypted traffic for inter-broker communication. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the advertised.listeners property:
# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify externally visible address
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
External clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under advertised.listeners, use DNS names (kafkabroker-n.mydomain.com, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work because the addresses might fail to resolve for internal traffic.
Set IAM permissions
Dataflow jobs use two IAM service accounts:
- The Dataflow service uses a Dataflow service account to manipulate Google Cloud resources, such as creating VMs.
- The Dataflow worker VMs use a worker service account to access your pipeline's files and other resources. This service account needs write access to the BigQuery output table. It also needs access to any other resources that the pipeline job references.
Ensure that these two service accounts have appropriate roles. For more information, see Dataflow security and permissions.
Transform the data for BigQuery
The Kafka-to-BigQuery template creates a pipeline that reads events from one or more Kafka topics and writes them into a BigQuery table. Optionally, you can provide a JavaScript user-defined function (UDF) that transforms the event data before it is written to BigQuery.
The output from the pipeline must be JSON-formatted data that matches the schema of the output table. If the Kafka event data is already in JSON format, then you can create a BigQuery table with a matching schema and pass the events directly to BigQuery. Otherwise, author a UDF that takes the event data as input and returns JSON data that matches your BigQuery table.
For example, suppose the event data contains two fields:
- name (string)
- customer_id (integer)
The output from the Dataflow pipeline might look like the following:
{ "name": "Alice", "customer_id": 1234 }
Assuming the event data is not already in JSON format, you would write a UDF that transforms the data, as follows:
// UDF
function process(eventData) {
var name;
var customer_id;
// TODO Parse the event data to extract the name and customer_id fields.
// Return a JSON payload.
return JSON.stringify({ name: name, customer_id: customer_id });
}
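For example, if each event arrived as a comma-separated string such as Alice,1234 (a hypothetical input format used only for illustration), a minimal sketch of the UDF might look like the following:
// A minimal sketch, assuming each Kafka event is a CSV string such as "Alice,1234".
function process(eventData) {
  var fields = eventData.split(',');
  // Build a JSON payload that matches the name and customer_id columns.
  return JSON.stringify({
    name: fields[0],
    customer_id: parseInt(fields[1], 10)
  });
}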
The UDF can perform additional processing on the event data, such as filtering events, removing personally identifiable information (PII), or enriching the data with additional fields.
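For instance, the following sketch assumes the event is a JSON string with hypothetical name, customer_id, and email fields; it drops the email address (PII) and adds a processing timestamp, which assumes the output table also has a processed_at TIMESTAMP column:
// A minimal sketch: remove a hypothetical email field and enrich the payload.
function process(eventData) {
  var event = JSON.parse(eventData);
  return JSON.stringify({
    name: event.name,
    customer_id: event.customer_id,
    // Enrichment: record when the pipeline processed the event.
    processed_at: new Date().toJSON()
  });
}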
For more information about writing a UDF for the template, see Extend your Dataflow template with UDFs. Upload the JavaScript file that contains the UDF to Cloud Storage so that the template can load it when the job runs.
Create the BigQuery output table
Create the BigQuery output table before you run the template. The table schema must be compatible with the JSON output from the pipeline. For each property in the JSON payload, the pipeline writes the value to the BigQuery table column of the same name. Any missing properties in the JSON are interpreted as NULL values.
Using the previous example, the BigQuery table would have the following columns:
Column name | Data type |
---|---|
name | STRING |
customer_id | INTEGER |
You can use the CREATE TABLE SQL statement to create the table:
CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);
Alternatively, you can specify the table schema by using a JSON definition file. For more information, see Specifying a schema in the BigQuery documentation.
Run the Dataflow job
After you create the BigQuery table, run the Dataflow template.
Console
To create the Dataflow job by using the Google Cloud console, perform the following steps:
- Go to the Dataflow page in the Google Cloud console.
- Click Create job from template.
- In the Job Name field, enter a job name.
- For Regional endpoint, select a region.
- Select the "Kafka to BigQuery" template.
- Under Required parameters, enter the name of the BigQuery output table. The table must already exist and have a valid schema.
Click Show optional parameters and enter values for at least the following parameters:
- The Kafka topic to read the input from.
- The list of Kafka bootstrap servers, separated by commas.
- A service account email.
Enter additional parameters as needed. In particular, you might need to specify the following:
- Networking: To use a VPC network other than the default network, specify the network and subnet.
- UDF: To use a JavaScript UDF, specify the Cloud Storage location of the script and the name of the JavaScript function to invoke.
gcloud
To create the Dataflow job by using the Google Cloud CLI, run the following command:
gcloud dataflow flex-template run JOB_NAME \
  --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
  --region LOCATION \
  --parameters inputTopics=KAFKA_TOPICS \
  --parameters bootstrapServers=BOOTSTRAP_SERVERS \
  --parameters outputTableSpec=OUTPUT_TABLE \
  --parameters serviceAccount=IAM_SERVICE_ACCOUNT \
  --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
  --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
  --network VPC_NETWORK_NAME \
  --subnetwork SUBNET_NAME
Replace the following variables:
- JOB_NAME. A job name of your choice.
- LOCATION. The region in which to run the job. For more information about regions and locations, see Dataflow locations.
- KAFKA_TOPICS. A comma-separated list of Kafka topics to read.
- BOOTSTRAP_SERVERS. A comma-separated list of Kafka bootstrap servers. Example: 127.0.0.1:9092,127.0.0.1:9093.
- OUTPUT_TABLE. The BigQuery output table, specified as PROJECT_ID:DATASET_NAME.TABLE_NAME. Example: my_project:dataset1.table1.
- IAM_SERVICE_ACCOUNT. Optional. The email address of the service account to run the job as.
- UDF_SCRIPT_PATH. Optional. The Cloud Storage path to a JavaScript file that contains a UDF. Example: gs://your-bucket/your-function.js.
- UDF_FUNCTION_NAME. Optional. The name of the JavaScript function to call as the UDF.
- VPC_NETWORK_NAME. Optional. The network to which workers are assigned.
- SUBNET_NAME. Optional. The subnetwork to which workers are assigned.
Data types
This section describes how to handle various data types in the BigQuery table schema.
Internally, the JSON messages are converted to TableRow objects, and the TableRow field values are translated to BigQuery types.
Scalar types
The following example creates a BigQuery table with different scalar data types, including string, numeric, Boolean, date/time, interval, and geography types:
CREATE TABLE my_dataset.kafka_events (
string_col STRING,
integer_col INT64,
float_col FLOAT64,
decimal_col DECIMAL,
bool_col BOOL,
date_col DATE,
dt_col DATETIME,
ts_col TIMESTAMP,
interval_col INTERVAL,
geo_col GEOGRAPHY
);
Here is a JSON payload with compatible fields:
{
"string_col": "string_val",
"integer_col": 10,
"float_col": 3.142,
"decimal_col": 5.2E11,
"bool_col": true,
"date_col": "2022-07-01",
"dt_col": "2022-07-01 12:00:00.00",
"ts_col": "2022-07-01T12:00:00.00Z",
"interval_col": "0-13 370 48:61:61",
"geo_col": "POINT(1 2)"
}
Notes:
- For a TIMESTAMP column, you can use the JavaScript Date.toJSON method to format the value, as shown in the sketch after this list.
- For the GEOGRAPHY column, you can specify the geography using well-known text (WKT) or GeoJSON, formatted as a string. For more information, see Loading geospatial data.
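As a sketch of the first note, a UDF can convert an event timestamp given in milliseconds since the epoch (the epoch-milliseconds input is a hypothetical example) into a string that BigQuery accepts for a TIMESTAMP column:
// A minimal sketch: format a hypothetical epoch-milliseconds value as an
// ISO 8601 string, for example "2022-07-01T12:00:00.000Z".
function formatTimestamp(epochMillis) {
  return new Date(epochMillis).toJSON();
}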
For more information about data types in BigQuery, see Data types.
Arrays
You can store an array in BigQuery by using the
ARRAY
data
type. In the following example, the JSON payload contains a property named
scores
whose value is a JSON array:
{"name":"Emily","scores":[10,7,10,9]}
The following CREATE TABLE SQL statement creates a BigQuery table with a compatible schema:
CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);
The resulting table looks like the following:
+-------+-------------+
| name | scores |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
Structures
The STRUCT
data type in
BigQuery contains an ordered list of named fields. You can use a
STRUCT
to hold JSON objects that follow a consistent schema.
In the following example, the JSON payload contains a property named val
whose
value is a JSON object:
{"name":"Emily","val":{"a":"yes","b":"no"}}
The following CREATE TABLE SQL statement creates a BigQuery table with a compatible schema:
CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);
The resulting table looks like the following:
+-------+----------------------+
| name | val |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Semi-structured event data
If the Kafka event data does not follow a strict schema, consider storing it in BigQuery as a JSON data type (Preview). By storing the event data as a JSON data type, you don't need to define the event schema upfront. After data ingestion, you can query the output table by using the field access (dot notation) and array access operators.
First, create a table with a JSON column:
-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);
Then define a JavaScript UDF that wraps the event payload inside a JSON object:
// UDF
function process(eventData) {
var json;
// TODO Convert the event data to JSON.
return JSON.stringify({ "event_data": json });
}
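For example, if each Kafka event is already a JSON-formatted string (an assumption for illustration), the UDF can parse it and wrap it like this:
// A minimal sketch, assuming the incoming event data is already a JSON string.
function process(eventData) {
  var json = JSON.parse(eventData);
  // Wrap the parsed object so that it lands in the event_data JSON column.
  return JSON.stringify({ "event_data": json });
}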
After the data is written to BigQuery, you can query the individual fields by using the field access operator. For example, the following query returns the value of the name field for each record:
SELECT event_data.name FROM my_dataset.kafka_events;
For more information about using JSON in BigQuery, see Working with JSON data in Google Standard SQL.
Errors and logging
You might experience errors from running the pipeline, or errors while handling individual Kafka events.
For more information about handling pipeline errors, see Pipeline troubleshooting and debugging.
If the job runs successfully but an error occurs when processing an individual Kafka event, the pipeline job writes an error record to a table in BigQuery. The job itself doesn't fail, and the event-level error does not appear as an error in the Dataflow job log.
The pipeline job automatically creates the table to hold error records. By default, the name of the table is output_table_error_records, where output_table is the name of the output table. For example, if the output table is named kafka_events, the error table is named kafka_events_error_records.
You can specify a different name by setting the outputDeadletterTable template parameter:
outputDeadletterTable=my_project:dataset1.errors_table
Possible errors include:
- Serialization errors, including malformed JSON.
- Type conversion errors, caused by a mismatch in the table schema and the JSON data.
- Extra fields in the JSON data that are not present in the table schema.
Example error messages:
Type of error | Event data | errorMessage |
---|---|---|
Serialization error | "Hello world" | Failed to serialize json to table row: "Hello world" |
Type conversion error | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "customer_id", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Unknown field | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: age.", "reason" : "invalid" } ], "index" : 0 } |
Next steps
- Learn more about Dataflow templates.
- Get started with BigQuery.