* Copyright (C) 2019 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
package com.google.cloud.teleport.v2.templates;
import static com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform.readFromKafka;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.KafkaToBigQuery.KafkaToBQOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteKafkaMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* The {@link KafkaToBigQuery} pipeline is a streaming pipeline which ingests text data from Kafka,
* executes a UDF, and outputs the resulting records to BigQuery. Any errors which occur in the
* transformation of the data, execution of the UDF, or inserting into the output table will be
* inserted into a separate errors table in BigQuery. The errors table will be created if it does
* not exist prior to execution. Both output and error tables are specified by the user as
* parameters.
* <p><b>Pipeline Requirements</b>
* <ul>
* <li>The Kafka topic exists and the message is encoded in a valid JSON format.
* <li>The BigQuery output table exists.
* <li>The Kafka brokers are reachable from the Dataflow worker machines.
* </ul>
* <p><b>Example Usage</b>
* <pre>
* # Set some environment variables
* PROJECT=my-project
* TEMP_BUCKET=my-temp-bucket
* OUTPUT_TABLE=${PROJECT}:my_dataset.my_table
* TOPICS=my-topics
* JS_PATH=my-js-path-on-gcs
* JS_FUNC_NAME=my-js-func-name
* BOOTSTRAP=my-comma-separated-bootstrap-servers
* # Set containerization vars
* IMAGE_NAME=my-image-name
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
* # Create an image spec in GCS that contains the path to the image
* {
* "docker_template_spec": {
* "docker_image": $TARGET_GCR_IMAGE
* }
* }
* # Execute template:
* API_ROOT_URL="https://dataflow.googleapis.com"
* TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
* JOB_NAME="kafka-to-bigquery`date +%Y%m%d-%H%M%S-%N`"
* time curl -X POST -H "Content-Type: application/json" \
* -H "Authorization: Bearer $(gcloud auth print-access-token)" \
* `"?validateOnly=false"`
* `"&dynamicTemplate.gcsPath=${TEMP_BUCKET}/path/to/image-spec"`
* `"&dynamicTemplate.stagingLocation=${TEMP_BUCKET}/staging" \
* -d '
* {
* "jobName":"'$JOB_NAME'",
* "parameters": {
* "outputTableSpec":"'$OUTPUT_TABLE'",
* "inputTopics":"'$TOPICS'",
* "javascriptTextTransformGcsPath":"'$JS_PATH'",
* "javascriptTextTransformFunctionName":"'$JS_FUNC_NAME'",
* "bootstrapServers":"'$BOOTSTRAP'"
* }
* }
* '
* </pre>
name = "Kafka_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Kafka to BigQuery",
description =
"A streaming pipeline which ingests data in JSON format from Kafka, performs a transform"
+ " via a user defined JavaScript function, and writes to a pre-existing BigQuery"
+ " table.",
optionsClass = KafkaToBQOptions.class,
flexContainerName = "kafka-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public class KafkaToBigQuery {
/* Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQuery.class);
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<KV<String, String>, String>> UDF_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/** The tag for the main output of the json transformation. */
static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
static final TupleTag<FailsafeElement<KV<String, String>, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
static final TupleTag<FailsafeElement<KV<String, String>, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** String/String Coder for FailsafeElement. */
private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));
* The {@link KafkaToBQOptions} class provides the custom execution options passed by the executor
* at the command-line.
public interface KafkaToBQOptions
extends KafkaReadOptions,
BigQueryStorageApiStreamingOptions {
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The name should be in the format "
+ "<project>:<dataset>.<table_name>. The table's schema must match input objects.")
String getOutputTableSpec();
void setOutputTableSpec(String outputTableSpec);
* Get bootstrap server across releases.
* @deprecated This method is no longer acceptable to get bootstrap servers.
* <p>Use {@link KafkaToBQOptions#getReadBootstrapServers()} instead.
order = 2,
optional = true,
regexes = {"[,:a-zA-Z0-9._-]+"},
description = "Kafka Bootstrap Server list",
helpText = "Kafka Bootstrap Server list, separated by commas.",
example = "localhost:9092,")
String getBootstrapServers();
* Get bootstrap server across releases.
* @deprecated This method is no longer acceptable to set bootstrap servers.
* <p>Use {@link KafkaToBQOptions#setReadBootstrapServers()} instead.
void setBootstrapServers(String bootstrapServers);
* Get bootstrap server across releases.
* @deprecated This method is no longer acceptable to get Input topics.
* <p>Use {@link KafkaToBQOptions#getKafkaReadTopics()} instead.
order = 3,
optional = true,
regexes = {"[,a-zA-Z0-9._-]+"},
description = "Kafka topic(s) to read the input from",
helpText = "Kafka topic(s) to read the input from.",
example = "topic1,topic2")
String getInputTopics();
* Get bootstrap server across releases.
* @deprecated This method is no longer acceptable to set Input topics.
* <p>Use {@link KafkaToBQOptions#getKafkaReadTopics()} instead.
void setInputTopics(String inputTopics);
order = 4,
optional = true,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. If it doesn't exist, it will"
+ " be created during pipeline execution.",
example = "your-project-id:your-dataset.your-table-name")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String outputDeadletterTable);
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* KafkaToBigQuery#run(KafkaToBQOptions)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
* @param args The command-line args passed by the executor.
public static void main(String[] args) {
KafkaToBQOptions options =
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
* @param options The execution options.
* @return The pipeline result.
public static PipelineResult run(KafkaToBQOptions options) {
// Validate BQ STORAGE_WRITE_API options
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<KV<String, String>, String> coder =
NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
List<String> topicsList;
if (options.getKafkaReadTopics() != null) {
topicsList = new ArrayList<>(Arrays.asList(options.getKafkaReadTopics().split(",")));
} else if (options.getInputTopics() != null) {
topicsList = new ArrayList<>(Arrays.asList(options.getInputTopics().split(",")));
} else {
throw new IllegalArgumentException("Please Provide --kafkaReadTopic");
String bootstrapServers;
if (options.getReadBootstrapServers() != null) {
bootstrapServers = options.getReadBootstrapServers();
} else if (options.getBootstrapServers() != null) {
bootstrapServers = options.getBootstrapServers();
} else {
throw new IllegalArgumentException("Please Provide --bootstrapServers");
* Steps:
* 1) Read messages in from Kafka
* 2) Transform the messages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
PCollectionTuple convertedTableRows =
* Step #1: Read messages in from Kafka
ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
* Step #2: Transform the Kafka Messages into TableRows
.apply("ConvertMessageToTableRow", new MessageToTableRow(options));
* Step #3: Write the successful records out to BigQuery
WriteResult writeResult =
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
* Step #4: Write failed records out to BigQuery
.apply("Flatten", Flatten.pCollections())
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
* Step #5: Insert records that failed BigQuery inserts into a deadletter table.
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
return pipeline.run();
* The {@link MessageToTableRow} class is a {@link PTransform} which transforms incoming Kafka
* Message objects into {@link TableRow} objects for insertion into BigQuery while applying a UDF
* to the input. The executions of the UDF and transformation to {@link TableRow} objects is done
* in a fail-safe way by wrapping the element with it's original payload inside the {@link
* FailsafeElement} class. The {@link MessageToTableRow} transform will output a {@link
* PCollectionTuple} which contains all output and dead-letter {@link PCollection}.
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
* <ul>
* <li>{@link KafkaToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the UDF.
* <li>{@link KafkaToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement} records
* which failed processing during the UDF execution.
* <li>{@link KafkaToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link KafkaToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
static class MessageToTableRow
extends PTransform<PCollection<KV<String, String>>, PCollectionTuple> {
private final KafkaToBQOptions options;
MessageToTableRow(KafkaToBQOptions options) {
this.options = options;
public PCollectionTuple expand(PCollection<KV<String, String>> input) {
PCollectionTuple udfOut =
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new MessageToFailsafeElementFn()))
FailsafeJavascriptUdf.<KV<String, String>>newBuilder()
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
FailsafeJsonToTableRow.<KV<String, String>>newBuilder()
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
* The {@link MessageToFailsafeElementFn} wraps an Kafka Message with the {@link FailsafeElement}
* class so errors can be recovered from and the original message can be output to a error records
* table.
static class MessageToFailsafeElementFn
extends DoFn<KV<String, String>, FailsafeElement<KV<String, String>, String>> {
public void processElement(ProcessContext context) {
KV<String, String> message = context.element();
context.output(FailsafeElement.of(message, message.getValue()));
* Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
* @param insertError BigQueryInsert error.
* @return FailsafeElement object.
protected static FailsafeElement<String, String> wrapBigQueryInsertError(
BigQueryInsertError insertError) {
FailsafeElement<String, String> failsafeElement;
try {
failsafeElement =
insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
} catch (IOException e) {
LOG.error("Failed to wrap BigQuery insert error.");
throw new RuntimeException(e);
return failsafeElement;