The Apache Kafka to BigQuery template is a streaming pipeline that ingests text data from Google Cloud Managed Service for Apache Kafka clusters, and then outputs the resulting records to BigQuery tables. Any errors that occur while inserting data into the output table are inserted into a separate errors table in BigQuery.
You can also use the Apache Kafka to BigQuery template with self-managed or external Kafka.
Pipeline requirements
- The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
- The Apache Kafka topics must exist.
- You must enable the Dataflow, BigQuery, and Cloud Storage APIs. If authentication is required, you must also enable the Secret Manager API.
- Create a BigQuery dataset and table with the appropriate schema for your Kafka input topic. If you're using multiple schemas in the same topic and want to write to multiple tables, you don't need to create the table before configuring the pipeline.
- When the dead-letter (unprocessed messages) queue for the template is enabled, create an empty table that doesn't have a schema for the dead-letter queue.
Kafka message format
The Apache Kafka to BigQuery template supports reading messages from Kafka in the following formats: AVRO_CONFLUENT_WIRE_FORMAT, AVRO_BINARY_ENCODING, and JSON.
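As an illustration of how the Confluent wire format differs from plain binary Avro, the following minimal sketch splits a message value into its schema ID and Avro payload. It assumes the standard Confluent framing (a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary data). `parseConfluentHeader` is a hypothetical helper written for this example and is not part of the template.

```javascript
// Hypothetical helper (not part of the template): splits a Kafka message
// value framed in the Confluent wire format into schema ID and Avro payload.
function parseConfluentHeader(message) {
  // Byte 0 is the magic byte (always 0); bytes 1-4 hold the schema ID.
  if (message.length < 5 || message.readUInt8(0) !== 0) {
    return null; // too short, or not framed as Confluent wire format
  }
  return {
    schemaId: message.readUInt32BE(1), // big-endian Schema Registry ID
    payload: message.subarray(5),      // remaining bytes: Avro binary data
  };
}

// Example: magic byte 0, schema ID 42, then two payload bytes.
const framed = Buffer.from([0, 0, 0, 0, 42, 0x02, 0x04]);
console.log(parseConfluentHeader(framed).schemaId);
```

Plain binary Avro carries no such header, which is why it needs a schema file supplied separately. Note that a plain Avro payload can also begin with a zero byte, so this check is not a reliable way to detect the format.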
Authentication
The Apache Kafka to BigQuery template supports SASL/PLAIN authentication to Kafka brokers.
Template parameters
Required parameters
- readBootstrapServerAndTopic : Kafka Topic to read the input from.
- writeMode : Write mode: write records to one table or to multiple tables (based on schema). The DYNAMIC_TABLE_NAMES mode is supported only for the AVRO_CONFLUENT_WIRE_FORMAT source message format and the SCHEMA_REGISTRY schema source. The target table name is auto-generated from the Avro schema name of each message. The topic can carry a single schema (creating a single table) or multiple schemas (creating multiple tables). The SINGLE_TABLE_NAME mode writes to a single table (single schema) specified by the user. Defaults to SINGLE_TABLE_NAME.
- kafkaReadAuthenticationMode : The mode of authentication to use with the Kafka cluster. Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, and TLS for certificate-based authentication. Use APPLICATION_DEFAULT_CREDENTIALS only for a Google Cloud Apache Kafka for BigQuery cluster, because it lets you authenticate with Google Cloud Apache Kafka for BigQuery using application default credentials.
- messageFormat : The format of the Kafka messages to read. The supported values are AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry encoded Avro), AVRO_BINARY_ENCODING (Plain binary Avro), and JSON. Defaults to: AVRO_CONFLUENT_WIRE_FORMAT.
- useBigQueryDLQ : If true, failed messages will be written to BigQuery with extra error information. Defaults to: false.
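The constraints stated in the required parameters can be checked before a job is launched. The following is a minimal sketch of such a pre-flight check, not something the template provides. `checkRequiredParameters` is a hypothetical helper, and it assumes that the "Schema Source" mentioned under writeMode corresponds to the schemaFormat parameter described under the optional parameters.

```javascript
// Allowed values as listed in the parameter descriptions.
const WRITE_MODES = ["SINGLE_TABLE_NAME", "DYNAMIC_TABLE_NAMES"];
const MESSAGE_FORMATS = ["AVRO_CONFLUENT_WIRE_FORMAT", "AVRO_BINARY_ENCODING", "JSON"];
const AUTH_MODES = ["NONE", "SASL_PLAIN", "TLS", "APPLICATION_DEFAULT_CREDENTIALS"];

// Hypothetical pre-flight check: returns a list of problems (empty if none).
function checkRequiredParameters(params) {
  const problems = [];
  // Apply the documented defaults when a value is not given.
  const writeMode = params.writeMode || "SINGLE_TABLE_NAME";
  const messageFormat = params.messageFormat || "AVRO_CONFLUENT_WIRE_FORMAT";
  // Assumption: schemaFormat is the "Schema Source" that writeMode refers to.
  const schemaFormat = params.schemaFormat || "SINGLE_SCHEMA_FILE";

  if (!params.readBootstrapServerAndTopic) {
    problems.push("readBootstrapServerAndTopic is missing");
  }
  if (!WRITE_MODES.includes(writeMode)) {
    problems.push("unknown writeMode: " + writeMode);
  }
  if (!MESSAGE_FORMATS.includes(messageFormat)) {
    problems.push("unknown messageFormat: " + messageFormat);
  }
  if (!AUTH_MODES.includes(params.kafkaReadAuthenticationMode)) {
    problems.push("unknown kafkaReadAuthenticationMode: " + params.kafkaReadAuthenticationMode);
  }
  // DYNAMIC_TABLE_NAMES is supported only with Confluent Avro + Schema Registry.
  if (writeMode === "DYNAMIC_TABLE_NAMES" &&
      (messageFormat !== "AVRO_CONFLUENT_WIRE_FORMAT" || schemaFormat !== "SCHEMA_REGISTRY")) {
    problems.push("DYNAMIC_TABLE_NAMES needs AVRO_CONFLUENT_WIRE_FORMAT and SCHEMA_REGISTRY");
  }
  return problems;
}
```

Returning a list of problems rather than throwing on the first one makes it easier to report every misconfiguration in a single pass.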
Optional parameters
- outputTableSpec : BigQuery table location to write the output to. The name should be in the format <project>:<dataset>.<table_name>. The table's schema must match input objects.
- persistKafkaKey : If true, the pipeline will persist the Kafka message key in the BigQuery table, in a _key field of type BYTES. Default is false (Key is ignored).
- outputProject : BigQuery output project in which the dataset resides. Tables will be created dynamically in the dataset. Defaults to empty.
- outputDataset : BigQuery output dataset to write the output to. Tables will be created dynamically in the dataset. If the tables are created beforehand, the table names should follow the specified naming convention. The name should be bqTableNamePrefix + Avro Schema FullName, each word will be separated by a hyphen '-'. Defaults to empty.
- bqTableNamePrefix : Naming prefix to be used while creating BigQuery output tables. Only applicable when using schema registry. Defaults to empty.
- createDisposition : BigQuery CreateDisposition. For example, CREATE_IF_NEEDED, CREATE_NEVER. Defaults to: CREATE_IF_NEEDED.
- writeDisposition : BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Defaults to: WRITE_APPEND.
- useAutoSharding : If true, the pipeline uses auto-sharding when writing to BigQuery. The default value is true.
- numStorageWriteApiStreams : Specifies the number of write streams. This parameter must be set. Default is 0.
- storageWriteApiTriggeringFrequencySec : Specifies the triggering frequency in seconds. This parameter must be set. Default is 5 seconds.
- useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.
- enableCommitOffsets : Commit offsets of processed messages to Kafka. If enabled, this will minimize the gaps or duplicate processing of messages when restarting the pipeline. Requires specifying the Consumer Group ID. Defaults to: false.
- consumerGroupId : The unique identifier for the consumer group that this pipeline belongs to. Required if Commit Offsets to Kafka is enabled. Defaults to empty.
- kafkaReadOffset : The starting point for reading messages when no committed offsets exist. The earliest starts from the beginning, the latest from the newest message. Defaults to: latest.
- kafkaReadUsernameSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka username to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
- kafkaReadPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the Kafka password to use with SASL_PLAIN authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Defaults to empty.
- kafkaReadKeystoreLocation : The Google Cloud Storage path to the Java KeyStore (JKS) file that contains the TLS certificate and private key to use when authenticating with the Kafka cluster. (Example: gs://your-bucket/keystore.jks).
- kafkaReadTruststoreLocation : The Google Cloud Storage path to the Java TrustStore (JKS) file that contains the trusted certificates to use to verify the identity of the Kafka broker.
- kafkaReadTruststorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file for Kafka TLS authentication (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the Java KeyStore (JKS) file for Kafka TLS authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeyPasswordSecretId : The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the Java KeyStore (JKS) file for Kafka TLS authentication. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- schemaFormat : The Kafka schema format. Can be provided as SINGLE_SCHEMA_FILE or SCHEMA_REGISTRY. If SINGLE_SCHEMA_FILE is specified, all messages should have the schema mentioned in the avro schema file. If SCHEMA_REGISTRY is specified, the messages can have either a single schema or multiple schemas. Defaults to: SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : The Google Cloud Storage path to the single Avro schema file used to decode all of the messages in a topic. Defaults to empty.
- schemaRegistryConnectionUrl : The URL for the Confluent Schema Registry instance used to manage Avro schemas for message decoding. Defaults to empty.
- binaryAvroSchemaPath : The Google Cloud Storage path to the Avro schema file used to decode binary-encoded Avro messages. Defaults to empty.
- schemaRegistryAuthenticationMode : Schema Registry authentication mode. Can be NONE, TLS or OAUTH. Defaults to: NONE.
- schemaRegistryTruststoreLocation : Location of the SSL certificate where the trust store for authentication to Schema Registry is stored. (Example: /your-bucket/truststore.jks).
- schemaRegistryTruststorePasswordSecretId : SecretId in secret manager where the password to access secret in truststore is stored. (Example: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryKeystoreLocation : Keystore location that contains the SSL certificate and private key. (Example: /your-bucket/keystore.jks).
- schemaRegistryKeystorePasswordSecretId : SecretId in secret manager where the password to access the keystore file is stored. (Example: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryKeyPasswordSecretId : SecretId of password required to access the client's private key stored within the keystore (Example: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryOauthClientId : Client ID used to authenticate the Schema Registry client in OAUTH mode. Required for AVRO_CONFLUENT_WIRE_FORMAT message format.
- schemaRegistryOauthClientSecretId : The Google Cloud Secret Manager secret ID that contains the Client Secret to use to authenticate the Schema Registry client in OAUTH mode. Required for AVRO_CONFLUENT_WIRE_FORMAT message format. (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- schemaRegistryOauthScope : The access token scope used to authenticate the Schema Registry client in OAUTH mode. This field is optional, as the request can be made without a scope parameter passed. (Example: openid).
- schemaRegistryOauthTokenEndpointUrl : The HTTP(S)-based URL for the OAuth/OIDC identity provider used to authenticate the Schema Registry client in OAUTH mode. Required for AVRO_CONFLUENT_WIRE_FORMAT message format.
- outputDeadletterTable : Fully qualified BigQuery table name for failed messages. Messages that fail to reach the output table for different reasons (for example, mismatched schema or malformed JSON) are written to this table. The table will be created by the template. (Example: your-project-id:your-dataset.your-table-name).
- javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
- javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
- javascriptTextTransformReloadIntervalMinutes : Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.
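To make the UDF parameters concrete, here is a minimal sketch of a file that could be referenced by javascriptTextTransformGcsPath, with javascriptTextTransformFunctionName set to myTransform. It assumes the function receives each message as a JSON string and returns a JSON string. The `email` and `source` fields are hypothetical and only serve the example. The code sticks to older JavaScript syntax (var, no arrow functions) as a conservative choice.

```javascript
/**
 * Hypothetical UDF for illustration.
 * Assumption: inJson is one message as a JSON string, and the return value
 * is the transformed message, also as a JSON string.
 */
function myTransform(inJson) {
  var record = JSON.parse(inJson);

  // Normalize a (hypothetical) email field when it is present.
  if (typeof record.email === "string") {
    record.email = record.email.toLowerCase();
  }

  // Tag the record with a (hypothetical) origin marker.
  record.source = "kafka";

  return JSON.stringify(record);
}
```

Any field the function adds also has to exist in the BigQuery table schema, because the table's schema must match the objects written to it.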
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Kafka to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME : a unique job name of your choice
- REGION_NAME : the region where you want to deploy your Dataflow job, for example us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- BIGQUERY_TABLE : your BigQuery table name
- KAFKA_TOPICS : the Apache Kafka topic list. If multiple topics are provided, you need to escape commas. See gcloud topic escaping.
- PATH_TO_JAVASCRIPT_UDF_FILE : the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use, for example gs://my-bucket/my-udfs/my_file.js
- JAVASCRIPT_FUNCTION : the name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
- KAFKA_SERVER_ADDRESSES : the Apache Kafka broker server IP address list. Each IP address needs the port number that the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, you need to escape commas. See gcloud topic escaping.
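The comma escaping mentioned for topic and broker lists can be sketched as follows. In gcloud's escaping syntax, a flag value that starts with ^DELIM^ uses DELIM instead of a comma to separate entries, so commas inside values are kept literally. `buildParametersFlag` is a hypothetical helper written for this example, and the choice of `~` as the delimiter is an assumption. Any character that does not occur in the values would work.

```javascript
// Hypothetical helper: builds a --parameters flag that uses gcloud's
// ^DELIM^ escaping so that values may contain commas.
function buildParametersFlag(params, delimiter) {
  delimiter = delimiter || "~"; // assumed delimiter; must not occur in values
  const pairs = Object.keys(params).map(function (key) {
    const value = String(params[key]);
    if (value.indexOf(delimiter) !== -1) {
      throw new Error("value of " + key + " contains the delimiter " + delimiter);
    }
    return key + "=" + value;
  });
  // ^DELIM^ tells gcloud to split entries on DELIM instead of ","
  return "--parameters=^" + delimiter + "^" + pairs.join(delimiter);
}

// Example with two topics and two brokers (addresses are placeholders).
console.log(buildParametersFlag({
  inputTopics: "topic1,topic2",
  bootstrapServers: "10.0.0.1:9092,10.0.0.2:9092",
}));
```

When pasting the resulting flag into a shell, wrapping it in single quotes avoids any shell interpretation of the `^` or `~` characters.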
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery"
   }
}
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME : a unique job name of your choice
- LOCATION : the region where you want to deploy your Dataflow job, for example us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- BIGQUERY_TABLE : your BigQuery table name
- KAFKA_TOPICS : the Apache Kafka topic list. If multiple topics are provided, you need to escape commas. See gcloud topic escaping.
- PATH_TO_JAVASCRIPT_UDF_FILE : the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use, for example gs://my-bucket/my-udfs/my_file.js
- JAVASCRIPT_FUNCTION : the name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
- KAFKA_SERVER_ADDRESSES : the Apache Kafka broker server IP address list. Each IP address needs the port number that the server is accessible from. For example: 35.70.252.199:9092. If multiple addresses are provided, you need to escape commas. See gcloud topic escaping.
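The placeholder substitution for the REST request can be sketched in code. The following builds the URL and JSON body shown in the API example from the listed values. `buildLaunchRequest` is a hypothetical helper for illustration, and the sample argument values are placeholders. The sketch only constructs the request and does not send it.

```javascript
// Hypothetical helper: fills the placeholders of the REST launch request.
function buildLaunchRequest(projectId, location, version, jobName, parameters) {
  return {
    url: "https://dataflow.googleapis.com/v1b3/projects/" + projectId +
      "/locations/" + location + "/flexTemplates:launch",
    body: {
      launch_parameter: {
        jobName: jobName,
        parameters: parameters,
        // Template path in the regional bucket for the chosen version.
        containerSpecGcsPath: "gs://dataflow-templates-" + location + "/" +
          version + "/flex/Kafka_to_BigQuery",
      },
    },
  };
}

// Example with placeholder values.
const request = buildLaunchRequest("my-project", "us-central1", "latest", "kafka-to-bq", {
  outputTableSpec: "my-project:my_dataset.my_table",
  inputTopics: "topic1",
  bootstrapServers: "10.0.0.1:9092",
});
console.log(request.url);
console.log(JSON.stringify(request.body, null, 2));
```

The serialized body can then be sent with any HTTP client as a POST request that carries an OAuth 2.0 access token in the Authorization header. How that token is obtained is outside the scope of this sketch.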
For more information, see Write data from Kafka to BigQuery with Dataflow.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.