/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.TextToBigQueryStreamingOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The {@link TextToBigQueryStreaming} is a streaming version of the {@link TextIOToBigQuery} pipeline
* that reads text files, applies a JavaScript UDF and writes the output to BigQuery. The pipeline
* continuously polls for new files, reads them row-by-row and processes each record into BigQuery.
 * The polling interval is set at 10 seconds.
*
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Stream_GCS_Text_to_BigQuery_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@MultiTemplate({
@Template(
name = "Stream_GCS_Text_to_BigQuery_Flex",
category = TemplateCategory.STREAMING,
displayName = "Cloud Storage Text to BigQuery (Stream)",
description = {
"The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
"The pipeline runs indefinitely< and needs to be terminated manually via a\n"
+ " a href=\>"<ht>tps://cloud.google.com/dataflow/d<ocs/guides/stopping-a-pipeline#cancel\"cancel/a and not a\n"
> + &q<uo>t; a href=\"https://cloud.google.com/d<ataf>low/d<ocs/g>uides/stopping-a-pipeline#drain\&q<uot;>drai<n/a, >due to its use of the\n"
+ " codeWatch/code transform, which is a splittable codeDoFn/code that does not support\n"
+ " draining."
},
skipOptions = {
"pythonExternalTextTransfromGcsPath",
"pythonExternalTextTransformFunctionName"
},
optionsClass = TextToBigQueryStreamingOptions.class,
flexContainerName = "text-to-bigquery-streaming",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + "    <p>\n"
            + "    Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + "    contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + "    For example:\n"
            + "    </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + " to transform the lines of text. Note that your function must return a JSON string.\n"
            + " <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + " transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}\n"
            + "</pre>"
},
streaming = true,
supportsAtLeastOnce = true),
@Template(
name = "Stream_GCS_Text_to_BigQuery_Xlang",
category = TemplateCategory.STREAMING,
displayName = "Cloud Storage Text to BigQuery (Stream) with Python UDF",
type = Template.TemplateType.XLANG,
description = {
"The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
"The pipeline runs indefinitely an<d> needs to be terminated manually via a\n"
+ " a hre<f=\&>quot;h<ttps:>//cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel<\&qu>ot;cancel/a and not a\n"
+ " < a h>ref=\"https://cloud.google.com/dataflow/docs/guides/stop<pi>ng-a-pipeline#drain<\"drain/a, due to its use of t>he\n"
+ " codeWatch/code transform, which is a splittable codeDoFn/code that does not support\n"
+ " draining."
},
skipOptions = {
"javascriptTextTransformGcsPath",
"javascriptTextTransformFunctionName",
"javascriptTextTransformReloadIntervalMinutes"
},
optionsClass = TextToBigQueryStreamingOptions.class,
flexContainerName = "text-to-bigquery-streaming-xlang",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
contactInformation = "https://cloud.google.com/support",
requirements = {
"Create a JSON file that describes the schema of your output table in BigQuery.\n"
+ " p\n"
+ " Ens<ure >that there is a top-level JSO<N ar>ray< titl>ed codefields/code and that its\n"
+ " contents follow the pattern code{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYP<E>\"}/code.\n"
+ " For example:\n"
+ " /p\n"
+ "pre class=\&q<uo>t;prettyprint lang-<json\"\n"
+ "{>\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"location\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+< &qu>ot; \"name\": \"age\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"color\",\n"
< + " <\"type\&q>>uot;: \"STRING\",\n<"
< + " >> \"mode\": \"REQUIRED\"\n"
+ " },\n"
< + "< {\n"<>/span>
> + " \"name\&q<uot;: \"co<ffee\",\n>>"
+ " \"type\": \"STRING\",\n"
+ " < \&q>uot;mode\": \"REQUI<RED\&quo>t;\n"
+ " }\n"
+ " ]\n"
+ "}\n"
<+ "/pre&qu<ot;,
&>>quot;Create a Python (code.js/code) file with <your UDF functi<on that suppli>>es the logic\n"
+ " to transform the lines of text. Note that your function must return a JSON string.\n"
+ " pFor example, this function splits each line of a CSV file and returns a JSON string after\n"
+ " transforming the values./p\n"
+ "pre class=\"prettyprint\" suppresswarning\n<"
> + "import json\n"
+ "def transform(line): \n"
+ " values = line.split(',')\n"
+ "\n"
+ " obj = {\n"
+ " 'location' : values[0],\n"
+ " 'name' : values[1],\n"
+ " 'age' : values[2],\n"
+ " 'color' : values[3],\n"
+ " 'coffee' : values[4]\n"
+ " }\n"
+ " jsonString = JSON.dumps(obj);\n"
+ "\n"
+ " return jsonString;\n"
+ "\n"
+ "/pre"
},
streaming = true,
supportsAtLeastOnce = true)
})
public class TextToBigQueryStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);
/** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output of the json transformation. */
  private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the json to table row transform. */
  private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Default interval for polling files in GCS. */
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);
/** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
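  // A FailsafeElement keeps both the original input line and its current (possibly transformed)
  // payload, so records that fail the UDF, the JSON-to-TableRow conversion, or the BigQuery
  // insert can be written to the dead-letter table alongside the input that produced them.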
private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and
 * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
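 *
 * <p>For example, a minimal blocking invocation might look like the following (sketch only;
 * the argument parsing mirrors what {@code main} already does):
 *
 * <pre>{@code
 * TextToBigQueryStreamingOptions options =
 *     PipelineOptionsFactory.fromArgs(args)
 *         .withValidation()
 *         .as(TextToBigQueryStreamingOptions.class);
 * PipelineResult result = TextToBigQueryStreaming.run(options);
 * result.waitUntilFinish();
 * }</pre>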
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
    UncaughtExceptionLogger.register();
// Parse the user options passed from the command-line
TextToBigQueryStreamingOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TextToBigQueryStreamingOptions.class);
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(TextToBigQueryStreamingOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
    FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
// Determine if we are using Python UDFs or JS UDFs based on the provided options.
boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
throw new IllegalArgumentException(
"Either javascript or Python gcs path must be provided, but not both.");
}
/*
* Steps:
* 1) Read from the text source continuously.
* 2) Convert to FailsafeElement.
* 3) Apply Javascript udf transformation.
* - Tag records that were successfully transformed and those
     *    that failed transformation.
* 4) Convert records to TableRow.
* - Tag records that were successfully converted and those
* that failed conversion.
     * 5) Insert successfully converted records into BigQuery.
* - Errors encountered while streaming will be sent to deadletter table.
* 6) Insert records that failed into deadletter table.
*/
    PCollection<String> sourceRead =
pipeline.apply(
TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));
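    // Growth.never() means the file-pattern watch never terminates on its own, so the pipeline
    // keeps polling for new files every DEFAULT_POLL_INTERVAL until it is cancelled.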
PCollectionTuple transformedOutput;
if (usePythonUdf) {
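      // Python UDF path: convert each line into a Beam Row, invoke the external Python UDF,
      // then map the results back into FailsafeElements tagged with UDF_OUT or
      // UDF_DEADLETTER_OUT, mirroring steps 2) and 3) of the JavaScript path below.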
transformedOutput =
sourceRead
.apply(
"MapToRecord",
PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
.stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
.apply(
"InvokeUDF",
PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
.setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
.setFunctionName(options.getPythonExternalTextTransformFunctionName())
.build())
.apply(
ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
.withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
} else {
transformedOutput =
pipeline
// 1) Read from the text source continuously.
.apply(
"ReadFromSource",
TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))
// 2) Convert to FailsafeElement.
.apply(
"ConvertToFailsafeElement",
              MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                  .via(input -> FailsafeElement.of(input, input)))
// 3) Apply Javascript udf transformation.
.apply(
"ApplyUDFTransformation",
              FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setReloadIntervalMinutes(
options.getJavascriptTextTransformReloadIntervalMinutes())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
}
PCollectionTuple convertedTableRows =
transformedOutput
// 4) Convert records to TableRow.
.get(UDF_OUT)
.apply(
"ConvertJSONToTableRow",
              FailsafeJsonToTableRow.<String>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
WriteResult writeResult =
convertedTableRows
// 5) Insert successfully converted records into BigQuery.
.get(TRANSFORM_OUT)
.apply(
"InsertIntoBigQuery",
                BigQueryIO.writeTableRows()
                    .withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))
                    .to(options.getOutputTable())
.withExtendedErrorInfo()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCustomGcsTempLocation(
StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));
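    // Only transient insert failures are retried; rows that fail permanently surface in
    // writeResult and are wrapped as FailsafeElements below so they reach the dead-letter table.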
// Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
    PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(TextToBigQueryStreaming::wrapBigQueryInsertError));
    // 6) Insert records that failed transformation or conversion into the deadletter table.
    PCollectionList.of(
            ImmutableList.of(
                transformedOutput.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
failedInserts))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
StringUtils.isNotEmpty(options.getOutputDeadletterTable())
? options.getOutputDeadletterTable()
: options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
*
* @param insertError BigQueryInsert error.
* @return FailsafeElement object.
*/
  static FailsafeElement<String, String> wrapBigQueryInsertError(
      BigQueryInsertError insertError) {
    FailsafeElement<String, String> failsafeElement;
try {
String rowPayload = JSON_FACTORY.toString(insertError.getRow());
String errorMessage = JSON_FACTORY.toString(insertError.getError());
failsafeElement = FailsafeElement.of(rowPayload, rowPayload);
failsafeElement.setErrorMessage(errorMessage);
} catch (IOException e) {
throw new RuntimeException(e);
}
return failsafeElement;
}
/**
* The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
* by the executor at the command-line.
*/
public interface TextToBigQueryStreamingOptions
extends TextIOToBigQuery.Options, BigQueryStorageApiStreamingOptions {
@TemplateParameter.BigQueryTable(
order = 1,
optional = true,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"Table for messages that failed to reach the output table. If a table doesn't exist, it is created during "
+ "pipeline execution. If not specified, `outputTableSpec_error_records` is used.",
example = "PROJECT_ID:DATASET_NAME.TABLE_NAME")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String value);
    // Hide UseStorageWriteApiAtLeastOnce in the UI, because it is automatically turned on
    // when the pipeline runs in at-least-once (ALO) mode and uses the Storage Write API.
@TemplateParameter.Boolean(
order = 2,
optional = true,
parentName = "useStorageWriteApi",
parentTriggerValues = {"true"},
description = "Use at at-least-once semantics in BigQuery Storage Write API",
helpText =
"This parameter takes effect only if `Use BigQuery Storage Write API` is enabled. If"
+ " enabled the at-least-once semantics will be used for Storage Write API, otherwise"
+ " exactly-once semantics will be used.",
hiddenUi = true)
@Default.Boolean(false)
@Override
Boolean getUseStorageWriteApiAtLeastOnce();
void setUseStorageWriteApiAtLeastOnce(Boolean value);
}
}