/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform.readFromKafka;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.KafkaToBigQuery.KafkaToBQOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteKafkaMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link KafkaToBigQuery} pipeline is a streaming pipeline which ingests text data from Kafka,
* executes a UDF, and outputs the resulting records to BigQuery. Any errors which occur in the
* transformation of the data, execution of the UDF, or inserting into the output table will be
* inserted into a separate errors table in BigQuery. The errors table will be created if it does
* not exist prior to execution. Both output and error tables are specified by the user as
* parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Kafka topic exists and the message is encoded in a valid JSON format.
* <li>The BigQuery output table exists.
* <li>The Kafka brokers are reachable from the Dataflow worker machines.
* </ul>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/kafka-to-bigquery/README_Kafka_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Kafka_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Kafka to BigQuery",
description =
"The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. "
+ "Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. "
+ "If the errors table does not exist prior to execution, then it is created.",
optionsClass = KafkaToBQOptions.class,
flexContainerName = "kafka-to-bigquery",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The output BigQuery table must exist.",
"The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.",
"The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format."
})
public class KafkaToBigQuery {
/** Logger for this class; used to report failures while wrapping BigQuery insert errors. */
private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQuery.class);
/**
* The tag for the main output of the UDF, i.e. the success tag handed to the JavaScript UDF
* transform. Its elements feed the JSON to table row conversion.
*/
private static final TupleTag<FailsafeElement<KV<String, String>, String>> UDF_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/**
* The tag for the main output of the json transformation. Elements under this tag are the rows
* written to the output table.
*/
static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf (the failure tag of the UDF transform). */
static final TupleTag<FailsafeElement<KV<String, String>, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/**
* The tag for the dead-letter output of the json to table row transform (the failure tag of that
* transform).
*/
static final TupleTag<FailsafeElement<KV<String, String>, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<KV<String, String>, String>>() {};
/**
* The default suffix for error tables if dead letter table is not specified. It is appended to
* the output table spec to form the error table name.
*/
private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/**
* String/String Coder for FailsafeElement. Both components are wrapped in a {@link NullableCoder};
* this coder is set on the collection of wrapped BigQuery insert failures.
*/
private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(
NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));
/**
* The {@link KafkaToBQOptions} class provides the custom execution options passed by the executor
* at the command-line.
*
* <p>It combines the Kafka read options, the JavaScript UDF options and the BigQuery Storage API
* streaming options with the output table, the dead-letter table, and the deprecated
* bootstrap-server / input-topic parameters declared here.
*/
public interface KafkaToBQOptions
extends KafkaReadOptions,
JavascriptTextTransformerOptions,
BigQueryStorageApiStreamingOptions {
/** Gets the BigQuery table that successfully converted records are written to (required). */
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The name should be in the format "
+ "`<project>:<dataset>.<table_name>`. The table's schema must match input objects.")
@Required
String getOutputTableSpec();
/** Sets the BigQuery table that successfully converted records are written to. */
void setOutputTableSpec(String outputTableSpec);
/**
* Gets the Kafka bootstrap server list from the legacy parameter kept across releases. The
* pipeline only falls back to this value when {@link KafkaToBQOptions#getReadBootstrapServers()}
* returns {@code null}.
*
* @deprecated This method is no longer acceptable to get bootstrap servers.
* <p>Use {@link KafkaToBQOptions#getReadBootstrapServers()} instead.
*/
@TemplateParameter.Text(
order = 2,
optional = true,
regexes = {"[,:a-zA-Z0-9._-]+"},
description = "Kafka Bootstrap Server list",
helpText = "Kafka Bootstrap Server list, separated by commas.",
example = "localhost:9092,127.0.0.1:9093")
@Deprecated
String getBootstrapServers();
/**
* Sets the Kafka bootstrap server list through the legacy parameter kept across releases.
*
* @deprecated This method is no longer acceptable to set bootstrap servers.
* <p>Use {@link KafkaToBQOptions#setReadBootstrapServers(String)} instead.
*/
@Deprecated
void setBootstrapServers(String bootstrapServers);
/**
* Gets the comma-separated Kafka input topics from the legacy parameter kept across releases.
* The pipeline only falls back to this value when {@link KafkaToBQOptions#getKafkaReadTopics()}
* returns {@code null}.
*
* @deprecated This method is no longer acceptable to get Input topics.
* <p>Use {@link KafkaToBQOptions#getKafkaReadTopics()} instead.
*/
@Deprecated
@TemplateParameter.Text(
order = 3,
optional = true,
regexes = {"[,a-zA-Z0-9._-]+"},
description = "Kafka topic(s) to read the input from",
helpText = "Kafka topic(s) to read the input from.",
example = "topic1,topic2")
String getInputTopics();
/**
* Sets the comma-separated Kafka input topics through the legacy parameter kept across releases.
*
* @deprecated This method is no longer acceptable to set Input topics.
* <p>Use the setter paired with {@link KafkaToBQOptions#getKafkaReadTopics()} instead.
*/
@Deprecated
void setInputTopics(String inputTopics);
/**
* Gets the optional BigQuery table that failed messages are written to. When this returns {@code
* null} the pipeline uses the output table spec with an {@code _error_records} suffix instead.
*/
@TemplateParameter.BigQueryTable(
order = 4,
optional = true,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"BigQuery table for failed messages. Messages failed to reach the output table for different reasons "
+ "(e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will"
+ " be created during pipeline execution. If not specified, \"outputTableSpec_error_records\" is used instead.",
example = "your-project-id:your-dataset.your-table-name")
String getOutputDeadletterTable();
/** Sets the optional BigQuery table that failed messages are written to. */
void setOutputDeadletterTable(String outputDeadletterTable);
}
/**
 * Command-line entry point. Registers the uncaught-exception logger, parses and validates the
 * arguments into {@link KafkaToBQOptions}, and hands them to {@link
 * KafkaToBigQuery#run(KafkaToBQOptions)}.
 *
 * <p>The pipeline is started but this method does not wait for its execution to finish. If
 * blocking execution is required, call {@link KafkaToBigQuery#run(KafkaToBQOptions)} directly and
 * invoke {@code result.waitUntilFinish()} on the returned {@link PipelineResult}.
 *
 * @param args The command-line args passed by the executor.
 */
