The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads
JSON-formatted messages from a Pub/Sub subscription and writes them to a
BigQuery table. You can use the template as a quick solution to move
Pub/Sub data to BigQuery. The template reads JSON-formatted messages
from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
The data field
of Pub/Sub messages must use the JSON format, described in this
JSON guide.
For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with a string data type.
The output table must exist prior to running the pipeline, and its schema must match the input JSON objects. A sketch of both requirements follows this list.
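As a concrete sketch of these requirements, the output table for the example message could be created with the bq tool and a test message published with gcloud; the dataset, table, and topic names here are hypothetical:

# Create a table whose schema matches the example JSON message.
bq mk --table my_dataset.my_table k1:STRING,k2:STRING

# Publish a test message whose data field holds the JSON payload.
gcloud pubsub topics publish my-topic --message='{"k1":"v1", "k2":"v2"}'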
Template parameters
inputSubscription: The Pub/Sub input subscription to read from, in the format of projects/<project>/subscriptions/<subscription>.
outputTableSpec: The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the format of <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. A minimal UDF sketch follows this parameter list.
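For reference, the following is a minimal UDF sketch, under the usual assumption that the function receives each message payload as a JSON string and returns a JSON string; the file name, bucket, and added field are hypothetical:

cat > my_udf.js <<'EOF'
/**
 * Example UDF: parses the incoming JSON payload, adds a marker field,
 * and returns the transformed payload as a JSON string.
 */
function myTransform(inJson) {
  var obj = JSON.parse(inJson);
  obj.processed = true;
  return JSON.stringify(obj);
}
EOF
gsutil cp my_udf.js gs://my-bucket/my-udfs/my_udf.js

With this file staged, you would set javascriptTextTransformGcsPath=gs://my-bucket/my-udfs/my_udf.js and javascriptTextTransformFunctionName=myTransform.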
Running the Pub/Sub Subscription to BigQuery template
Console
Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.
For a list of regions where you can run a Dataflow job, see
Dataflow locations.
From the Dataflow template drop-down menu, select
the Pub/Sub Subscription to BigQuery template.
In the provided parameter fields, enter your parameter values.
Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Replace the following:
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: your Pub/Sub subscription name
DATASET: your BigQuery dataset
TABLE_NAME: your BigQuery table name
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
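Putting the values above together, a launch request for this template might look like the following sketch. The request body shape is shown here as an assumption; confirm it against the projects.templates.launch reference before use.

curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "parameters": {
          "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME",
          "outputDeadletterTable": "PROJECT_ID:DATASET.TABLE_NAME"
        },
        "environment": { "tempLocation": "TEMP_LOCATION" }
      }'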
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@Description("Table spec to write the output to")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@Description("Pub/Sub topic to read the input from")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description(
"The Cloud Pub/Sub subscription to consume from. "
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscription-name>.")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub Topic to BigQuery
The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads
JSON-formatted messages from a Pub/Sub topic and writes them to a
BigQuery table. You can use the template as a quick solution to move
Pub/Sub data to BigQuery. The template reads JSON-formatted messages
from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
The data field
of Pub/Sub messages must use the JSON format, described in this
JSON guide.
For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery
table with two columns, named k1 and k2, with a string data type.
The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
Template parameters
inputTopic: The Pub/Sub input topic to read from, in the format of projects/<project>/topics/<topic>.
outputTableSpec: The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the <my-project>:<my-dataset>.<my-table> format. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
Running the Pub/Sub Topic to BigQuery template
Console
Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.
For a list of regions where you can run a Dataflow job, see
Dataflow locations.
From the Dataflow template drop-down menu, select
the Pub/Sub Topic to BigQuery template.
In the provided parameter fields, enter your parameter values.
Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Replace the following:
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
DATASET: your BigQuery dataset
TABLE_NAME: your BigQuery table name
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@Description("Table spec to write the output to")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@Description("Pub/Sub topic to read the input from")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description(
"The Cloud Pub/Sub subscription to consume from. "
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscription-name>.")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub Avro to BigQuery
The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro
data from a Pub/Sub subscription into a BigQuery table.
Any errors which occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
Requirements for this pipeline:
The input Pub/Sub subscription must exist.
The schema file for the Avro records must exist on Cloud Storage.
The unprocessed Pub/Sub topic must exist.
The output BigQuery dataset must exist.
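As a sketch, the prerequisite Pub/Sub resources can be created with gcloud before launching the pipeline; the topic and subscription names here are hypothetical:

# Input topic and the subscription the pipeline reads from.
gcloud pubsub topics create avro-records
gcloud pubsub subscriptions create avro-records-sub --topic=avro-records

# Topic that receives unprocessed (failed) records.
gcloud pubsub topics create avro-records-deadletter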
Template parameters
schemaPath: The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc.
inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec: The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table may be created automatically using the user-provided Avro schema.
writeDisposition: (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition: (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
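The file referenced by schemaPath is a standard Avro schema (.avsc). A minimal sketch, assuming a record with two string fields and a hypothetical bucket name:

cat > schema.avsc <<'EOF'
{
  "type": "record",
  "name": "ExampleRecord",
  "namespace": "com.example",
  "fields": [
    {"name": "k1", "type": "string"},
    {"name": "k2", "type": "string"}
  ]
}
EOF
gsutil cp schema.avsc gs://my-bucket/schemas/schema.avsc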
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;
/**
* A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
* Pub/Sub into a BigQuery table.
*
* <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
* topic.
*/
public final class PubsubAvroToBigQuery {
/**
* Validates input flags and executes the Dataflow pipeline.
*
* @param args command line arguments to the pipeline
*/
public static void main(String[] args) {
PubsubAvroToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubAvroToBigQueryOptions.class);
run(options);
}
/**
* Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
* {@link PubsubAvroToBigQuery} pipeline.
*/
public interface PubsubAvroToBigQueryOptions
extends ReadSubscriptionOptions, WriteOptions, WriteTopicOptions {
@Description("GCS path to Avro schema file.")
@Required
String getSchemaPath();
void setSchemaPath(String schemaPath);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options execution parameters to the pipeline
* @return result of the pipeline execution as a {@link PipelineResult}
*/
private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());
WriteResult writeResults =
pipeline
.apply(
"Read Avro records",
PubsubIO.readAvroGenericRecords(schema)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic()))
// Workaround for BEAM-12256. Eagerly convert to rows to avoid
// the RowToGenericRecord function that doesn't handle all data
// types.
// TODO: Remove this workaround when a fix for BEAM-12256 is
// released.
.apply(Convert.toRows())
.apply(
"Write to BigQuery",
BigQueryConverters.<Row>createWriteTransform(options)
.useBeamSchema()
.withMethod(Method.STREAMING_INSERTS));
writeResults
.getFailedInsertsWithErr()
.apply(
"Create error payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
.setPayloadCoder(AvroCoder.of(schema))
.setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
.build())
.apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
Pub/Sub Proto to BigQuery
The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto
data from a Pub/Sub subscription into a BigQuery table.
Any errors that occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
A JavaScript user-defined function (UDF) can be provided to transform data. Errors while executing
the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as
the BigQuery errors.
Requirements for this pipeline:
The input Pub/Sub subscription must exist.
The schema file for the Proto records must exist on Cloud Storage.
The output Pub/Sub topic must exist.
The output BigQuery dataset must exist.
If the BigQuery table exists, it must have a schema matching the proto data regardless of the createDisposition value.
Template parameters
protoSchemaPath: The Cloud Storage location of the self-contained proto schema file. For example, gs://path/to/my/file.pb. This file can be generated with the --descriptor_set_out flag of the protoc command. The --include_imports flag guarantees that the file is self-contained. (A sketch of this workflow follows this parameter list.)
fullMessageName: The full proto message name. For example, package.name.MessageName, where package.name is the value provided for the package statement and not the java_package statement.
inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec: The BigQuery output table location. For example, my-project:my_dataset.my_table. Depending on the createDisposition specified, the output table might be created automatically using the input schema file.
preserveProtoFieldNames: (Optional) true to preserve the original proto field names in JSON; false to use more standard JSON names. For example, false would change field_name to fieldName. Default: false.
bigQueryTableSchemaPath: (Optional) The Cloud Storage path to the BigQuery schema file. For example, gs://path/to/my/schema.json. If this is not provided, the schema is inferred from the proto schema.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
udfOutputTopic: (Optional) The Pub/Sub topic storing the UDF errors. For example, projects/<project-id>/topics/<topic-name>. If this is not provided, UDF errors are sent to the same topic as outputTopic.
writeDisposition: (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition: (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
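As referenced in the protoSchemaPath description above, the self-contained descriptor file can be generated locally with protoc and then copied to Cloud Storage; the proto file and bucket names in this sketch are hypothetical:

# Generate a self-contained descriptor set for the message type.
protoc --proto_path=. \
    --descriptor_set_out=schema.pb \
    --include_imports \
    my_message.proto

# Stage the descriptor where the template can read it.
gsutil cp schema.pb gs://my-bucket/schemas/schema.pb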
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;
/**
* A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
* records from Pub/Sub to BigQuery.
*
* <p>Persistent failures are written to a Pub/Sub unprocessed topic.
*/
public final class PubsubProtoToBigQuery {
private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
public static void main(String[] args) {
run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
}
/** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
public interface PubSubProtoToBigQueryOptions
extends ReadSubscriptionOptions, WriteOptions, WriteTopicOptions {
@Description("GCS path to proto schema descriptor file.")
@Required
String getProtoSchemaPath();
void setProtoSchemaPath(String value);
@Description("Full message name (i.e. package.name.MessageName) of the target Protobuf type.")
@Required
String getFullMessageName();
void setFullMessageName(String value);
@Description("True to preserve proto snake_case. False will convert fields to lowerCamelCase.")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();
void setPreserveProtoFieldNames(Boolean value);
@Description("GCS path to JSON file that represents the BigQuery table schema.")
String getBigQueryTableSchemaPath();
void setBigQueryTableSchemaPath(String value);
@Description("GCS path to JavaScript UDF source")
String getJavascriptTextTransformGcsPath();
void setJavascriptTextTransformGcsPath(String javascriptTextTransformGcsPath);
@Description("UDF JavaScript Function Name")
String getJavascriptTextTransformFunctionName();
void setJavascriptTextTransformFunctionName(String javascriptTextTransformFunctionName);
@Description("Pub/Sub topic for UDF failures")
String getUdfOutputTopic();
void setUdfOutputTopic(String udfOutputTopic);
}
/** Runs the pipeline and returns the results. */
private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
Pipeline pipeline = Pipeline.create(options);
Descriptor descriptor = getDescriptor(options);
PCollection<String> maybeForUdf =
pipeline
.apply("Read From Pubsub", readPubsubMessages(options, descriptor))
.apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
runUdf(maybeForUdf, options)
.apply("Write to BigQuery", writeToBigQuery(options, descriptor))
.getFailedInsertsWithErr()
.apply(
"Create Error Payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
.setPayloadCoder(StringUtf8Coder.of())
.setTranslateFunction(BigQueryConverters::tableRowToJson)
.build())
.apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
return pipeline.run();
}
/** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
@VisibleForTesting
static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
String schemaPath = options.getProtoSchemaPath();
String messageName = options.getFullMessageName();
Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
if (descriptor == null) {
throw new IllegalArgumentException(
messageName + " is not a recognized message in " + schemaPath);
}
return descriptor;
}
/** Returns the {@link PTransform} for reading Pub/Sub messages. */
private static Read<DynamicMessage> readPubsubMessages(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
return PubsubIO.readProtoDynamicMessages(descriptor)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic());
}
/**
* Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
*
* <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
* specified in {@code options}.
*/
@VisibleForTesting
static Write<String> writeToBigQuery(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
Write<String> write =
BigQueryConverters.<String>createWriteTransform(options)
.withFormatFunction(BigQueryConverters::convertJsonToTableRow)
.withMethod(Method.STREAMING_INSERTS);
String schemaPath = options.getBigQueryTableSchemaPath();
if (Strings.isNullOrEmpty(schemaPath)) {
return write.withSchema(
SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
} else {
return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
}
}
/** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
private static class ConvertDynamicProtoMessageToJson
extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
private final boolean preserveProtoName;
private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
this.preserveProtoName = options.getPreserveProtoFieldNames();
}
@Override
public PCollection<String> expand(PCollection<DynamicMessage> input) {
return input.apply(
"Map to JSON",
MapElements.into(TypeDescriptors.strings())
.via(
message -> {
try {
JsonFormat.Printer printer = JsonFormat.printer();
return preserveProtoName
? printer.preservingProtoFieldNames().print(message)
: printer.print(message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}));
}
}
/**
* Handles running the UDF.
*
* <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
*
* <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
* topic.
*
* @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
* @param options the options containing info on running the UDF
* @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
* called
*/
@VisibleForTesting
static PCollection<String> runUdf(
PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
// In order to avoid generating a graph that makes it look like a UDF was called when none was
// intended, simply return the input as "success" output.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) {
return jsonCollection;
}
// For testing purposes, we need to do this check before creating the PTransform rather than
// in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
// a value.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
throw new IllegalArgumentException(
"JavaScript function name cannot be null or empty if file is set");
}
PCollectionTuple maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
maybeSuccess
.get(UDF_FAILURE_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Failures",
ConvertFailsafeElementToPubsubMessage.<String, String>builder()
.setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
.setErrorMessageAttributeKey("udfErrorMessage")
.build())
.apply("Write Failed UDF", writeUdfFailures(options));
return maybeSuccess
.get(UDF_SUCCESS_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Output",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
}
/** {@link PTransform} that calls a UDF and returns both success and failure output. */
private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
private final PubSubProtoToBigQueryOptions options;
RunUdf(PubSubProtoToBigQueryOptions options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<String> input) {
return input
.apply("Prepare Failsafe UDF", makeFailsafe())
.setCoder(FAILSAFE_CODER)
.apply(
"Call UDF",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_SUCCESS_TAG)
.setFailureTag(UDF_FAILURE_TAG)
.build());
}
private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
.via((String json) -> FailsafeElement.of(json, json));
}
}
/**
* Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
* topic.
*/
private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
PubSubProtoToBigQueryOptions options) {
PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
return Strings.isNullOrEmpty(options.getUdfOutputTopic())
? write.to(options.getOutputTopic())
: write.to(options.getUdfOutputTopic());
}
}
Pub/Sub to Pub/Sub
The Pub/Sub to Pub/Sub template is a streaming pipeline that reads
messages from a Pub/Sub subscription and writes them to another Pub/Sub
topic. The pipeline also accepts an optional message attribute key and value that can be
used to filter which messages are written to the destination topic. You can use this
template to copy messages from a Pub/Sub subscription to another Pub/Sub
topic, optionally applying a message filter.
Requirements for this pipeline:
The source Pub/Sub subscription must exist prior to execution.
The destination Pub/Sub topic must exist prior to execution.
Template parameters
Parameter
Description
inputSubscription
Pub/Sub subscription to read the input from. For example, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic
Cloud Pub/Sub topic to write the output to. For example, projects/<project-id>/topics/<topic-name>.
filterKey
(Optional) Filter events based on an attribute key. No filters are applied if filterKey is not specified.
filterValue
(Optional) Filter attribute value to use in case a filterKey is provided. A null filterValue is used by default.
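gcloud
In your shell or terminal, run the template. The command below is a sketch rather than a verified reference: the Cloud_PubSub_to_Cloud_PubSub template path is an assumption, so confirm it against the template listing before running. JOB_NAME is a unique job name of your choice, and PROJECT_ID is the Cloud project that owns the subscription and topic.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE
Replace the following: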
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: the Pub/Sub subscription
name
TOPIC_NAME: the Pub/Sub topic name
FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
FILTER_VALUE: the filter attribute value to use if an event filter key is provided.
Accepts a valid Java regex string as the filter value. When a regex is provided,
the complete expression must match for a message to be forwarded to the output topic. Partial matches
(such as a substring match) are not forwarded. A null filter value is used by default.
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
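The request below is a sketch of such a call using curl: the endpoint shape follows the projects.locations.templates.launch reference, and the Cloud_PubSub_to_Cloud_PubSub template path is an assumption, so verify both against the API documentation before use.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "parameters": {
          "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
          "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
          "filterKey": "FILTER_KEY",
          "filterValue": "FILTER_VALUE"
        },
        "environment": { "tempLocation": "TEMP_LOCATION" }
      }' \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub"
Replace the following: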
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: the Pub/Sub subscription
name
TOPIC_NAME: the Pub/Sub topic name
FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
FILTER_VALUE: the filter attribute value to use if an event filter key is provided.
Accepts a valid Java regex string as the filter value. When a regex is provided,
the complete expression must match for a message to be forwarded to the output topic. Partial matches
(such as a substring match) are not forwarded. A null filter value is used by default.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A template that copies messages from one Pubsub subscription to another Pubsub topic. */
public class PubsubToPubsub {
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription. 2) Apply any
* filters if an attribute=value pair is provided. 3) Write each PubSubMessage to output PubSub
* topic.
*/
pipeline
.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(
"Filter Events If Enabled",
ParDo.of(
ExtractAndFilterEventsFn.newBuilder()
.withFilterKey(options.getFilterKey())
.withFilterValue(options.getFilterValue())
.build()))
.apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Options supported by {@link PubsubToPubsub}.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@Description(
"The Cloud Pub/Sub subscription to consume from. "
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscription-name>.")
@Validation.Required
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> inputSubscription);
@Description(
"The Cloud Pub/Sub topic to publish to. "
+ "The name should be in the format of "
+ "projects/<project-id>/topics/<topic-name>.")
@Validation.Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
@Description(
"Filter events based on an optional attribute key. "
+ "No filters are applied if a filterKey is not specified.")
@Validation.Required
ValueProvider<String> getFilterKey();
void setFilterKey(ValueProvider<String> filterKey);
@Description(
"Filter attribute value to use in case a filterKey is provided. Accepts a valid Java regex"
+ " string as a filterValue. In case a regex is provided, the complete expression"
+ " should match in order for the message to be filtered. Partial matches (e.g."
+ " substring) will not be filtered. A null filterValue is used by default.")
@Validation.Required
ValueProvider<String> getFilterValue();
void setFilterValue(ValueProvider<String> filterValue);
}
/**
 * DoFn that determines whether events are to be filtered. If filtering is enabled, only
 * events that pass the filter are published; otherwise, all input events are published.
*/
@AutoValue
public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);
// Counter tracking the number of incoming Pub/Sub messages.
private static final Counter INPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");
// Counter tracking the number of output Pub/Sub messages after the user provided filter
// is applied.
private static final Counter OUTPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");
private Boolean doFilter;
private String inputFilterKey;
private Pattern inputFilterValueRegex;
private Boolean isNullFilterValue;
public static Builder newBuilder() {
return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
}
@Nullable
abstract ValueProvider<String> filterKey();
@Nullable
abstract ValueProvider<String> filterValue();
@Setup
public void setup() {
if (this.doFilter != null) {
return; // Filter has been evaluated already
}
inputFilterKey = (filterKey() == null ? null : filterKey().get());
if (inputFilterKey == null) {
// Disable input message filtering.
this.doFilter = false;
} else {
this.doFilter = true; // Enable filtering.
String inputFilterValue = (filterValue() == null ? null : filterValue().get());
if (inputFilterValue == null) {
LOG.warn(
"User provided a NULL for filterValue. Only messages with a value of NULL for the"
+ " filterKey: {} will be filtered forward",
inputFilterKey);
// For backward compatibility, we are allowing filtering by null filterValue.
this.isNullFilterValue = true;
this.inputFilterValueRegex = null;
} else {
this.isNullFilterValue = false;
try {
inputFilterValueRegex = getFilterPattern(inputFilterValue);
} catch (PatternSyntaxException e) {
LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
throw new RuntimeException(e);
}
}
LOG.info(
"Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
}
}
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_COUNTER.inc();
if (!this.doFilter) {
// Filter is not enabled
writeOutput(context, context.element());
} else {
PubsubMessage message = context.element();
String extractedValue = message.getAttribute(this.inputFilterKey);
if (this.isNullFilterValue) {
if (extractedValue == null) {
// If we are filtering for null and the extracted value is null, we forward
// the message.
writeOutput(context, message);
}
} else {
if (extractedValue != null
&& this.inputFilterValueRegex.matcher(extractedValue).matches()) {
// If the extracted value is not null and it matches the filter,
// we forward the message.
writeOutput(context, message);
}
}
}
}
/**
* Write a {@link PubsubMessage} and increment the output counter.
*
* @param context {@link ProcessContext} to write {@link PubsubMessage} to.
* @param message {@link PubsubMessage} output.
*/
private void writeOutput(ProcessContext context, PubsubMessage message) {
OUTPUT_COUNTER.inc();
context.output(message);
}
/**
* Return a {@link Pattern} based on a user provided regex string.
*
* @param regex Regex string to compile.
* @return {@link Pattern}
* @throws PatternSyntaxException If the string is an invalid regex.
*/
private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
checkNotNull(regex, "Filter regex cannot be null.");
return Pattern.compile(regex);
}
/** Builder class for {@link ExtractAndFilterEventsFn}. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilterKey(ValueProvider<String> filterKey);
abstract Builder setFilterValue(ValueProvider<String> filterValue);
abstract ExtractAndFilterEventsFn build();
/**
* Method to set the filterKey used for filtering messages.
*
* @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterKey(ValueProvider<String> filterKey) {
checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
return setFilterKey(filterKey);
}
/**
* Method to set the filterValue used for filtering messages.
*
* @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterValue(ValueProvider<String> filterValue) {
checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
return setFilterValue(filterValue);
}
}
}
}
Pub/Sub to Splunk
The Pub/Sub to Splunk template is a streaming pipeline that reads
messages from a Pub/Sub subscription and writes the message payload to Splunk via
Splunk's HTTP Event Collector (HEC). The most common use case of this template is to export logs
to Splunk. To see an example of the underlying workflow, see
Deploying production-ready log exports to Splunk using Dataflow.
Before writing to Splunk, you can also apply a JavaScript user-defined function to the message
payload. Any messages that experience processing failures are forwarded to a
Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key
along with an HEC token parameter that has been encrypted with that key and then base64-encoded.
See the Cloud KMS API encryption endpoint
for additional details on encrypting your HEC token parameter.
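For example, the following commands are a minimal sketch of one way to prepare such a token with the gcloud CLI instead of calling the API endpoint directly; the key ring and key names (my-keyring, my-key) and the sample token value are placeholders for resources you create yourself:
# Encrypt the plaintext HEC token with a Cloud KMS key, then base64-encode the ciphertext.
# Pass the base64 output as the token parameter and the key's resource name as tokenKMSEncryptionKey.
echo -n "my-plaintext-hec-token" | gcloud kms encrypt \
  --location=global \
  --keyring=my-keyring \
  --key=my-key \
  --plaintext-file=- \
  --ciphertext-file=- | base64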
Requirements for this pipeline:
The source Pub/Sub subscription must exist prior to running the pipeline.
The Pub/Sub unprocessed topic must exist prior to running the pipeline.
The Splunk HEC endpoint must be accessible from the Dataflow workers' network.
The Splunk HEC token must be generated and available.
Template parameters
Parameter
Description
inputSubscription
The Pub/Sub subscription from which to read the input. For example, projects/<project-id>/subscriptions/<subscription-name>.
token
(Optional) The Splunk HEC authentication token. Must be provided if the tokenSource is set to PLAINTEXT or KMS.
url
The Splunk HEC URL. This must be routable from the VPC in which the pipeline runs. For example, https://splunk-hec-host:8088.
outputDeadletterTopic
The Pub/Sub topic to which undeliverable messages are forwarded. For example, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
batchCount
(Optional) The batch size for sending multiple events to Splunk. Default 1 (no batching).
parallelism
(Optional) The maximum number of parallel requests. Default 1 (no parallelism).
disableCertificateValidation
(Optional) Disable SSL certificate validation. Default false (validation enabled). If true, the certificates are not validated (all certificates are trusted) and the rootCaCertificatePath parameter is ignored.
includePubsubMessage
(Optional) Include the full Pub/Sub message in the payload. Default false
(only the data element is included in the payload).
tokenSource
Source of the token. One of PLAINTEXT, KMS or SECRET_MANAGER. This parameter must be provided if Secret Manager is used.
If tokenSource is set to KMS, tokenKMSEncryptionKey and an encrypted token must be provided.
If tokenSource is set to SECRET_MANAGER, tokenSecretId must be provided.
If tokenSource is set to PLAINTEXT, token must be provided.
tokenKMSEncryptionKey
(Optional) The Cloud KMS key to decrypt the HEC token string. This parameter must be provided if the tokenSource is set to KMS.
If the Cloud KMS key is provided, the HEC token string must be passed in encrypted.
tokenSecretId
(Optional) The Secret Manager secret ID for the token. This parameter must be provided if the tokenSource is set to SECRET_MANAGER.
Should be in the format projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath
(Optional) The full URL to the root CA certificate in Cloud Storage. For example, gs://mybucket/mycerts/privateCA.crt. The certificate provided in Cloud Storage must be DER-encoded and may be supplied in binary or printable (Base64) encoding.
If the certificate is provided in Base64 encoding, it must be bounded at the beginning by -----BEGIN CERTIFICATE-----, and must be bounded at the end by -----END CERTIFICATE-----.
If this parameter is provided, this private CA certificate file is fetched and added to the Dataflow worker's trust store in order to verify Splunk HEC endpoint's SSL certificate.
If this parameter is not provided, the default trust store is used.
enableBatchLogs
(Optional) Specifies whether logs should be enabled for batches written to Splunk. Default: true.
enableGzipHttpCompression
(Optional) Specifies whether HTTP requests sent to Splunk HEC should be compressed (gzip content encoded). Default: true.
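gcloud
In your shell or terminal, run the template. The command below is a sketch: the Cloud_PubSub_to_Splunk template path is an assumption, only a representative subset of the optional parameters is shown, and JOB_NAME and PROJECT_ID are a job name of your choice and your Cloud project ID.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
Replace the following: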
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
(for example, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
DISABLE_VALIDATION: true if you want to disable SSL certificate validation
ROOT_CA_CERTIFICATE_PATH: the path to root CA certificate in Cloud Storage (for example, gs://your-bucket/privateCA.crt)
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
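As with the gcloud example above, the request below is a sketch: the endpoint shape follows the projects.locations.templates.launch reference and the Cloud_PubSub_to_Splunk template path is an assumption, so verify both against the API documentation before use.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "parameters": {
          "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
          "token": "TOKEN",
          "url": "URL",
          "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "batchCount": "BATCH_COUNT",
          "parallelism": "PARALLELISM",
          "disableCertificateValidation": "DISABLE_VALIDATION",
          "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
        },
        "environment": { "tempLocation": "TEMP_LOCATION" }
      }' \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk"
Replace the following: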
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
(for example, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
DISABLE_VALIDATION: true if you want to disable SSL certificate validation
ROOT_CA_CERTIFICATE_PATH: the path to root CA certificate in Cloud Storage (for example, gs://your-bucket/privateCA.crt)
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.splunk.SplunkEvent;
import com.google.cloud.teleport.splunk.SplunkEventCoder;
import com.google.cloud.teleport.splunk.SplunkIO;
import com.google.cloud.teleport.splunk.SplunkWriteError;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.templates.common.SplunkConverters;
import com.google.cloud.teleport.templates.common.SplunkConverters.SplunkOptions;
import com.google.cloud.teleport.util.TokenNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToSplunk} pipeline is a streaming pipeline which ingests data from Cloud
* Pub/Sub, executes a UDF, converts the output to {@link SplunkEvent}s and writes those records
* into Splunk's HEC endpoint. Any errors which occur in the execution of the UDF, conversion to
* {@link SplunkEvent} or writing to HEC will be streamed into a Pub/Sub topic.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The source Pub/Sub subscription exists.
* <li>HEC end-point is routable from the VPC where the Dataflow job executes.
* <li>Deadletter topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
 * PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-splunk
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToSplunk \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --runner=${RUNNER}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-splunk-$USER-`date +"%Y%m%d-%H%M%S%z"`
* BATCH_COUNT=1
* PARALLELISM=5
*
* # Execute the templated pipeline:
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* token=my-splunk-hec-token,\
* url=http://splunk-hec-server-address:8088,\
* batchCount=${BATCH_COUNT},\
* parallelism=${PARALLELISM},\
* disableCertificateValidation=false,\
* outputDeadletterTopic=projects/${PROJECT_ID}/topics/deadletter-topic-name,\
* javascriptTextTransformGcsPath=gs://${BUCKET_NAME}/splunk/js/my-js-udf.js,\
* javascriptTextTransformFunctionName=myUdf"
* </pre>
*/
public class PubSubToSplunk {
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** Counter to track inbound messages from source. */
private static final Counter INPUT_MESSAGES_COUNTER =
Metrics.counter(PubSubToSplunk.class, "inbound-pubsub-messages");
/** The tag for successful {@link SplunkEvent} conversion. */
private static final TupleTag<SplunkEvent> SPLUNK_EVENT_OUT = new TupleTag<SplunkEvent>() {};
/** The tag for failed {@link SplunkEvent} conversion. */
private static final TupleTag<FailsafeElement<String, String>> SPLUNK_EVENT_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** GSON to process a {@link PubsubMessage}. */
private static final Gson GSON = new Gson();
/** Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToSplunk.class);
private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = false;
@VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
@VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
 * wait for its execution to finish. If blocking execution is required, use the {@link
* PubSubToSplunk#run(PubSubToSplunkOptions)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
PubSubToSplunkOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToSplunkOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(PubSubToSplunkOptions options) {
Pipeline pipeline = Pipeline.create(options);
// Register coders.
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(SplunkEvent.class, SplunkEventCoder.of());
registry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Convert message to FailsafeElement for processing.
* 3) Apply user provided UDF (if any) on the input strings.
* 4) Convert successfully transformed messages into SplunkEvent objects
* 5) Write SplunkEvents to Splunk's HEC end point.
* 5a) Wrap write failures into a FailsafeElement.
* 6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
* and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
*/
// 1) Read messages in from Pub/Sub
PCollection<String> stringMessages =
pipeline.apply(
"ReadMessages",
new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));
// 2) Convert message to FailsafeElement for processing.
PCollectionTuple transformedOutput =
stringMessages
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply user provided UDF (if any) on the input strings.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// 4) Convert successfully transformed messages into SplunkEvent objects
PCollectionTuple convertToEventTuple =
transformedOutput
.get(UDF_OUT)
.apply(
"ConvertToSplunkEvent",
SplunkConverters.failsafeStringToSplunkEvent(
SPLUNK_EVENT_OUT, SPLUNK_EVENT_DEADLETTER_OUT));
// 5) Write SplunkEvents to Splunk's HEC end point.
PCollection<SplunkWriteError> writeErrors =
convertToEventTuple
.get(SPLUNK_EVENT_OUT)
.apply(
"WriteToSplunk",
SplunkIO.writeBuilder()
.withToken(
new TokenNestedValueProvider(
options.getTokenSecretId(),
options.getTokenKMSEncryptionKey(),
options.getToken(),
options.getTokenSource()))
.withUrl(options.getUrl())
.withBatchCount(options.getBatchCount())
.withParallelism(options.getParallelism())
.withDisableCertificateValidation(options.getDisableCertificateValidation())
.withRootCaCertificatePath(options.getRootCaCertificatePath())
.withEnableBatchLogs(options.getEnableBatchLogs())
.withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
.build());
// 5a) Wrap write failures into a FailsafeElement.
PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
writeErrors.apply(
"WrapSplunkWriteErrors",
ParDo.of(
new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {
@ProcessElement
public void processElement(ProcessContext context) {
SplunkWriteError error = context.element();
FailsafeElement<String, String> failsafeElement =
FailsafeElement.of(error.payload(), error.payload());
if (error.statusMessage() != null) {
failsafeElement.setErrorMessage(error.statusMessage());
}
if (error.statusCode() != null) {
failsafeElement.setErrorMessage(
String.format("Splunk write status code: %d", error.statusCode()));
}
context.output(failsafeElement);
}
}));
// 6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
// and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
PCollectionList.of(
ImmutableList.of(
convertToEventTuple.get(SPLUNK_EVENT_DEADLETTER_OUT),
wrappedSplunkWriteErrors,
transformedOutput.get(UDF_DEADLETTER_OUT)))
.apply("FlattenErrors", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
.setErrorRecordsTopic(options.getOutputDeadletterTopic())
.build());
return pipeline.run();
}
/**
* The {@link PubSubToSplunkOptions} class provides the custom options passed by the executor at
* the command line.
*/
public interface PubSubToSplunkOptions
extends SplunkOptions,
PubsubReadSubscriptionOptions,
PubsubWriteDeadletterTopicOptions,
JavascriptTextTransformerOptions {}
/**
* A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
* returns a {@link PCollection} of {@link String} messages.
*/
private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
private final ValueProvider<String> subscriptionName;
private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
private Boolean includePubsubMessage;
ReadMessages(
ValueProvider<String> subscriptionName,
ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
this.subscriptionName = subscriptionName;
this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
}
@Override
public PCollection<String> expand(PBegin input) {
return input
.apply(
"ReadPubsubMessage",
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
.apply(
"ExtractMessageIfRequired",
ParDo.of(
new DoFn<PubsubMessage, String>() {
@Setup
public void setup() {
if (inputIncludePubsubMessageFlag != null) {
includePubsubMessage = inputIncludePubsubMessageFlag.get();
}
includePubsubMessage =
MoreObjects.firstNonNull(
includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
}
@ProcessElement
public void processElement(ProcessContext context) {
if (includePubsubMessage) {
context.output(formatPubsubMessage(context.element()));
} else {
context.output(
new String(context.element().getPayload(), StandardCharsets.UTF_8));
}
}
}))
.apply(
"CountMessages",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_MESSAGES_COUNTER.inc();
context.output(context.element());
}
}));
}
}
/**
* Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
* to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
*
* @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
* @return JSON String that adheres to the model defined in {@link
* com.google.pubsub.v1.PubsubMessage}
*/
@VisibleForTesting
protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
JsonObject messageJson = new JsonObject();
String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
try {
JsonObject data = GSON.fromJson(payload, JsonObject.class);
messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
} catch (JsonSyntaxException e) {
messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
}
JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);
if (pubsubMessage.getMessageId() != null) {
messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
}
return messageJson.toString();
}
/**
* Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
*
* @param attributesMap {@link Map} of Pub/Sub attributes
* @return {@link JsonObject} of Pub/Sub attributes
*/
private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
JsonObject attributesJson = new JsonObject();
for (String key : attributesMap.keySet()) {
attributesJson.addProperty(key, attributesMap.get(key));
}
return attributesJson;
}
}
Pub/Sub to Avro Files on Cloud Storage
The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads
data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage
bucket.
Requirements for this pipeline:
The input Pub/Sub topic must exist prior to pipeline execution.
Template parameters
Parameter
Description
inputTopic
Pub/Sub topic to subscribe for message consumption. The topic name must be
in the format of projects/<project-id>/topics/<topic-name>.
outputDirectory
Output directory where output Avro files are archived. Must end with a slash (/).
For example: gs://example-bucket/example-directory/.
avroTempDirectory
Directory for temporary Avro files. Must end with a slash (/). For example:
gs://example-bucket/example-directory/.
outputFilenamePrefix
(Optional) Output filename prefix for the Avro files.
outputFilenameSuffix
(Optional) Output filename suffix for the Avro files.
outputShardTemplate
(Optional) The shard template of the output file. It is specified as repeating sequences of
the letters S or N. For example, SSS-NNN. These are
replaced with either the shard number or the total number of shards, respectively. When this
parameter is not specified, the default template format is W-P-SS-of-NN.
Running the Pub/Sub to Cloud Storage Avro template
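gcloud
In your shell or terminal, run the template. The command below is a sketch: the Cloud_PubSub_to_Avro template path and the example output and temp directories are assumptions, and JOB_NAME and PROJECT_ID are a job name of your choice and your Cloud project ID.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
avroTempDirectory=gs://BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE
Replace the following: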
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files; for
example, gs://your-bucket/temp
TOPIC_NAME: the Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
FILENAME_PREFIX: the preferred output filename prefix
FILENAME_SUFFIX: the preferred output filename suffix
SHARD_TEMPLATE: the preferred output shard template
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
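The request below is a sketch of such a call: the endpoint shape follows the projects.locations.templates.launch reference, and the Cloud_PubSub_to_Avro template path and example Cloud Storage paths are assumptions.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "parameters": {
          "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
          "outputDirectory": "gs://BUCKET_NAME/output/",
          "avroTempDirectory": "gs://BUCKET_NAME/temp/",
          "outputFilenamePrefix": "FILENAME_PREFIX",
          "outputFilenameSuffix": "FILENAME_SUFFIX",
          "outputShardTemplate": "SHARD_TEMPLATE"
        },
        "environment": { "tempLocation": "TEMP_LOCATION" }
      }' \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro"
Replace the following: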
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files; for
example, gs://your-bucket/temp
TOPIC_NAME: the Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
FILENAME_PREFIX: the preferred output filename prefix
FILENAME_SUFFIX: the preferred output filename suffix
SHARD_TEMPLATE: the preferred output shard template
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed Avro files at the specified output directory.
*
* <p>Files output will have the following schema:
*
* <pre>
* {
* "type": "record",
* "name": "AvroPubsubMessageRecord",
* "namespace": "com.google.cloud.teleport.avro",
* "fields": [
* {"name": "message", "type": {"type": "array", "items": "bytes"}},
* {"name": "attributes", "type": {"type": "map", "values": "string"}},
* {"name": "timestamp", "type": "long"}
* ]
* }
* </pre>
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToAvro
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-avro
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
 * JOB_NAME=pubsub-to-avro-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
 * outputFilenameSuffix=.avro"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.avro"
* </pre>
*/
public class PubsubToAvro {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@Description(
"The Cloud Pub/Sub subscription to consume from. "
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscription-name>.")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@Description("The Cloud Pub/Sub topic to read from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@Description("The directory to output files to. Must end with a slash.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description("The filename prefix of the files to write to.")
@Default.String("output")
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@Description("The suffix of the files to write.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@Description("The Avro Write Temporary Directory. Must end with /")
@Required
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> messages = null;
/*
* Steps:
* 1) Read messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed data into Avro files, one per window by default.
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
messages
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
AvroIO.write(AvroPubsubMessageRecord.class)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input)))
/*.withTempDirectory(FileSystems.matchNewResource(
options.getAvroTempDirectory(),
Boolean.TRUE))
*/
.withWindowedWrites()
.withNumShards(options.getNumShards()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
 * copying its fields and the timestamp of the message.
*/
static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
new AvroPubsubMessageRecord(
message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
}
}
}
Pub/Sub to Text Files on Cloud Storage
The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads
records from Pub/Sub and saves them as a series of Cloud Storage files in text
format. The template can be used as a quick way to save data in Pub/Sub for future
use. By default, the template generates a new file every 5 minutes.
Requirements for this pipeline:
The Pub/Sub topic must exist prior to execution.
The messages published to the topic must be in text format.
The messages published to the topic must not contain any newlines. Note that each
Pub/Sub message is saved as a single line in the output file.
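To illustrate the last requirement, you can publish a single-line test message with the gcloud CLI; the topic and payload below are placeholders, not values defined by this template:
# Publish one single-line, newline-free message to the input topic.
gcloud pubsub topics publish projects/PROJECT_ID/topics/TOPIC_NAME \
    --message='{"id": 1, "status": "ok"}'
Because each Pub/Sub message becomes one line in the output file, payloads containing embedded newlines would be split across lines and should be avoided.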
Template parameters
Parameter
Description
inputTopic
The Pub/Sub topic to read the input from. The topic name should be in the
format projects/<project-id>/topics/<topic-name>.
outputDirectory
The path and filename prefix for writing output files. For example,
gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix
The prefix to place on each windowed file. For example, output-
outputFilenameSuffix
The suffix to place on each windowed file, typically a file extension such as
.txt or .csv.
outputShardTemplate
The shard template defines the dynamic portion of each windowed file. By default, the
pipeline uses a single shard for output to the file system within each window. This means
that all of the data is written to a single file per window. The outputShardTemplate
defaults to W-P-SS-of-NN where W is the window date range,
P is the pane info, S is the shard number, and N is
the number of shards. In case of a single file, the SS-of-NN portion of the
outputShardTemplate is 00-of-01.
Running the Pub/Sub to Text Files on Cloud Storage template
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
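For reference, a complete run command assembled from the values above might look like the following sketch. The template path Cloud_PubSub_to_GCS_Text is an assumption based on the template's conventional name in the gs://dataflow-templates bucket; confirm it for the VERSION you choose.
# Launch the Pub/Sub to Text Files on Cloud Storage template (classic template).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt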
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
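As a sketch, the corresponding HTTP request can be issued with curl as shown below; the projects.locations.templates.launch endpoint and the Cloud_PubSub_to_GCS_Text path are assumptions that should be checked against the reference linked above.
# Launch the template through the Dataflow REST API.
curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "environment": { "tempLocation": "TEMP_LOCATION" },
        "parameters": {
          "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
          "outputDirectory": "gs://BUCKET_NAME/output/",
          "outputFilenamePrefix": "output-",
          "outputFilenameSuffix": ".txt"
        }
      }'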
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToText
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-text
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-gcs-text-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* windowDuration=5m,\
* numShards=1,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
* </pre>
*/
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@Description(
"The Cloud Pub/Sub subscription to consume from. "
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscription-name>.")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@Description("The Cloud Pub/Sub topic to read from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@Description("The directory to output files to. Must end with a slash.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description("The directory to output temporary files to. Must end with a slash.")
ValueProvider<String> getUserTempLocation();
void setUserTempLocation(ValueProvider<String> value);
@Description("The filename prefix of the files to write to.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@Description("The suffix of the files to write.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using the optional parameter userTempLocation as the TempDirectory. This is useful
* when the output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static ValueProvider<String> maybeUseUserTempLocation(
ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
return DualInputNestedValueProvider.of(
userTempLocation,
outputLocation,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
return (input.getX() != null) ? input.getX() : input.getY();
}
});
}
}
Pub/Sub to MongoDB
The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents.
If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). Any errors that occur due to schema mismatches,
malformed JSON, or while executing transforms are recorded, along with the input message, in a BigQuery table for unprocessed messages. If a table for unprocessed records does
not exist prior to execution, the pipeline automatically creates this table.
Requirements for this pipeline:
The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.
The MongoDB cluster must exist and be accessible from the Dataflow worker machines.
Template parameters
Parameter
Description
inputSubscription
Name of the Pub/Sub subscription. For example: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri
Comma separated list of MongoDB servers. For example: 192.285.234.12:27017,192.287.123.11:27017
database
Database in MongoDB to store the collection. For example: my-db.
collection
Name of the collection inside MongoDB database. For example: my-collection.
deadletterTable
The BigQuery table that stores messages that failed to be processed (for example, due to a mismatched schema or malformed JSON). For example: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
batchSize
(Optional) Batch size used for batch insertion of documents into MongoDB. Default: 1000.
batchSizeBytes
(Optional) Batch size in bytes. Default: 5242880.
maxConnectionIdleTime
(Optional) Maximum allowed connection idle time in milliseconds before a timeout occurs. Default: 60000.
sslEnabled
(Optional) Boolean value indicating whether connection to MongoDB is SSL enabled. Default: true.
ignoreSSLCertificate
(Optional) Boolean value indicating whether the SSL certificate should be ignored. Default: true.
withOrdered
(Optional) Boolean value enabling ordered bulk insertions into MongoDB. Default: true.
withSSLInvalidHostNameAllowed
(Optional) Boolean value indicating whether an invalid hostname is allowed for the SSL connection. Default: true.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
JOB_NAME:
a unique job name of your choice
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE: the name of the MongoDB database (for example, users)
COLLECTION: the name of the MongoDB collection (for example, profiles)
UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
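Because this is a Flex Template, it is typically launched with gcloud dataflow flex-template run instead of gcloud dataflow jobs run. The following is a sketch using the values above; the container spec path gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB is an assumption to verify for your VERSION.
# Launch the Pub/Sub to MongoDB Flex Template.
gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=UNPROCESSED_TABLE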
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
JOB_NAME:
a unique job name of your choice
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE: the name of the MongoDB database (for example, users)
COLLECTION: the name of the MongoDB collection (for example, profiles)
UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
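Note that Flex Templates are usually launched through the projects.locations.flexTemplates.launch method rather than projects.templates.launch; the request below is a sketch under that assumption, reusing the placeholder values above.
# Launch the Pub/Sub to MongoDB Flex Template through the REST API.
curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "launchParameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
          "parameters": {
            "inputSubscription": "INPUT_SUBSCRIPTION",
            "mongoDBUri": "MONGODB_URI",
            "database": "DATABASE",
            "collection": "COLLECTION",
            "deadletterTable": "UNPROCESSED_TABLE"
          }
        }
      }'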
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
* from PubSub, applies a Javascript UDF if provided, and inserts the resulting records as BSON documents
* in MongoDB. If an element fails to be processed, it is written to a deadletter table in
* BigQuery.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The PubSub topic and subscriptions exist
* <li>The MongoDB cluster is up and running
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_NAME=my-project
* BUCKET_NAME=my-bucket
* INPUT_SUBSCRIPTION=my-subscription
* MONGODB_DATABASE_NAME=testdb
* MONGODB_HOSTNAME=my-host:port
* MONGODB_COLLECTION_NAME=testCollection
* DEADLETTERTABLE=project:dataset.deadletter_table_name
*
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToMongoDB \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_NAME} \
* --stagingLocation=gs://${BUCKET_NAME}/staging \
* --tempLocation=gs://${BUCKET_NAME}/temp \
* --runner=DataflowRunner \
* --inputSubscription=${INPUT_SUBSCRIPTION} \
* --mongoDBUri=${MONGODB_HOSTNAME} \
* --database=${MONGODB_DATABASE_NAME} \
* --collection=${MONGODB_COLLECTION_NAME} \
* --deadletterTable=${DEADLETTERTABLE}"
* </pre>
*/
public class PubSubToMongoDB {
/**
* Options supported by {@link PubSubToMongoDB}
*
* <p>Inherits standard configuration options.
*/
/** The tag for the main output of the json transformation. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToMongoDB.class);
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*
* <p>Inherits standard configuration options, options from {@link
* JavascriptTextTransformer.JavascriptTextTransformerOptions}.
*/
public interface Options
extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
@Description(
"The Cloud Pub/Sub subscription to consume from."
+ "The name should be in the format of "
+ "projects/<project-id>/subscriptions/<subscriptions-name>")
@Validation.Required
String getInputSubscription();
void setInputSubscription(String inputSubscription);
@Description("The MongoDB database to push the Documents to.")
@Validation.Required
String getDatabase();
void setDatabase(String database);
@Description(
"The host addresses of the MongoDB"
+ "Multiple addresses to be specified with a comma separated value e.g."
+ "host1:port,host2:port,host3:port")
@Validation.Required
String getMongoDBUri();
void setMongoDBUri(String mongoDBUri);
@Description("The Collection in mongoDB to put documents to.")
@Validation.Required
String getCollection();
void setCollection(String collection);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format.")
@Validation.Required
String getDeadletterTable();
void setDeadletterTable(String deadletterTable);
@Description("Batch size in number of documents. Default: 1000")
@Default.Long(1024)
Long getBatchSize();
void setBatchSize(Long batchSize);
@Description("Batch size in number of bytes. Default: 5242880 (5mb)")
@Default.Long(5242880)
Long getBatchSizeBytes();
void setBatchSizeBytes(Long batchSizeBytes);
@Description("Maximum Connection idle time in ms. Default: 60000")
@Default.Integer(60000)
int getMaxConnectionIdleTime();
void setMaxConnectionIdleTime(int maxConnectionIdleTime);
@Description("Specify if SSL is enabled. Default: true")
@Default.Boolean(true)
Boolean getSslEnabled();
void setSslEnabled(Boolean sslEnabled);
@Description("Specify whether to ignore SSL certificate. Default: true")
@Default.Boolean(true)
Boolean getIgnoreSSLCertificate();
void setIgnoreSSLCertificate(Boolean ignoreSSLCertificate);
@Description("Enable ordered bulk insertions. Default: true")
@Default.Boolean(true)
Boolean getWithOrdered();
void setWithOrdered(Boolean withOrdered);
@Description("Enable invalidHostNameAllowed for ssl connection. Default: true")
@Default.Boolean(true)
Boolean getWithSSLInvalidHostNameAllowed();
void setWithSSLInvalidHostNameAllowed(Boolean withSSLInvalidHostNameAllowed);
}
/** DoFn that will parse the given string elements as Bson Documents. */
private static class ParseAsDocumentsFn extends DoFn<String, Document> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(Document.parse(context.element()));
}
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coders for pipeline
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
* 2) Apply Javascript UDF if provided.
* 3) Write to MongoDB
*
*/
LOG.info("Reading from subscription: " + options.getInputSubscription());
PCollectionTuple convertedPubsubMessages =
pipeline
/*
* Step #1: Read from a PubSub subscription.
*/
.apply(
"Read PubSub Subscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
/*
* Step #2: Apply the Javascript UDF, if provided, and transform
* the PubsubMessages into Json documents.
*/
.apply(
"Apply Javascript UDF",
PubSubMessageToJsonDocument.newBuilder()
.setJavascriptTextTransformFunctionName(
options.getJavascriptTextTransformFunctionName())
.setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
.build());
/*
* Step #3a: Write Json documents into MongoDB using {@link MongoDbIO.write}.
*/
convertedPubsubMessages
.get(TRANSFORM_OUT)
.apply(
"Get Json Documents",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.apply("Parse as BSON Document", ParDo.of(new ParseAsDocumentsFn()))
.apply(
"Put to MongoDB",
MongoDbIO.write()
.withBatchSize(options.getBatchSize())
.withUri(String.format("mongodb://%s", options.getMongoDBUri()))
.withDatabase(options.getDatabase())
.withCollection(options.getCollection())
.withIgnoreSSLCertificate(options.getIgnoreSSLCertificate())
.withMaxConnectionIdleTime(options.getMaxConnectionIdleTime())
.withOrdered(options.getWithOrdered())
.withSSLEnabled(options.getSslEnabled())
.withSSLInvalidHostNameAllowed(options.getWithSSLInvalidHostNameAllowed()));
/*
* Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
*/
convertedPubsubMessages
.get(TRANSFORM_DEADLETTER_OUT)
.apply(
"Write Transform Failures To BigQuery",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(options.getDeadletterTable())
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* The {@link PubSubMessageToJsonDocument} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into JSON objects for insertion into MongoDB while applying an
* optional UDF to the input. The execution of the UDF and the transformation to Json objects are done
* in a fail-safe way by wrapping the element with its original payload inside the {@link
* FailsafeElement} class. The {@link PubSubMessageToJsonDocument} transform will output a {@link
* PCollectionTuple} which contains all output and dead-letter {@link PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToMongoDB#TRANSFORM_OUT} - Contains all records successfully converted to
* JSON objects.
* <li>{@link PubSubToMongoDB#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
@AutoValue
public abstract static class PubSubMessageToJsonDocument
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
public static Builder newBuilder() {
return new AutoValue_PubSubToMongoDB_PubSubMessageToJsonDocument.Builder();
}
@Nullable
public abstract String javascriptTextTransformGcsPath();
@Nullable
public abstract String javascriptTextTransformFunctionName();
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));
// If a Udf is supplied then use it to parse the PubSubMessages.
if (javascriptTextTransformGcsPath() != null) {
return failsafeElements.apply(
"InvokeUDF",
JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(javascriptTextTransformGcsPath())
.setFunctionName(javascriptTextTransformFunctionName())
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
} else {
return failsafeElements.apply(
"ProcessPubSubMessages",
ParDo.of(new ProcessFailsafePubSubFn())
.withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
}
}
/** Builder for {@link PubSubMessageToJsonDocument}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setJavascriptTextTransformGcsPath(
String javascriptTextTransformGcsPath);
public abstract Builder setJavascriptTextTransformFunctionName(
String javascriptTextTransformFunctionName);
public abstract PubSubMessageToJsonDocument build();
}
}
/**
* The {@link ProcessFailsafePubSubFn} class processes a {@link FailsafeElement} containing a
* {@link PubsubMessage} and a String of the message's payload {@link PubsubMessage#getPayload()}
* into a {@link FailsafeElement} of the original {@link PubsubMessage} and a JSON string that has
* been processed with {@link Gson}.
*
* <p>If {@link PubsubMessage#getAttributeMap()} is not empty then the message attributes will be
* serialized along with the message payload.
*/
static class ProcessFailsafePubSubFn
extends DoFn<FailsafeElement<PubsubMessage, String>, FailsafeElement<PubsubMessage, String>> {
private static final Counter successCounter =
Metrics.counter(PubSubMessageToJsonDocument.class, "successful-json-conversion");
private static Gson gson = new Gson();
private static final Counter failedCounter =
Metrics.counter(PubSubMessageToJsonDocument.class, "failed-json-conversion");
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage pubsubMessage = context.element().getOriginalPayload();
JsonObject messageObject = new JsonObject();
try {
if (pubsubMessage.getPayload().length > 0) {
messageObject = gson.fromJson(new String(pubsubMessage.getPayload()), JsonObject.class);
}
// If message attributes are present they will be serialized along with the message payload
if (pubsubMessage.getAttributeMap() != null) {
pubsubMessage.getAttributeMap().forEach(messageObject::addProperty);
}
context.output(FailsafeElement.of(pubsubMessage, messageObject.toString()));
successCounter.inc();
} catch (JsonSyntaxException e) {
context.output(
TRANSFORM_DEADLETTER_OUT,
FailsafeElement.of(context.element())
.setErrorMessage(e.getMessage())
.setStacktrace(Throwables.getStackTraceAsString(e)));
failedCounter.inc();
}
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to an error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub to Elasticsearch
The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. The Dataflow template uses Elasticsearch's data streams feature to store time series data across multiple indices while giving you a single named resource for requests. Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.
Requirements for this pipeline
The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.
A publicly reachable Elasticsearch host on a GCP instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See Google Cloud Integration for Elastic for more details.
A Pub/Sub topic for error output.
Template parameters
Parameter
Description
inputSubscription
The Pub/Sub subscription to consume from. The name should be in the format of projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl
Elasticsearch URL in the format https://hostname:[port] or specify CloudID if using Elastic Cloud.
apiKey
Base64 Encoded API key used for authentication.
errorOutputTopic
Pub/Sub output topic for publishing failed records in the format of projects/<project-id>/topics/<topic-name>
dataset
(Optional) The type of logs sent via Pub/Sub, for which we have an out-of-the-box dashboard. Known log type values are audit, vpcflow, and firewall. Default: pubsub.
namespace
(Optional) An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default: default.
batchSize
(Optional) Batch size in number of documents. Default: 1000.
batchSizeBytes
(Optional) Batch size in number of bytes. Default: 5242880 (5mb).
maxRetryAttempts
(Optional) Max retry attempts, must be > 0. Default: no retries.
maxRetryDuration
(Optional) Max retry duration in milliseconds, must be > 0. Default: no retries.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
ERROR_OUTPUT_TOPIC: your Pub/Sub topic for error output
SUBSCRIPTION_NAME: your Pub/Sub subscription name
CONNECTION_URL: your Elasticsearch URL
DATASET: your log type
NAMESPACE: your namespace for dataset
APIKEY: your base64 encoded API key for authentication
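As a sketch, this Flex Template can be launched with gcloud dataflow flex-template run as shown below; the container spec path gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch is an assumption, and the optional dataset and namespace parameters can be omitted to accept their defaults.
# Launch the Pub/Sub to Elasticsearch Flex Template.
gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC,\
dataset=DATASET,\
namespace=NAMESPACE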
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
ERROR_OUTPUT_TOPIC: your Pub/Sub topic for error output
SUBSCRIPTION_NAME: your Pub/Sub subscription name
CONNECTION_URL: your Elasticsearch URL
DATASET: your log type
NAMESPACE: your namespace for dataset
APIKEY: your base64 encoded API key for authentication
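For this Flex Template the REST call normally targets the projects.locations.flexTemplates.launch method; the request below is a sketch under that assumption, with the same placeholders as above.
# Launch the Pub/Sub to Elasticsearch Flex Template through the REST API.
curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "launchParameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
          "parameters": {
            "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "connectionUrl": "CONNECTION_URL",
            "apiKey": "APIKEY",
            "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
          }
        }
      }'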
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.elasticsearch.templates;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.elasticsearch.options.PubSubToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.FailedPubsubMessageToPubsubTopicFn;
import com.google.cloud.teleport.v2.elasticsearch.transforms.ProcessEventMetadata;
import com.google.cloud.teleport.v2.elasticsearch.transforms.PubSubMessageToJsonDocument;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIndex;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToElasticsearch} pipeline is a streaming pipeline which ingests data in JSON
* format from PubSub, applies a Javascript UDF if provided and writes the resulting records to
* Elasticsearch. If the element fails to be processed, it is written to the error output
* Pub/Sub topic.
*
* <p>Please refer to <b><a href=
* "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md">
* README.md</a></b> for further information.
*/
public class PubSubToElasticsearch {
/** The tag for the main output of the json transformation. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the error output table of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_ERROROUTPUT_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToElasticsearch.class);
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line.
PubSubToElasticsearchOptions pubSubToElasticsearchOptions =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubSubToElasticsearchOptions.class);
pubSubToElasticsearchOptions.setIndex(
new ElasticsearchIndex(
pubSubToElasticsearchOptions.getDataset(),
pubSubToElasticsearchOptions.getNamespace())
.getIndex());
run(pubSubToElasticsearchOptions);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(PubSubToElasticsearchOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coders for pipeline
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
* 2) Apply Javascript UDF if provided.
* 3) Index Json string to output ES index.
*
*/
LOG.info("Reading from subscription: " + options.getInputSubscription());
PCollectionTuple convertedPubsubMessages =
pipeline
/*
* Step #1: Read from a PubSub subscription.
*/
.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
/*
* Step #2: Transform the PubsubMessages into Json documents.
*/
.apply(
"ConvertMessageToJsonDocument",
PubSubMessageToJsonDocument.newBuilder()
.setJavascriptTextTransformFunctionName(
options.getJavascriptTextTransformFunctionName())
.setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
.build());
/*
* Step #3a: Write Json documents into Elasticsearch using {@link ElasticsearchTransforms.WriteToElasticsearch}.
*/
convertedPubsubMessages
.get(TRANSFORM_OUT)
.apply(
"GetJsonDocuments",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.apply("Insert metadata", new ProcessEventMetadata())
.apply(
"WriteToElasticsearch",
WriteToElasticsearch.newBuilder()
.setOptions(options.as(PubSubToElasticsearchOptions.class))
.build());
/*
* Step 3b: Write elements that failed processing to error output PubSub topic via {@link PubSubIO}.
*/
convertedPubsubMessages
.get(TRANSFORM_ERROROUTPUT_OUT)
.apply(ParDo.of(new FailedPubsubMessageToPubsubTopicFn()))
.apply("writeFailureMessages", PubsubIO.writeMessages().to(options.getErrorOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
Datastream to Cloud Spanner
The Datastream to Cloud Spanner template is a streaming pipeline that reads
Datastream events
from a Cloud Storage bucket and writes them to a Cloud Spanner database. It is intended for
data migration from Datastream sources to Cloud Spanner.
All tables required for migration must exist in the destination Cloud Spanner database prior to
template execution. Hence, schema migration from the source database to the destination Cloud Spanner database
must be completed before data migration. Data can exist in the tables prior to migration. This
template does not propagate Datastream schema changes to the Cloud Spanner
database.
Data consistency is guaranteed only at the end of migration, when all data has been written to
Cloud Spanner. To store ordering information for each record written to Cloud Spanner, this
template creates an additional table (called a shadow table) for each table in the
Cloud Spanner database. The shadow tables are used to ensure consistency at the end of migration;
they are not deleted after migration and can be used for validation purposes.
Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors
resulting from executing transforms, are recorded in an error queue. The error queue is a
Cloud Storage folder that stores, in text format, all the Datastream events that encountered
errors along with the error reason. The errors can be transient or permanent and are stored in
corresponding Cloud Storage folders in the error queue. Transient errors are retried automatically,
while permanent errors are not. In the case of permanent errors, you can correct the change events
and move them to the retriable bucket while the template is running.
Requirements for this pipeline:
A Datastream stream in Running or Not started state.
A Cloud Storage bucket where Datastream events are replicated.
A Cloud Spanner database with existing tables. These tables can be empty or contain data.
Template parameters
Parameter
Description
inputFilePattern
The file location for Datastream files in Cloud Storage to replicate. Typically, this is the root path for a stream.
streamName
The name or template for the stream to poll for schema information and source type.
instanceId
The Cloud Spanner instance where the changes are replicated.
databaseId
The Cloud Spanner database where the changes are replicated.
projectId
The Cloud Spanner project ID.
deadLetterQueueDirectory
(Optional) The file path where the error queue output is stored. The default is a directory under the Dataflow job's temp location.
inputFileFormat
(Optional) The format of the output files produced by Datastream. For example, avro or json. Default: avro.
shadowTablePrefix
(Optional) The prefix used to name shadow tables. Default: shadow_.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE: your Cloud Spanner instance.
CLOUDSPANNER_DATABASE: your Cloud Spanner database.
DLQ: the Cloud Storage path for the error queue directory.
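As a sketch, a launch command built from these values might look like the following; the container spec path gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner and the STREAM_NAME placeholder (the Datastream stream to poll, not listed above) are assumptions.
# Launch the Datastream to Cloud Spanner Flex Template.
gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ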
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE: your Cloud Spanner instance.
CLOUDSPANNER_DATABASE: your Cloud Spanner database.
DLQ: the Cloud Storage path for the error queue directory.
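The REST request for this Flex Template is typically sent to the projects.locations.flexTemplates.launch method; the sketch below reuses the placeholders above under that assumption.
# Launch the Datastream to Cloud Spanner Flex Template through the REST API.
curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "launchParameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
          "parameters": {
            "inputFilePattern": "GCS_FILE_PATH",
            "streamName": "STREAM_NAME",
            "instanceId": "CLOUDSPANNER_INSTANCE",
            "databaseId": "CLOUDSPANNER_DATABASE",
            "deadLetterQueueDirectory": "DLQ"
          }
        }
      }'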
Text Files on Cloud Storage to BigQuery (Stream)
The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline
that allows you to stream text files stored in Cloud Storage, transform them using a
JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.
The pipeline runs indefinitely and needs to be terminated manually via a
cancel and not a
drain, due to its use of the
Watch transform, which is a splittable DoFn that does not support
draining.
Requirements for this pipeline:
Create a JSON file that describes the schema of your output table in BigQuery.
Ensure that there is a top-level JSON array titled fields and that its
contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.
For example, a schema that matches the UDF shown below could look like this:
{
  "fields": [
    {"name": "location", "type": "STRING"},
    {"name": "name", "type": "STRING"},
    {"name": "age", "type": "STRING"},
    {"name": "color", "type": "STRING"},
    {"name": "coffee", "type": "STRING"}
  ]
}
Create a JavaScript (.js) file with your UDF function that supplies the logic
to transform the lines of text. Note that your function must return a JSON string.
For example, this function splits each line of a CSV file and returns a JSON string after
transforming the values.
function transform(line) {
var values = line.split(',');
var obj = new Object();
obj.location = values[0];
obj.name = values[1];
obj.age = values[2];
obj.color = values[3];
obj.coffee = values[4];
var jsonString = JSON.stringify(obj);
return jsonString;
}
Template parameters
Parameter
Description
javascriptTextTransformGcsPath
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
JSONPath
Cloud Storage location of your BigQuery schema file, described as a JSON.
For example: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
outputTable
The fully qualified BigQuery table.
For example: my-project:dataset.table
inputFilePattern
Cloud Storage location of the text you'd like to process.
For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory
Temporary directory for BigQuery loading process.
For example: gs://my-bucket/my-files/temp_dir
outputDeadletterTable
Table for messages that failed to reach the output table.
For example: my-project:dataset.my-unprocessed-table. If it doesn't exist,
it is created during pipeline execution.
If not specified, <outputTable>_error_records is used instead.
Running the Cloud Storage Text to BigQuery (Stream) template
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to
the JSON file containing the schema definition
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA: the Cloud Storage path to your
text dataset
BIGQUERY_TABLE: your BigQuery table name
BIGQUERY_UNPROCESSED_TABLE: the name of your
BigQuery table for unprocessed messages
PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the
temp directory
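Putting the values above together, a run command might look like the following sketch; the template path Stream_GCS_Text_to_BigQuery is an assumption to confirm against the bucket contents for your VERSION.
# Launch the Cloud Storage Text to BigQuery (Stream) template (classic template).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS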
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to
the JSON file containing the schema definition
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA: the Cloud Storage path to your
text dataset
BIGQUERY_TABLE: your BigQuery table name
BIGQUERY_UNPROCESSED_TABLE: the name of your
BigQuery table for unprocessed messages
PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the
temp directory
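The equivalent REST request is sketched below, assuming the projects.locations.templates.launch endpoint and the same Stream_GCS_Text_to_BigQuery path.
# Launch the template through the Dataflow REST API.
curl -X POST \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "environment": { "tempLocation": "TEMP_LOCATION" },
        "parameters": {
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
          "inputFilePattern": "PATH_TO_TEXT_DATA",
          "outputTable": "BIGQUERY_TABLE",
          "outputDeadletterTable": "BIGQUERY_UNPROCESSED_TABLE",
          "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
        }
      }'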
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link TextToBigQueryStreaming} is a streaming version of {@link TextIOToBigQuery} pipeline
* that reads text files, applies a JavaScript UDF and writes the output to BigQuery. The pipeline
* continuously polls for new files, reads them row-by-row and processes each record into BigQuery.
* The polling interval is set at 10 seconds.
*
* <p>Example Usage:
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.TextToBigQueryStreaming \
* -Dexec.args="\
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${STAGING_BUCKET}/staging \
* --tempLocation=gs://${STAGING_BUCKET}/tmp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://path/to/input* \
* --JSONPath=gs://path/to/json/schema.json \
 * --outputTable=${PROJECT_ID}:${OUTPUT_DATASET}.${OUTPUT_TABLE} \
* --javascriptTextTransformGcsPath=gs://path/to/transform/udf.js \
* --javascriptTextTransformFunctionName=${TRANSFORM_NAME} \
* --bigQueryLoadingTemporaryDirectory=gs://${STAGING_BUCKET}/tmp \
* --outputDeadletterTable=${PROJECT_ID}:${ERROR_DATASET}.${ERROR_TABLE}"
* }
* </pre>
*/
public class TextToBigQueryStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output of the json transformation. */
private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the json to table row transform. */
private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Default interval for polling files in GCS. */
private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);
/** Coder for FailsafeElement. */
private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
TextToBigQueryStreamingOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TextToBigQueryStreamingOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(TextToBigQueryStreamingOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
/*
* Steps:
* 1) Read from the text source continuously.
* 2) Convert to FailsafeElement.
* 3) Apply Javascript udf transformation.
* - Tag records that were successfully transformed and those
* that failed transformation.
* 4) Convert records to TableRow.
* - Tag records that were successfully converted and those
* that failed conversion.
* 5) Insert successfully converted records into BigQuery.
* - Errors encountered while streaming will be sent to deadletter table.
* 6) Insert records that failed into deadletter table.
*/
PCollectionTuple transformedOutput =
pipeline
// 1) Read from the text source continuously.
.apply(
"ReadFromSource",
TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))
// 2) Convert to FailsafeElement.
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply Javascript udf transformation.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
PCollectionTuple convertedTableRows =
transformedOutput
// 4) Convert records to TableRow.
.get(UDF_OUT)
.apply(
"ConvertJSONToTableRow",
FailsafeJsonToTableRow.<String>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
WriteResult writeResult =
convertedTableRows
// 5) Insert successfully converted records into BigQuery.
.get(TRANSFORM_OUT)
.apply(
"InsertIntoBigQuery",
BigQueryIO.writeTableRows()
.withJsonSchema(getSchemaFromGCS(options.getJSONPath()))
.to(options.getOutputTable())
.withExtendedErrorInfo()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withMethod(Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
// Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(TextToBigQueryStreaming::wrapBigQueryInsertError));
// 6) Insert records that failed transformation or conversion into deadletter table
PCollectionList.of(
ImmutableList.of(
transformedOutput.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
failedInserts))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTable(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
*
* @param insertError BigQueryInsert error.
* @return FailsafeElement object.
* @throws IOException
*/
static FailsafeElement<String, String> wrapBigQueryInsertError(BigQueryInsertError insertError) {
FailsafeElement<String, String> failsafeElement;
try {
String rowPayload = JSON_FACTORY.toString(insertError.getRow());
String errorMessage = JSON_FACTORY.toString(insertError.getError());
failsafeElement = FailsafeElement.of(rowPayload, rowPayload);
failsafeElement.setErrorMessage(errorMessage);
} catch (IOException e) {
throw new RuntimeException(e);
}
return failsafeElement;
}
/**
* Method to read a BigQuery schema file from GCS and return the file contents as a string.
*
* @param gcsPath Path string for the schema file in GCS.
* @return File contents as a string.
*/
private static ValueProvider<String> getSchemaFromGCS(ValueProvider<String> gcsPath) {
return NestedValueProvider.of(
gcsPath,
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
ResourceId sourceResourceId = FileSystems.matchNewResource(input, false);
String schema;
try (ReadableByteChannel rbc = FileSystems.open(sourceResourceId)) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
try (WritableByteChannel wbc = Channels.newChannel(baos)) {
ByteStreams.copy(rbc, wbc);
schema = baos.toString(Charsets.UTF_8.name());
LOG.info("Extracted schema: " + schema);
}
}
} catch (IOException e) {
LOG.error("Error extracting schema: " + e.getMessage());
throw new RuntimeException(e);
}
return schema;
}
});
}
/**
* The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
* by the executor at the command-line.
*/
public interface TextToBigQueryStreamingOptions extends TextIOToBigQuery.Options {
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
}
Text Files on Cloud Storage to Pub/Sub (Stream)
This template creates a streaming pipeline that continuously polls for new text files uploaded to
Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub
topic. The template reads records from newline-delimited files containing JSON records
or from CSV files and publishes them to a Pub/Sub topic for real-time processing. You can
use this template to replay data to Pub/Sub.
The pipeline runs indefinitely and needs to be terminated manually via a 'cancel' and not a
'drain', due to its use of the 'Watch' transform, which is a 'SplittableDoFn' that does not
support draining.
Currently, the polling interval is fixed and set to 10 seconds. This template does not
set any timestamp on the individual records, so the event time is equal to the publishing
time during execution. If your pipeline relies on an accurate event time for processing, you
should not use this pipeline.
Requirements for this pipeline:
Input files must be in newline-delimited JSON or CSV format. Records that span
multiple lines in the source files can cause issues downstream, because each line within the files
is published as a message to Pub/Sub.
The Pub/Sub topic must exist prior to execution.
The pipeline runs indefinitely and needs to be terminated manually.
Template parameters
Parameter
Description
inputFilePattern
The input file pattern to read from. For example, gs://bucket-name/files/*.json
or gs://bucket-name/path/*.csv.
outputTopic
The Pub/Sub output topic to write to. The name must be in the format of
projects/<project-id>/topics/<topic-name>.
Running the Text Files on Cloud Storage to Pub/Sub (Stream) template
Optional: For Regional endpoint, select a value from the drop-down menu. The default
regional endpoint is us-central1.
For a list of regions where you can run a Dataflow job, see
Dataflow locations.
From the Dataflow template drop-down menu, select
the Text Files on Cloud Storage to Pub/Sub (Stream) template.
In the provided parameter fields, enter your parameter values.
Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Replace the following:
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage bucket
FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket
(for example, path/*.csv)
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.joda.time.Duration;
/**
* The {@code TextToPubsubStream} is a streaming version of {@code TextToPubsub} pipeline that
* publishes records to Cloud Pub/Sub from a set of files. The pipeline continuously polls for new
* files, reads them row-by-row and publishes each record as a string message. The polling interval
 * is fixed at 10 seconds. At the moment, publishing messages with attributes is
* unsupported.
*
* <p>Example Usage:
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.TextToPubsubStream \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${STAGING_BUCKET}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${STAGING_BUCKET}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://path/to/*.csv \
* --outputTopic=projects/${PROJECT_ID}/topics/${TOPIC_NAME}"
* }
* </pre>
*/
public class TextToPubsubStream extends TextToPubsub {
private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);
/**
* Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
* executes the pipeline.
*
* @param args Arguments passed in from the command-line.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Executes the pipeline with the provided execution parameters.
*
* @param options The execution parameters.
*/
public static PipelineResult run(Options options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Read from the text source.
* 2) Write each text record to Pub/Sub
*/
pipeline
.apply(
"Read Text Data",
TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
}
Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)
The Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template is a streaming pipeline that reads CSV
files from a Cloud Storage bucket, calls the Cloud Data Loss Prevention (Cloud DLP) API for
de-identification, and writes the de-identified data into the specified
BigQuery table. This template supports using both a Cloud DLP
inspection template and a Cloud DLP
de-identification template.
This allows users to inspect data for potentially sensitive information and de-identify it, as well
as to de-identify structured data where specific columns are marked for de-identification and no
inspection is needed. Note that this template does not support a regional path for the
de-identification template location; only a global path is supported.
Requirements for this pipeline:
The input data to tokenize must exist
The Cloud DLP Templates must exist (for example, DeidentifyTemplate and InspectTemplate). See
Cloud DLP templates for more details.
The BigQuery dataset must exist
Template parameters
Parameter
Description
inputFilePattern
The CSV files to read input data records from. Wildcards are also accepted. For example, gs://mybucket/my_csv_filename.csv
or gs://mybucket/file-*.csv.
dlpProjectId
Cloud DLP project ID that owns the Cloud DLP API resource.
This Cloud DLP project can be the same project that owns the
Cloud DLP templates, or it can be a separate project.
For example, my_dlp_api_project.
deidentifyTemplateName
Cloud DLP deidentification template to use for API requests, specified with the pattern
projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}.
For example, projects/my_project/deidentifyTemplates/100.
datasetName
BigQuery dataset for sending tokenized results.
batchSize
The chunking/batch size for sending data to inspect and de-identify. In the case of a CSV file,
batchSize is the number of rows in a batch. Determine the batch size based on the size of the
records and the size of the file. Note that the Cloud DLP API has a payload size limit of 524 KB per API call; a short sizing sketch follows this parameter list.
inspectTemplateName
(Optional) Cloud DLP inspection template to use for API requests, specified with the pattern
projects/{template_project_id}/inspectTemplates/{inspectTemplateId}.
For example, projects/my_project/inspectTemplates/100.
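To make the batch-size guidance concrete, here is a minimal, self-contained Java sketch (not part of the template; the method name estimateDlpRequestCount is hypothetical) that mirrors the chunking arithmetic: given a row count and a batch size, it estimates how many Cloud DLP API calls one CSV file will produce. Choose batchSize so that batchSize multiplied by the average row size stays safely under the 524 KB payload limit.
// Illustrative sketch only; not part of the Dataflow template.
public class BatchSizeEstimate {
  /**
   * Returns the number of Cloud DLP requests needed to process {@code rowCount} data rows
   * when at most {@code batchSize} rows are sent per request.
   */
  static int estimateDlpRequestCount(int rowCount, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    // Each request carries at most batchSize rows, so round up.
    return (rowCount + batchSize - 1) / batchSize;
  }

  public static void main(String[] args) {
    // 100 rows with a batch size of 15 result in 7 requests:
    // six full batches of 15 rows and one final batch of 10 rows.
    System.out.println(estimateDlpRequestCount(100, 15)); // prints 7
  }
}
The CSVReader transform in the template source below performs the equivalent calculation when it sizes the initial restriction for its Splittable DoFn.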
Running the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_DATA: your input file path
DEIDENTIFY_TEMPLATE: the Cloud DLP deidentify template number
DATASET_NAME: the BigQuery dataset name
INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
BATCH_SIZE_VALUE: the batch size (the number of rows per API call for CSV files)
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
DLP_API_PROJECT_ID: your Cloud DLP API project ID
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_DATA: your input file path
DEIDENTIFY_TEMPLATE: the Cloud DLP deidentify template number
DATASET_NAME: the BigQuery dataset name
INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
BATCH_SIZE_VALUE: the batch size (the number of rows per API call for CSV files)
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.common.base.Charsets;
import com.google.privacy.dlp.v2.ContentItem;
import com.google.privacy.dlp.v2.DeidentifyContentRequest;
import com.google.privacy.dlp.v2.DeidentifyContentRequest.Builder;
import com.google.privacy.dlp.v2.DeidentifyContentResponse;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.ProjectName;
import com.google.privacy.dlp.v2.Table;
import com.google.privacy.dlp.v2.Value;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.ReadableFileCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link DLPTextToBigQueryStreaming} is a streaming pipeline that reads CSV files from a
* storage location (e.g. Google Cloud Storage), uses Cloud DLP API to inspect, classify, and mask
* sensitive information (e.g. PII Data like passport or SIN number) and at the end stores
 * obfuscated data in BigQuery (with dynamic table creation) to be used for various purposes, e.g. data
 * analytics or ML models. Cloud DLP inspection and masking can be configured by the user and can make
 * use of over 90 built-in detectors and masking techniques like tokenization, secure hashing, date
* shifting, partial masking, and more.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>DLP Templates exist (e.g. deidentifyTemplate, InspectTemplate)
* <li>The BigQuery Dataset exists
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/dlp-text-to-bigquery
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.DLPTextToBigQueryStreaming \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=dlp-text-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=gs://<bucketName>/<fileName>.csv, batchSize=15,datasetName=<BQDatasetId>,
* dlpProjectId=<projectId>,
 * deidentifyTemplateName=projects/{projectId}/deidentifyTemplates/{deIdTemplateId}"
* </pre>
*/
public class DLPTextToBigQueryStreaming {
public static final Logger LOG = LoggerFactory.getLogger(DLPTextToBigQueryStreaming.class);
/** Default interval for polling files in GCS. */
private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(30);
/** Only CSV files are expected in the GCS bucket. */
private static final String ALLOWED_FILE_EXTENSION = "csv";
/** Regular expression that matches valid BQ table IDs. */
private static final Pattern TABLE_REGEXP = Pattern.compile("[-\\w$@]{1,1024}");
/** Default batch size if value not provided in execution. */
private static final Integer DEFAULT_BATCH_SIZE = 100;
/** Regular expression that matches a valid BQ column name. */
private static final Pattern COLUMN_NAME_REGEXP = Pattern.compile("^[A-Za-z_]+[A-Za-z_0-9]*$");
/** Default window interval to create side inputs for header records. */
private static final Duration WINDOW_INTERVAL = Duration.standardSeconds(30);
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* DLPTextToBigQueryStreaming#run(TokenizePipelineOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
TokenizePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TokenizePipelineOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(TokenizePipelineOptions options) {
// Create the pipeline
Pipeline p = Pipeline.create(options);
/*
* Steps:
 * 1) Read from the text source continuously based on the default interval, e.g. 30 seconds.
 * - Set up a window of 30 secs to capture the list of files emitted.
* - Group by file name as key and ReadableFile as a value.
* 2) Output each readable file for content processing.
* 3) Split file contents based on batch size for parallel processing.
* 4) Process each split as a DLP table content request to invoke API.
* 5) Convert DLP Table Rows to BQ Table Row.
* 6) Create dynamic table and insert successfully converted records into BQ.
*/
PCollection<KV<String, Iterable<ReadableFile>>> csvFiles =
p
/*
 * 1) Read from the text source continuously based on the default interval, e.g. 30 seconds.
 * - Set up a window of 30 secs to capture the list of files emitted.
* - Group by file name as key and ReadableFile as a value.
*/
.apply(
"Poll Input Files",
FileIO.match()
.filepattern(options.getInputFilePattern())
.continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
.apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
.apply("Add File Name as Key", WithKeys.of(file -> getFileName(file)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), ReadableFileCoder.of()))
.apply(
"Fixed Window(30 Sec)",
Window.<KV<String, ReadableFile>>into(FixedWindows.of(WINDOW_INTERVAL))
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.ZERO)))
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply(GroupByKey.create());
PCollection<KV<String, TableRow>> bqDataMap =
csvFiles
// 2) Output each readable file for content processing.
.apply(
"File Handler",
ParDo.of(
new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, ReadableFile>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String fileKey = c.element().getKey();
c.element()
.getValue()
.forEach(
file -> {
c.output(KV.of(fileKey, file));
});
}
}))
// 3) Split file contents based on batch size for parallel processing.
.apply(
"Process File Contents",
ParDo.of(
new CSVReader(
NestedValueProvider.of(
options.getBatchSize(),
batchSize -> {
if (batchSize != null) {
return batchSize;
} else {
return DEFAULT_BATCH_SIZE;
}
}))))
// 4) Create a DLP Table content request and invoke the DLP API for each chunk.
.apply(
"DLP-Tokenization",
ParDo.of(
new DLPTokenizationDoFn(
options.getDlpProjectId(),
options.getDeidentifyTemplateName(),
options.getInspectTemplateName())))
// 5) Convert DLP Table Rows to BQ Table Row
.apply("Process Tokenized Data", ParDo.of(new TableRowProcessorDoFn()));
// 6) Create dynamic table and insert successfully converted records into BQ.
bqDataMap.apply(
"Write To BQ",
BigQueryIO.<KV<String, TableRow>>write()
.to(new BQDestination(options.getDatasetName(), options.getDlpProjectId()))
.withFormatFunction(
element -> {
return element.getValue();
})
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
return p.run();
}
/**
* The {@link TokenizePipelineOptions} interface provides the custom execution options passed by
* the executor at the command-line.
*/
public interface TokenizePipelineOptions extends DataflowPipelineOptions {
@Description("The file pattern to read records from (e.g. gs://bucket/file-*.csv)")
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description(
"DLP Deidentify Template to be used for API request "
+ "(e.g.projects/{project_id}/deidentifyTemplates/{deIdTemplateId}")
@Required
ValueProvider<String> getDeidentifyTemplateName();
void setDeidentifyTemplateName(ValueProvider<String> value);
@Description(
"DLP Inspect Template to be used for API request "
+ "(e.g.projects/{project_id}/inspectTemplates/{inspectTemplateId}")
ValueProvider<String> getInspectTemplateName();
void setInspectTemplateName(ValueProvider<String> value);
@Description(
"DLP API has a limit for payload size of 524KB /api call. "
+ "That's why dataflow process will need to chunk it. User will have to decide "
+ "on how they would like to batch the request depending on number of rows "
+ "and how big each row is.")
@Required
ValueProvider<Integer> getBatchSize();
void setBatchSize(ValueProvider<Integer> value);
@Description("Big Query data set must exist before the pipeline runs (e.g. pii-dataset")
ValueProvider<String> getDatasetName();
void setDatasetName(ValueProvider<String> value);
@Description("Project id to be used for DLP Tokenization")
ValueProvider<String> getDlpProjectId();
void setDlpProjectId(ValueProvider<String> value);
}
/**
 * The {@link CSVReader} class uses the (experimental) Splittable DoFn API to split each CSV file's
 * contents into chunks and process them in a non-monolithic fashion. For example: if a CSV file has
 * 100 rows and the batch size is set to 15, the initial restriction for the SDF will be the range
 * [1, 8) and the split restrictions will be {[1,2), [2,3), ... , [7,8)} for parallel execution.
*/
static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
private ValueProvider<Integer> batchSize;
private PCollectionView<List<KV<String, List<String>>>> headerMap;
/** This counter is used to track number of lines processed against batch size. */
private Integer lineCount;
public CSVReader(ValueProvider<Integer> batchSize) {
lineCount = 1;
this.batchSize = batchSize;
}
@ProcessElement
public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
throws IOException {
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
String fileKey = c.element().getKey();
try (BufferedReader br = getReader(c.element().getValue())) {
List<Table.Row> rows = new ArrayList<>();
Table dlpTable = null;
/** finding out EOL for this restriction so that we know the SOL */
int endOfLine = (int) (i * batchSize.get().intValue());
int startOfLine = (endOfLine - batchSize.get().intValue());
// getting the DLP table headers
Iterator<CSVRecord> csvRows = CSVFormat.DEFAULT.parse(br).iterator();
if (!csvRows.hasNext()) {
LOG.info("File `" + c.element().getKey() + "` is empty");
continue;
}
List<FieldId> dlpTableHeaders = toDlpTableHeaders(csvRows.next());
/** skip all the rows that are not part of this restriction */
for (int line = 0; line < startOfLine; line++) {
if (csvRows.hasNext()) {
csvRows.next();
}
}
/** loop through the buffered reader and create DLP Table Rows equal to the batch size */
while (csvRows.hasNext() && lineCount <= batchSize.get()) {
CSVRecord csvRow = csvRows.next();
rows.add(convertCsvRowToTableRow(csvRow));
lineCount += 1;
}
/** creating DLP table and output for next transformation */
dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
c.output(KV.of(fileKey, dlpTable));
LOG.debug(
"Current Restriction From: {}, Current Restriction To: {},"
+ " StartofLine: {}, End Of Line {}, BatchData {}",
tracker.currentRestriction().getFrom(),
tracker.currentRestriction().getTo(),
startOfLine,
endOfLine,
dlpTable.getRowsCount());
}
}
}
private static List<FieldId> toDlpTableHeaders(CSVRecord headerRow) {
List<FieldId> result = new ArrayList<>();
for (String header : headerRow) {
result.add(FieldId.newBuilder().setName(header).build());
}
return result;
}
/**
* SDF needs to define a @GetInitialRestriction method that can create a restriction describing
* the complete work for a given element. For our case this would be the total number of rows
 * for each CSV file. We will calculate the number of splits required based on the total number of
 * rows and the batch size provided.
*
* @throws IOException
*/
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
throws IOException {
int rowCount = 0;
int totalSplit = 0;
try (BufferedReader br = getReader(csvFile.getValue())) {
/** assume first row is header */
int checkRowCount = (int) br.lines().count() - 1;
rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
totalSplit = rowCount / batchSize.get().intValue();
int remaining = rowCount % batchSize.get().intValue();
/**
 * Adjust the total number of splits based on the remaining rows. For example: a batch size of
 * 15 for 100 rows will produce 7 splits in total. As it's a range, the last split will have the
 * offset range {7,8}.
*/
if (remaining > 0) {
totalSplit = totalSplit + 2;
} else {
totalSplit = totalSplit + 1;
}
}
LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
return new OffsetRange(1, totalSplit);
}
/**
 * SDF needs to define a @SplitRestriction method that can split the initial restriction into a
 * number of smaller restrictions. For example: an initial restriction of (x, N) as input
 * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
*/
@SplitRestriction
public void splitRestriction(
@Element KV<String, ReadableFile> csvFile,
@Restriction OffsetRange range,
OutputReceiver<OffsetRange> out) {
/** split the initial restriction by 1 */
for (final OffsetRange p : range.split(1, 1)) {
out.output(p);
}
}
@NewTracker
public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
}
private Table.Row convertCsvRowToTableRow(CSVRecord csvRow) {
/** convert from CSV row to DLP Table Row */
Iterator<String> valueIterator = csvRow.iterator();
Table.Row.Builder tableRowBuilder = Table.Row.newBuilder();
while (valueIterator.hasNext()) {
String value = valueIterator.next();
if (value != null) {
tableRowBuilder.addValues(Value.newBuilder().setStringValue(value.toString()).build());
} else {
tableRowBuilder.addValues(Value.newBuilder().setStringValue("").build());
}
}
return tableRowBuilder.build();
}
private List<String> getHeaders(List<KV<String, List<String>>> headerMap, String fileKey) {
return headerMap.stream()
.filter(map -> map.getKey().equalsIgnoreCase(fileKey))
.findFirst()
.map(e -> e.getValue())
.orElse(null);
}
}
/**
 * The {@link DLPTokenizationDoFn} class executes the tokenization request by calling the DLP API. It
 * uses a DLP table as the content item, since a CSV file contains fully structured data. DLP
 * templates (e.g. de-identify, inspect) need to exist before this pipeline runs. As the response
 * from the API is received, this DoFn outputs a KV of the new table with the table id as the key.
*/
static class DLPTokenizationDoFn extends DoFn<KV<String, Table>, KV<String, Table>> {
private ValueProvider<String> dlpProjectId;
private DlpServiceClient dlpServiceClient;
private ValueProvider<String> deIdentifyTemplateName;
private ValueProvider<String> inspectTemplateName;
private boolean inspectTemplateExist;
private Builder requestBuilder;
private final Distribution numberOfRowsTokenized =
Metrics.distribution(DLPTokenizationDoFn.class, "numberOfRowsTokenizedDistro");
private final Distribution numberOfBytesTokenized =
Metrics.distribution(DLPTokenizationDoFn.class, "numberOfBytesTokenizedDistro");
public DLPTokenizationDoFn(
ValueProvider<String> dlpProjectId,
ValueProvider<String> deIdentifyTemplateName,
ValueProvider<String> inspectTemplateName) {
this.dlpProjectId = dlpProjectId;
this.dlpServiceClient = null;
this.deIdentifyTemplateName = deIdentifyTemplateName;
this.inspectTemplateName = inspectTemplateName;
this.inspectTemplateExist = false;
}
@Setup
public void setup() {
if (this.inspectTemplateName.isAccessible()) {
if (this.inspectTemplateName.get() != null) {
this.inspectTemplateExist = true;
}
}
if (this.deIdentifyTemplateName.isAccessible()) {
if (this.deIdentifyTemplateName.get() != null) {
this.requestBuilder =
DeidentifyContentRequest.newBuilder()
.setParent(ProjectName.of(this.dlpProjectId.get()).toString())
.setDeidentifyTemplateName(this.deIdentifyTemplateName.get());
if (this.inspectTemplateExist) {
this.requestBuilder.setInspectTemplateName(this.inspectTemplateName.get());
}
}
}
}
@StartBundle
public void startBundle() throws SQLException {
try {
this.dlpServiceClient = DlpServiceClient.create();
} catch (IOException e) {
LOG.error("Failed to create DLP Service Client", e.getMessage());
throw new RuntimeException(e);
}
}
@FinishBundle
public void finishBundle() throws Exception {
if (this.dlpServiceClient != null) {
this.dlpServiceClient.close();
}
}
@ProcessElement
public void processElement(ProcessContext c) {
String key = c.element().getKey();
Table nonEncryptedData = c.element().getValue();
ContentItem tableItem = ContentItem.newBuilder().setTable(nonEncryptedData).build();
this.requestBuilder.setItem(tableItem);
DeidentifyContentResponse response =
dlpServiceClient.deidentifyContent(this.requestBuilder.build());
Table tokenizedData = response.getItem().getTable();
numberOfRowsTokenized.update(tokenizedData.getRowsList().size());
numberOfBytesTokenized.update(tokenizedData.toByteArray().length);
c.output(KV.of(key, tokenizedData));
}
}
/**
 * The {@link TableRowProcessorDoFn} class processes tokenized DLP tables and converts them to
 * BigQuery table rows.
*/
public static class TableRowProcessorDoFn extends DoFn<KV<String, Table>, KV<String, TableRow>> {
@ProcessElement
public void processElement(ProcessContext c) {
Table tokenizedData = c.element().getValue();
List<String> headers =
tokenizedData.getHeadersList().stream()
.map(fid -> fid.getName())
.collect(Collectors.toList());
List<Table.Row> outputRows = tokenizedData.getRowsList();
if (outputRows.size() > 0) {
for (Table.Row outputRow : outputRows) {
if (outputRow.getValuesCount() != headers.size()) {
throw new IllegalArgumentException(
"CSV file's header count must exactly match with data element count");
}
c.output(
KV.of(
c.element().getKey(),
createBqRow(outputRow, headers.toArray(new String[headers.size()]))));
}
}
}
private static TableRow createBqRow(Table.Row tokenizedValue, String[] headers) {
TableRow bqRow = new TableRow();
AtomicInteger headerIndex = new AtomicInteger(0);
List<TableCell> cells = new ArrayList<>();
tokenizedValue
.getValuesList()
.forEach(
value -> {
String checkedHeaderName =
checkHeaderName(headers[headerIndex.getAndIncrement()].toString());
bqRow.set(checkedHeaderName, value.getStringValue());
cells.add(new TableCell().set(checkedHeaderName, value.getStringValue()));
});
bqRow.setF(cells);
return bqRow;
}
}
/**
* The {@link BQDestination} class creates BigQuery table destination and table schema based on
 * the CSV file processed in earlier transformations. The table id is the same as the file name, and
 * the table schema is the same as the file's header columns.
*/
public static class BQDestination
extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
private ValueProvider<String> datasetName;
private ValueProvider<String> projectId;
public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
this.datasetName = datasetName;
this.projectId = projectId;
}
@Override
public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
String key = element.getValue().getKey();
String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
LOG.debug("Table Name {}", tableName);
return KV.of(tableName, element.getValue().getValue());
}
@Override
public TableDestination getTable(KV<String, TableRow> destination) {
TableDestination dest =
new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
LOG.debug("Table Destination {}", dest.getTableSpec());
return dest;
}
@Override
public TableSchema getSchema(KV<String, TableRow> destination) {
TableRow bqRow = destination.getValue();
TableSchema schema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
List<TableCell> cells = bqRow.getF();
for (int i = 0; i < cells.size(); i++) {
Map<String, Object> object = cells.get(i);
String header = object.keySet().iterator().next();
/** currently all BQ data types are set to String */
fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
}
schema.setFields(fields);
return schema;
}
}
private static String getFileName(ReadableFile file) {
String csvFileName = file.getMetadata().resourceId().getFilename().toString();
/** taking out .csv extension from file name e.g fileName.csv->fileName */
String[] fileKey = csvFileName.split("\\.", 2);
if (!fileKey[1].equals(ALLOWED_FILE_EXTENSION) || !TABLE_REGEXP.matcher(fileKey[0]).matches()) {
throw new RuntimeException(
"[Filename must contain a CSV extension "
+ " BQ table name must contain only letters, numbers, or underscores ["
+ fileKey[1]
+ "], ["
+ fileKey[0]
+ "]");
}
/** returning file name without extension */
return fileKey[0];
}
private static BufferedReader getReader(ReadableFile csvFile) {
BufferedReader br = null;
ReadableByteChannel channel = null;
/** read the file and create buffered reader */
try {
channel = csvFile.openSeekable();
} catch (IOException e) {
LOG.error("Failed to Read File {}", e.getMessage());
throw new RuntimeException(e);
}
if (channel != null) {
br = new BufferedReader(Channels.newReader(channel, Charsets.UTF_8.name()));
}
return br;
}
private static String checkHeaderName(String name) {
/** some checks to make sure BQ column names don't fail e.g. special characters */
String checkedHeader = name.replaceAll("\\s", "_");
checkedHeader = checkedHeader.replaceAll("'", "");
checkedHeader = checkedHeader.replaceAll("/", "");
if (!COLUMN_NAME_REGEXP.matcher(checkedHeader).matches()) {
throw new IllegalArgumentException("Column name can't be matched to a valid format " + name);
}
return checkedHeader;
}
}
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)
The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub
template is a streaming pipeline that reads Pub/Sub messages with change data from
a MySQL database and writes the records to BigQuery. A Debezium connector captures
changes to the MySQL database and publishes the changed data to Pub/Sub. The
template then reads the Pub/Sub messages and writes them to BigQuery.
You can use this template to sync MySQL databases and BigQuery tables. The
pipeline writes the changed data to a BigQuery staging table and intermittently
updates a BigQuery table replicating the MySQL database.
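The template's source is not reproduced here, but the replication step can be illustrated with a heavily simplified sketch. The following Java snippet is an assumption-level illustration only, not the template's actual code: it collapses a changelog (staging) table to the latest change per primary key and merges the result into the replica table using the BigQuery client library. All project, dataset, table, and column names (id, name, change_timestamp) are hypothetical placeholders.
// Illustrative sketch only (hypothetical names); not the template's actual code.
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class ReplicaMergeSketch {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Collapse the changelog to the latest change per primary key, then merge it
    // into the replica table so the replica mirrors the MySQL source table.
    String merge =
        "MERGE `my-project.replica_dataset.my_table` AS replica\n"
            + "USING (\n"
            + "  SELECT * EXCEPT (row_num) FROM (\n"
            + "    SELECT *, ROW_NUMBER() OVER (\n"
            + "      PARTITION BY id ORDER BY change_timestamp DESC) AS row_num\n"
            + "    FROM `my-project.changelog_dataset.my_table_changelog`)\n"
            + "  WHERE row_num = 1\n"
            + ") AS latest\n"
            + "ON replica.id = latest.id\n"
            + "WHEN MATCHED THEN UPDATE SET name = latest.name\n"
            + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (latest.id, latest.name)";

    bigquery.query(QueryJobConfiguration.newBuilder(merge).build());
  }
}
In the template, this kind of update runs intermittently, so the replica lags the source MySQL database only by the update interval.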
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
SUBSCRIPTIONS: your comma-separated list of Pub/Sub subscription names
CHANGELOG_DATASET: your BigQuery dataset for changelog data
REPLICA_DATASET: your BigQuery dataset for replica tables
Apache Kafka to BigQuery
The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery.
Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. If the errors table does not exist prior to execution, then it is created.
Requirements for this pipeline
The output BigQuery table must exist.
The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format.
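To show the overall shape of such a pipeline, here is a minimal Java sketch: read JSON strings from Kafka with Beam's KafkaIO, parse each payload into a TableRow, and stream the rows into the existing BigQuery table. This is an illustrative sketch only, not the template's source; the broker address, topic, and table spec are placeholders, and the optional JavaScript UDF step and the deadletter routing described below are omitted.
// Illustrative sketch only; not the template's source. Placeholder broker, topic, and table spec.
import com.google.api.services.bigquery.model.TableRow;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaToBigQuerySketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.as(StreamingOptions.class).setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Consume JSON messages from the Kafka topic (broker address and topic are placeholders).
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("35.70.252.199:9092")
                .withTopic("messages")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply("ExtractPayload", Values.<String>create())
        // Parse each JSON payload into a BigQuery TableRow.
        .apply(
            "JsonToTableRow",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via(
                    json -> {
                      try {
                        return TableRowJsonCoder.of()
                            .decode(
                                new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)),
                                Context.OUTER);
                      } catch (Exception e) {
                        throw new RuntimeException("Failed to parse Kafka message", e);
                      }
                    }))
        .setCoder(TableRowJsonCoder.of())
        // Append the rows to the existing output table (outputTableSpec placeholder).
        .apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:dataset.table")
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    pipeline.run();
  }
}
The real template additionally routes records that fail the UDF or the BigQuery insert to the errors table named by outputDeadletterTable, similar to the failsafe pattern used in the TextToBigQueryStreaming source shown earlier.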
Template parameters
Parameter
Description
outputTableSpec
The BigQuery output table location to write the Apache Kafka messages to, in the format of my-project:dataset.table
inputTopics
The Apache Kafka input topics to read from in a comma-separated list. For example: messages
bootstrapServers
The host address of the running Apache Kafka broker servers in a comma-separated list, each host address in the format of 35.70.252.199:9092
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
outputDeadletterTable
(Optional) The BigQuery table for messages that failed to reach the output table, in the format of my-project:dataset.my-deadletter-table. If it doesn't exist, the table is created during pipeline execution.
If not specified, <outputTableSpec>_error_records is used instead.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
BIGQUERY_TABLE: your BigQuery table name
KAFKA_TOPICS: the Apache Kafka topic list. If multiple topics are provided, follow the instructions on how to escape commas.
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address should include the port number on which the server is accessible, for example 35.70.252.199:9092.
If multiple addresses are provided, follow the instructions on how to escape commas.
API
To run the template using the REST API, send an HTTP PO