public static void main(String[] args) {
  // Install the handler first so failures during option parsing are logged as well.
  UncaughtExceptionLogger.register();

  run(PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBQOptions.class));
}
/**
 * Builds the Kafka to BigQuery pipeline from the specified options and starts it. This method
 * does not wait until the pipeline is finished before returning. Invoke {@code
 * result.waitUntilFinish()} on the result object to block until the pipeline is finished running
 * if blocking programmatic execution is required.
 *
 * <p>Topics are taken from {@code getKafkaReadTopics()} and bootstrap servers from {@code
 * getReadBootstrapServers()}; when either returns {@code null} the corresponding deprecated
 * option ({@code getInputTopics()} / {@code getBootstrapServers()}) is used instead.
 *
 * @param options The execution options.
 * @return The pipeline result.
 * @throws IllegalArgumentException if no Kafka topic or no bootstrap server list is configured.
 */
public static PipelineResult run(KafkaToBQOptions options) {
  // Validate BQ STORAGE_WRITE_API options
  BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
  MetadataValidator.validate(options);

  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);

  // Register the coder for pipeline
  FailsafeElementCoder<KV<String, String>, String> coder =
      FailsafeElementCoder.of(
          KvCoder.of(
              NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
          NullableCoder.of(StringUtf8Coder.of()));

  CoderRegistry coderRegistry = pipeline.getCoderRegistry();
  coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

  List<String> topicsList = resolveTopics(options);
  String bootstrapServers = resolveBootstrapServers(options);

  // Both error sinks below write to the same table, so resolve its name once.
  String deadletterTable =
      ObjectUtils.firstNonNull(
          options.getOutputDeadletterTable(),
          options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX);

  /*
   * Steps:
   *  1) Read messages in from Kafka
   *  2) Transform the messages into TableRows
   *     - Transform message payload via UDF
   *     - Convert UDF result to TableRow objects
   *  3) Write successful records out to BigQuery
   *  4) Write records that failed the UDF or the conversion out to BigQuery
   *  5) Write records that failed the BigQuery insert out to BigQuery
   */

  PCollectionTuple convertedTableRows =
      pipeline
          /*
           * Step #1: Read messages in from Kafka
           */
          .apply(
              "ReadFromKafka",
              readFromKafka(
                  bootstrapServers,
                  topicsList,
                  ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
                  null))

          /*
           * Step #2: Transform the Kafka Messages into TableRows
           */
          .apply("ConvertMessageToTableRow", new MessageToTableRow(options));

  /*
   * Step #3: Write the successful records out to BigQuery
   */
  WriteResult writeResult =
      convertedTableRows
          .get(TRANSFORM_OUT)
          .apply(
              "WriteSuccessfulRecords",
              BigQueryIO.writeTableRows()
                  .withoutValidation()
                  .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                  .withExtendedErrorInfo()
                  .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                  .to(options.getOutputTableSpec()));

  /*
   * Step 3 Contd.
   * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
   */
  PCollection<FailsafeElement<String, String>> failedInserts =
      BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
          .apply(
              "WrapInsertionErrors",
              MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                  .via(KafkaToBigQuery::wrapBigQueryInsertError))
          .setCoder(FAILSAFE_ELEMENT_CODER);

  /*
   * Step #4: Write failed records out to BigQuery
   */
  PCollectionList.of(convertedTableRows.get(UDF_DEADLETTER_OUT))
      .and(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
      .apply("Flatten", Flatten.pCollections())
      .apply(
          "WriteTransformationFailedRecords",
          WriteKafkaMessageErrors.newBuilder()
              .setErrorRecordsTable(deadletterTable)
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());

  /*
   * Step #5: Insert records that failed BigQuery inserts into a deadletter table.
   */
  failedInserts.apply(
      "WriteInsertionFailedRecords",
      ErrorConverters.WriteStringMessageErrors.newBuilder()
          .setErrorRecordsTable(deadletterTable)
          .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
          .build());

  return pipeline.run();
}

/**
 * Resolves the Kafka topics to read, preferring {@code getKafkaReadTopics()} over the deprecated
 * {@code getInputTopics()}. The comma-separated value is split, each name is trimmed and empty
 * names are dropped, so inputs such as {@code "a, b"} or {@code "a,,b"} do not produce invalid
 * topic names.
 */
private static List<String> resolveTopics(KafkaToBQOptions options) {
  String topicsSpec = options.getKafkaReadTopics();
  if (topicsSpec == null) {
    topicsSpec = options.getInputTopics();
  }
  if (topicsSpec == null) {
    throw new IllegalArgumentException(
        "Please provide --kafkaReadTopics (or the deprecated --inputTopics)");
  }

  List<String> topics = new ArrayList<>(Arrays.asList(topicsSpec.split(",")));
  topics.replaceAll(String::trim);
  topics.removeIf(String::isEmpty);
  if (topics.isEmpty()) {
    throw new IllegalArgumentException(
        "No Kafka topic names found in the configured topics value: \"" + topicsSpec + "\"");
  }
  return topics;
}

/**
 * Resolves the Kafka bootstrap server list, preferring {@code getReadBootstrapServers()} over
 * the deprecated {@code getBootstrapServers()}.
 */
private static String resolveBootstrapServers(KafkaToBQOptions options) {
  String servers = options.getReadBootstrapServers();
  if (servers == null) {
    servers = options.getBootstrapServers();
  }
  if (servers == null) {
    throw new IllegalArgumentException(
        "Please provide --readBootstrapServers (or the deprecated --bootstrapServers)");
  }
  return servers;
}
/**
 * A {@link PTransform} that turns incoming Kafka messages into {@link TableRow} objects for
 * insertion into BigQuery, applying a UDF to each message on the way. Both the UDF execution and
 * the conversion to {@link TableRow} are done in a fail-safe way: every element is wrapped, along
 * with its original payload, inside a {@link FailsafeElement}, so failures are routed to
 * dead-letter outputs.
 *
 * <p>The returned {@link PCollectionTuple} contains the following {@link PCollection}s:
 *
 * <ul>
 *   <li>{@link KafkaToBigQuery#UDF_OUT} - all {@link FailsafeElement} records successfully
 *       processed by the UDF.
 *   <li>{@link KafkaToBigQuery#UDF_DEADLETTER_OUT} - all {@link FailsafeElement} records which
 *       failed processing during the UDF execution.
 *   <li>{@link KafkaToBigQuery#TRANSFORM_OUT} - all records successfully converted from JSON to
 *       {@link TableRow} objects.
 *   <li>{@link KafkaToBigQuery#TRANSFORM_DEADLETTER_OUT} - all {@link FailsafeElement} records
 *       which couldn't be converted to table rows.
 * </ul>
 */
static class MessageToTableRow
    extends PTransform<PCollection<KV<String, String>>, PCollectionTuple> {

  private final KafkaToBQOptions options;

  MessageToTableRow(KafkaToBQOptions options) {
    this.options = options;
  }

  @Override
  public PCollectionTuple expand(PCollection<KV<String, String>> input) {
    // Wrap each message in a FailsafeElement so failures can be recovered from across the
    // transforms that follow.
    PCollection<FailsafeElement<KV<String, String>, String>> wrappedMessages =
        input.apply("MapToRecord", ParDo.of(new MessageToFailsafeElementFn()));

    // Run the user-defined function; successes and failures land under separate tags.
    PCollectionTuple udfResults =
        wrappedMessages.apply(
            "InvokeUDF",
            FailsafeJavascriptUdf.<KV<String, String>>newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .setReloadIntervalMinutes(
                    options.getJavascriptTextTransformReloadIntervalMinutes())
                .setSuccessTag(UDF_OUT)
                .setFailureTag(UDF_DEADLETTER_OUT)
                .build());

    PCollection<FailsafeElement<KV<String, String>, String>> udfSuccesses =
        udfResults.get(UDF_OUT);

    // Only records that made it through the UDF are converted into TableRow objects.
    PCollectionTuple tableRowResults =
        udfSuccesses.apply(
            "JsonToTableRow",
            FailsafeJsonToTableRow.<KV<String, String>>newBuilder()
                .setSuccessTag(TRANSFORM_OUT)
                .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                .build());

    // Gather all four outputs into a single tuple for the caller.
    return PCollectionTuple.of(UDF_OUT, udfSuccesses)
        .and(UDF_DEADLETTER_OUT, udfResults.get(UDF_DEADLETTER_OUT))
        .and(TRANSFORM_OUT, tableRowResults.get(TRANSFORM_OUT))
        .and(TRANSFORM_DEADLETTER_OUT, tableRowResults.get(TRANSFORM_DEADLETTER_OUT));
  }
}
/**
 * A {@link DoFn} that wraps a Kafka message in a {@link FailsafeElement}, keeping the whole
 * key/value message as the original payload and the message value as the current payload. This
 * lets later failures be recovered from and the original message be written to an error records
 * table.
 */
static class MessageToFailsafeElementFn
    extends DoFn<KV<String, String>, FailsafeElement<KV<String, String>, String>> {

  @ProcessElement
  public void processElement(ProcessContext context) {
    KV<String, String> kafkaRecord = context.element();
    String payload = kafkaRecord.getValue();
    context.output(FailsafeElement.of(kafkaRecord, payload));
  }
}
/**
 * Wraps a {@link BigQueryInsertError} into a {@link FailsafeElement}. The pretty-printed failed
 * row is used as both the original and the current payload, and the pretty-printed insert error
 * becomes the element's error message.
 *
 * @param insertError BigQueryInsert error.
 * @return FailsafeElement object.
 * @throws RuntimeException if the row or the error cannot be serialized; the underlying {@link
 *     IOException} is attached as the cause.
 */
protected static FailsafeElement<String, String> wrapBigQueryInsertError(
    BigQueryInsertError insertError) {
  try {
    // Serialize the row once and reuse it for both payloads.
    String rowJson = insertError.getRow().toPrettyString();
    FailsafeElement<String, String> failsafeElement = FailsafeElement.of(rowJson, rowJson);
    failsafeElement.setErrorMessage(insertError.getError().toPrettyString());
    return failsafeElement;
  } catch (IOException e) {
    // Pass the exception to the logger so the log entry carries the cause and stack trace.
    LOG.error("Failed to wrap BigQuery insert error.", e);
    throw new RuntimeException("Failed to wrap BigQuery insert error.", e);
  }
}
}