Google provides a set of open-source
Dataflow templates.
These Dataflow templates can help you solve large data tasks, including data
import, data export, data backup, data restore and bulk API operations—all without the use of a
dedicated development environment. The templates are built on Apache Beam and use Dataflow to
transform the data.
The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads
JSON-formatted messages from a Pub/Sub subscription and writes them to a
BigQuery table. You can use the template as a quick solution to move
Pub/Sub data to BigQuery. The template reads JSON-formatted messages
from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
The data field
of Pub/Sub messages must use the JSON format, described in this
JSON guide.
For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery
table with two columns, named k1 and k2, with a string data type.
The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
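For example, to satisfy these requirements for messages like {"k1":"v1", "k2":"v2"}, you could create the output table ahead of time with the bq CLI. This is only a sketch; the project, dataset, and table names are placeholders:
bq mk --table PROJECT_ID:DATASET.TABLE_NAME k1:STRING,k2:STRING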
Template parameters
inputSubscription: The Pub/Sub input subscription to read from, in the format projects/<project>/subscriptions/<subscription>.
outputTableSpec: The BigQuery output table location, in the format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the format <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
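For reference, the UDF for this template receives each message payload as a JSON string and must return a JSON string that matches the output table schema. The following is a minimal sketch; the added processed field is hypothetical and would need a matching column in your table:
function myTransform(inJson) {
  // Parse the incoming payload, adjust it, and return it as a JSON string.
  var obj = JSON.parse(inJson);
  obj.processed = true;
  return JSON.stringify(obj);
}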
Running the Pub/Sub Subscription to BigQuery template
Console
Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
From the Dataflow template drop-down menu, select the Pub/Sub Subscription to BigQuery template.
In the provided parameter fields, enter your parameter values.
Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Replace the following:
JOB_NAME: a unique job name of your choice
REGION_NAME: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: your Pub/Sub subscription name
DATASET: your BigQuery dataset
TABLE_NAME: your BigQuery table name
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
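For example, the following request launches the Pub/Sub Subscription to BigQuery template through the templates.launch method. This is a sketch; the JSON body follows the LaunchTemplateParameters format, and the uppercase values are placeholders described below:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
    "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
  },
  "environment": {
    "tempLocation": "TEMP_LOCATION"
  }
}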
In the request, replace the following:
PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
JOB_NAME: a unique job name of your choice
LOCATION: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
@Template(
name = "PubSub_Subscription_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub subscription, transforms"
+ " them using a JavaScript user-defined function (UDF), and writes them to a"
+ " pre-existing BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputTopic",
contactInformation = "https://cloud.google.com/support")
@Template(
name = "PubSub_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Topic to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them"
+ " using a JavaScript user-defined function (UDF), and writes them to a pre-existing"
+ " BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The table’s schema must match the "
+ "input JSON objects.")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Input Pub/Sub topic",
helpText = "The Pub/Sub topic to read the input from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateParameter.PubsubSubscription(
order = 3,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
@TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
@Description(
"This determines whether the template reads from a Pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.BigQueryTable(
order = 5,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. It should be in the format"
+ " of \"your-project-id:your-dataset.your-table-name\". If it doesn't exist, it"
+ " will be created during pipeline execution. If not specified,"
+ " \"{outputTableSpec}_error_records\" is used instead.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
   * TableRow} objects is done in a fail-safe way by wrapping the element with its original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
   * output to an error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub Topic to BigQuery
The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads
JSON-formatted messages from a Pub/Sub topic and writes them to a
BigQuery table. You can use the template as a quick solution to move
Pub/Sub data to BigQuery. The template reads JSON-formatted messages
from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
The data field
of Pub/Sub messages must use the JSON format, described in this
JSON guide.
For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery
table with two columns, named k1 and k2, with a string data type.
The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
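To check the schema mapping end to end, you can publish a test message whose payload matches the table schema, for example with the gcloud CLI (TOPIC_NAME is a placeholder):
gcloud pubsub topics publish TOPIC_NAME --message '{"k1":"v1","k2":"v2"}'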
Template parameters
inputTopic: The Pub/Sub input topic to read from, in the format projects/<project>/topics/<topic>.
outputTableSpec: The BigQuery output table location, in the format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the format <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
Running the Pub/Sub Topic to BigQuery template
Console
Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
From the Dataflow template drop-down menu, select the Pub/Sub Topic to BigQuery template.
In the provided parameter fields, enter your parameter values.
Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Replace the following:
JOB_NAME: a unique job name of your choice
REGION_NAME: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
DATASET: your BigQuery dataset
TABLE_NAME: your BigQuery table name
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
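The request has the same shape as the one shown for the Pub/Sub Subscription to BigQuery template; only the template path and the input parameter change. A sketch:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
  },
  "environment": {
    "tempLocation": "TEMP_LOCATION"
  }
}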
In the request, replace the following:
PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
JOB_NAME: a unique job name of your choice
LOCATION: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
The source code for this template is the same PubSubToBigQuery pipeline shown in the Pub/Sub Subscription to BigQuery section above.
Pub/Sub Avro to BigQuery
The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro
data from a Pub/Sub subscription into a BigQuery table.
Any errors which occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
Requirements for this pipeline
The input Pub/Sub subscription must exist.
The schema file for the Avro records must exist on Cloud Storage.
The unprocessed Pub/Sub topic must exist.
The output BigQuery dataset must exist.
Template parameters
schemaPath: The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc.
inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec: The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table might be created automatically using the user-provided Avro schema.
writeDisposition: (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition: (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
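As an illustration, schemaPath might point to a file like the following minimal Avro schema. This is a sketch; the record name, namespace, and fields are placeholders:
{
  "type": "record",
  "name": "MyRecord",
  "namespace": "com.example",
  "fields": [
    {"name": "k1", "type": "string"},
    {"name": "k2", "type": "string"}
  ]
}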
REGION_NAME: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
LOCATION: the regional endpoint where you want to deploy your Dataflow job (for example, us-central1)
VERSION: the version of the template that you want to use. You can use the following values:
latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubAvroToBigQuery.PubsubAvroToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;
/**
* A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
* Pub/Sub into a BigQuery table.
*
* <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
* topic.
*/
@Template(
name = "PubSub_Avro_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Avro to BigQuery",
description =
"A streaming pipeline which inserts Avro records from a Pub/Sub subscription into a"
+ " BigQuery table.",
optionsClass = PubsubAvroToBigQueryOptions.class,
flexContainerName = "pubsub-avro-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubAvroToBigQuery {
/**
* Validates input flags and executes the Dataflow pipeline.
*
* @param args command line arguments to the pipeline
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
PubsubAvroToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubAvroToBigQueryOptions.class);
run(options);
}
/**
* Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
* {@link PubsubAvroToBigQuery} pipeline.
*/
public interface PubsubAvroToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage path to the Avro schema file",
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
@Required
String getSchemaPath();
void setSchemaPath(String schemaPath);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options execution parameters to the pipeline
* @return result of the pipeline execution as a {@link PipelineResult}
*/
private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());
WriteResult writeResults =
pipeline
.apply(
"Read Avro records",
PubsubIO.readAvroGenericRecords(schema)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic()))
// Workaround for BEAM-12256. Eagerly convert to rows to avoid
// the RowToGenericRecord function that doesn't handle all data
// types.
// TODO: Remove this workaround when a fix for BEAM-12256 is
// released.
.apply(Convert.toRows())
.apply(
"Write to BigQuery",
BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResults, options)
.apply(
"Create error payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
.setPayloadCoder(AvroCoder.of(schema))
.setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
.build())
.apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
Pub/Sub Proto to BigQuery
The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto
data from a Pub/Sub subscription into a BigQuery table.
Any errors that occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
A JavaScript user-defined function (UDF) can be provided to transform data. Errors while executing
the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as
the BigQuery errors.
Requirements for this pipeline:
The input Pub/Sub subscription must exist.
The schema file for the Proto records must exist on Cloud Storage.
The output Pub/Sub topic must exist.
The output BigQuery dataset must exist.
If the BigQuery table exists, it must have a schema matching the proto data regardless of the createDisposition value.
Template parameters
Parameter
Description
protoSchemaPath
The Cloud Storage location of the self-contained proto schema file. For example, gs://path/to/my/file.pb.
This file can be generated with the --descriptor_set_out flag of the protoc command.
The --include_imports flag guarantees that the file is self-contained.
fullMessageName
The full proto message name. For example, package.name.MessageName, where package.name is the value
provided for the package statement and not the java_package statement.
inputSubscription
The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic
The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec
The BigQuery output table location. For example, my-project:my_dataset.my_table.
Depending on the createDisposition specified, the output table might be created
automatically using the input schema file.
preserveProtoFieldNames
(Optional) true to preserve the original Proto field name in JSON. false to use more standard JSON names.
For example, false would change field_name to fieldName. (Default: false)
bigQueryTableSchemaPath
(Optional) Cloud Storage path to BigQuery schema path. For example, gs://path/to/my/schema.json. If this is
not provided, then the schema is inferred from the Proto schema.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
udfOutputTopic
(Optional) The Pub/Sub topic storing the UDF errors. For example,
projects/<project-id>/topics/<topic-name>. If this is not provided, UDF errors
are sent to the same topic as outputTopic.
writeDisposition
(Optional) The BigQuery WriteDisposition.
For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition
(Optional) The BigQuery CreateDisposition.
For example, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED.
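To run the template from the gcloud CLI, a minimal sketch follows. It assumes the template is published as a Flex Template at gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery; check the template listing for the exact path.
gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
protoSchemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
Replace the following: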
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
SUBSCRIPTION_NAME: the Pub/Sub input subscription name
BIGQUERY_TABLE: the BigQuery output table name
UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;
/**
* A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
* records from Pub/Sub to BigQuery.
*
* <p>Persistent failures are written to a Pub/Sub unprocessed topic.
*/
@Template(
name = "PubSub_Proto_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Proto to BigQuery",
description =
"A streaming pipeline that reads Protobuf messages from a Pub/Sub subscription and writes"
+ " them to a BigQuery table.",
optionsClass = PubSubProtoToBigQueryOptions.class,
flexContainerName = "pubsub-proto-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubProtoToBigQuery {
private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
public static void main(String[] args) {
UncaughtExceptionLogger.register();
run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
}
/** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
public interface PubSubProtoToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
JavascriptTextTransformerOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage Path to the Proto Schema File",
helpText =
"Cloud Storage path to a self-contained descriptor set file. Example:"
+ " gs://MyBucket/schema.pb. `schema.pb` can be generated by adding"
+ " `--descriptor_set_out=schema.pb` to the `protoc` command that compiles the"
+ " protos. The `--include_imports` flag can be used to guarantee that the file is"
+ " self-contained.")
@Required
String getProtoSchemaPath();
void setProtoSchemaPath(String value);
@TemplateParameter.Text(
order = 2,
regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
description = "Full Proto Message Name",
helpText =
"The full message name (example: package.name.MessageName). If the message is nested"
+ " inside of another message, then include all messages with the '.' delimiter"
+ " (example: package.name.OuterMessage.InnerMessage). 'package.name' should be"
+ " from the `package` statement, not the `java_package` statement.")
@Required
String getFullMessageName();
void setFullMessageName(String value);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Preserve Proto Field Names",
helpText =
"Flag to control whether proto field names should be kept or converted to"
+ " lowerCamelCase. If the table already exists, this should be based on what"
+ " matches the table's schema. Otherwise, it will determine the column names of"
+ " the created table. True to preserve proto snake_case. False will convert fields"
+ " to lowerCamelCase. (Default: false)")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();
void setPreserveProtoFieldNames(Boolean value);
@TemplateParameter.GcsReadFile(
order = 4,
optional = true,
description = "BigQuery Table Schema Path",
helpText =
"Cloud Storage path to the BigQuery schema JSON file. "
+ "If this is not set, then the schema is inferred "
+ "from the Proto schema.",
example = "gs://MyBucket/bq_schema.json")
String getBigQueryTableSchemaPath();
void setBigQueryTableSchemaPath(String value);
@TemplateParameter.PubsubTopic(
order = 5,
optional = true,
description = "Pub/Sub output topic for UDF failures",
helpText =
"An optional output topic to send UDF failures to. If this option is not set, then"
+ " failures will be written to the same topic as the BigQuery failures.",
example = "projects/your-project-id/topics/your-topic-name")
String getUdfOutputTopic();
void setUdfOutputTopic(String udfOutputTopic);
}
/** Runs the pipeline and returns the results. */
private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
Pipeline pipeline = Pipeline.create(options);
Descriptor descriptor = getDescriptor(options);
PCollection<String> maybeForUdf =
pipeline
.apply("Read From Pubsub", readPubsubMessages(options, descriptor))
.apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
WriteResult writeResult =
runUdf(maybeForUdf, options)
.apply("Write to BigQuery", writeToBigQuery(options, descriptor));
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"Create Error Payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
.setPayloadCoder(StringUtf8Coder.of())
.setTranslateFunction(BigQueryConverters::tableRowToJson)
.build())
.apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
return pipeline.run();
}
/** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
@VisibleForTesting
static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
String schemaPath = options.getProtoSchemaPath();
String messageName = options.getFullMessageName();
Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
if (descriptor == null) {
throw new IllegalArgumentException(
messageName + " is not a recognized message in " + schemaPath);
}
return descriptor;
}
/** Returns the {@link PTransform} for reading Pub/Sub messages. */
private static Read<DynamicMessage> readPubsubMessages(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
return PubsubIO.readProtoDynamicMessages(descriptor)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic());
}
/**
* Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
*
* <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
* specified in {@code options}.
*/
@VisibleForTesting
static Write<String> writeToBigQuery(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
Write<String> write =
BigQueryConverters.<String>createWriteTransform(options)
.withFormatFunction(BigQueryConverters::convertJsonToTableRow);
String schemaPath = options.getBigQueryTableSchemaPath();
if (Strings.isNullOrEmpty(schemaPath)) {
return write.withSchema(
SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
} else {
return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
}
}
/** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
private static class ConvertDynamicProtoMessageToJson
extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
private final boolean preserveProtoName;
private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
this.preserveProtoName = options.getPreserveProtoFieldNames();
}
@Override
public PCollection<String> expand(PCollection<DynamicMessage> input) {
return input.apply(
"Map to JSON",
MapElements.into(TypeDescriptors.strings())
.via(
message -> {
try {
JsonFormat.Printer printer = JsonFormat.printer();
return preserveProtoName
? printer.preservingProtoFieldNames().print(message)
: printer.print(message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}));
}
}
/**
* Handles running the UDF.
*
* <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
*
* <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
* topic.
*
* @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
* @param options the options containing info on running the UDF
* @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
* called
*/
@VisibleForTesting
static PCollection<String> runUdf(
PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
// In order to avoid generating a graph that makes it look like a UDF was called when none was
// intended, simply return the input as "success" output.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) {
return jsonCollection;
}
// For testing purposes, we need to do this check before creating the PTransform rather than
// in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
// a value.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
throw new IllegalArgumentException(
"JavaScript function name cannot be null or empty if file is set");
}
PCollectionTuple maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
maybeSuccess
.get(UDF_FAILURE_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Failures",
ConvertFailsafeElementToPubsubMessage.<String, String>builder()
.setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
.setErrorMessageAttributeKey("udfErrorMessage")
.build())
.apply("Write Failed UDF", writeUdfFailures(options));
return maybeSuccess
.get(UDF_SUCCESS_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Output",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
}
/** {@link PTransform} that calls a UDF and returns both success and failure output. */
private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
private final PubSubProtoToBigQueryOptions options;
RunUdf(PubSubProtoToBigQueryOptions options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<String> input) {
return input
.apply("Prepare Failsafe UDF", makeFailsafe())
.setCoder(FAILSAFE_CODER)
.apply(
"Call UDF",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_SUCCESS_TAG)
.setFailureTag(UDF_FAILURE_TAG)
.build());
}
private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
.via((String json) -> FailsafeElement.of(json, json));
}
}
/**
* Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
* topic.
*/
private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
PubSubProtoToBigQueryOptions options) {
PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
return Strings.isNullOrEmpty(options.getUdfOutputTopic())
? write.to(options.getOutputTopic())
: write.to(options.getUdfOutputTopic());
}
}
Pub/Sub to Pub/Sub
The Pub/Sub to Pub/Sub template is a streaming pipeline that reads
messages from a Pub/Sub subscription and writes the messages to another Pub/Sub
topic. The pipeline also accepts an optional message attribute key and a value that can be used
to filter the messages that should be written to the Pub/Sub topic. You can use this
template to copy messages from a Pub/Sub subscription to another Pub/Sub
topic with an optional message filter.
Requirements for this pipeline:
The source Pub/Sub subscription must exist prior to execution.
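A minimal gcloud sketch for running this template follows; it assumes the classic template path gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub, which matches the template name declared in the code below.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE
Replace the following: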
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: the Pub/Sub subscription
name
TOPIC_NAME: the Pub/Sub topic name
FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
FILTER_VALUE: the filter attribute value to use if an event filter key is provided.
Accepts a valid Java regex string as the filter value. If a regex is provided,
the complete expression must match for the message to be forwarded; partial matches
(such as a substring match) are not forwarded. A null filter value is used by default.
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
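A hedged example request is sketched below. The template path and the exact body layout are assumptions; the request uses the standard templates:launch fields (jobName, parameters, environment).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "tempLocation": "TEMP_LOCATION"
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}
Replace the following: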
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
SUBSCRIPTION_NAME: the Pub/Sub subscription
name
TOPIC_NAME: the Pub/Sub topic name
FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
FILTER_VALUE: the filter attribute value to use if an event filter key is provided.
Accepts a valid Java regex string as the filter value. If a regex is provided,
the complete expression must match for the message to be forwarded; partial matches
(such as a substring match) are not forwarded. A null filter value is used by default.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A template that copies messages from one Pubsub subscription to another Pubsub topic. */
@Template(
name = "Cloud_PubSub_to_Cloud_PubSub",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Pub/Sub",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and writes to a Pub/Sub topic. ",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToPubsub {
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription. 2) Apply any
* filters if an attribute=value pair is provided. 3) Write each PubSubMessage to output PubSub
* topic.
*/
pipeline
.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(
"Filter Events If Enabled",
ParDo.of(
ExtractAndFilterEventsFn.newBuilder()
.withFilterKey(options.getFilterKey())
.withFilterValue(options.getFilterValue())
.build()))
.apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Options supported by {@link PubsubToPubsub}.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> inputSubscription);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Output Pub/Sub topic",
helpText =
"The name of the topic to which data should published, in the format of 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Event filter key",
helpText =
"Attribute key by which events are filtered. No filters are applied if no key is specified.")
ValueProvider<String> getFilterKey();
void setFilterKey(ValueProvider<String> filterKey);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "Event filter value",
helpText =
"Filter attribute value to use if an event filter key is provided. Accepts a valid "
+ "Java Regex string as an event filter value. In case a regex is provided, the complete "
+ "expression should match in order for the message to be filtered. Partial matches (e.g. "
+ "substring) will not be filtered. A null event filter value is used by default.")
ValueProvider<String> getFilterValue();
void setFilterValue(ValueProvider<String> filterValue);
}
/**
* DoFn that determines whether events are to be filtered. If filtering is enabled, it only
* publishes events that pass the filter; otherwise, it publishes all input events.
*/
@AutoValue
public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);
// Counter tracking the number of incoming Pub/Sub messages.
private static final Counter INPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");
// Counter tracking the number of output Pub/Sub messages after the user provided filter
// is applied.
private static final Counter OUTPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");
private Boolean doFilter;
private String inputFilterKey;
private Pattern inputFilterValueRegex;
private Boolean isNullFilterValue;
public static Builder newBuilder() {
return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
}
@Nullable
abstract ValueProvider<String> filterKey();
@Nullable
abstract ValueProvider<String> filterValue();
@Setup
public void setup() {
if (this.doFilter != null) {
return; // Filter has been evaluated already
}
inputFilterKey = (filterKey() == null ? null : filterKey().get());
if (inputFilterKey == null) {
// Disable input message filtering.
this.doFilter = false;
} else {
this.doFilter = true; // Enable filtering.
String inputFilterValue = (filterValue() == null ? null : filterValue().get());
if (inputFilterValue == null) {
LOG.warn(
"User provided a NULL for filterValue. Only messages with a value of NULL for the"
+ " filterKey: {} will be filtered forward",
inputFilterKey);
// For backward compatibility, we are allowing filtering by null filterValue.
this.isNullFilterValue = true;
this.inputFilterValueRegex = null;
} else {
this.isNullFilterValue = false;
try {
inputFilterValueRegex = getFilterPattern(inputFilterValue);
} catch (PatternSyntaxException e) {
LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
throw new RuntimeException(e);
}
}
LOG.info(
"Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
}
}
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_COUNTER.inc();
if (!this.doFilter) {
// Filter is not enabled
writeOutput(context, context.element());
} else {
PubsubMessage message = context.element();
String extractedValue = message.getAttribute(this.inputFilterKey);
if (this.isNullFilterValue) {
if (extractedValue == null) {
// If we are filtering for null and the extracted value is null, we forward
// the message.
writeOutput(context, message);
}
} else {
if (extractedValue != null
&& this.inputFilterValueRegex.matcher(extractedValue).matches()) {
// If the extracted value is not null and it matches the filter,
// we forward the message.
writeOutput(context, message);
}
}
}
}
/**
* Write a {@link PubsubMessage} and increment the output counter.
*
* @param context {@link ProcessContext} to write {@link PubsubMessage} to.
* @param message {@link PubsubMessage} output.
*/
private void writeOutput(ProcessContext context, PubsubMessage message) {
OUTPUT_COUNTER.inc();
context.output(message);
}
/**
* Return a {@link Pattern} based on a user provided regex string.
*
* @param regex Regex string to compile.
* @return {@link Pattern}
* @throws PatternSyntaxException If the string is an invalid regex.
*/
private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
checkNotNull(regex, "Filter regex cannot be null.");
return Pattern.compile(regex);
}
/** Builder class for {@link ExtractAndFilterEventsFn}. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilterKey(ValueProvider<String> filterKey);
abstract Builder setFilterValue(ValueProvider<String> filterValue);
abstract ExtractAndFilterEventsFn build();
/**
* Method to set the filterKey used for filtering messages.
*
* @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterKey(ValueProvider<String> filterKey) {
checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
return setFilterKey(filterKey);
}
/**
* Method to set the filterValue used for filtering messages.
*
* @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterValue(ValueProvider<String> filterValue) {
checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
return setFilterValue(filterValue);
}
}
}
}
Pub/Sub to Splunk
The Pub/Sub to Splunk template is a streaming pipeline that reads
messages from a Pub/Sub subscription and writes the message payload to Splunk via
Splunk's HTTP Event Collector (HEC). The most common use case of this template is to export logs
to Splunk. To see an example of the underlying workflow, see
Deploying production-ready log exports to Splunk using Dataflow.
Before writing to Splunk, you can also apply a JavaScript user-defined function to the message
payload. Any messages that experience processing failures are forwarded to a
Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key along with the HEC
token parameter encrypted with that key and then base64-encoded.
See the Cloud KMS API encryption endpoint
for additional details on encrypting your HEC token parameter.
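For example, you might encrypt and encode the token as follows; the key ring and key names here are hypothetical.
echo -n "MY_HEC_TOKEN" | gcloud kms encrypt \
    --location=global \
    --keyring=my-keyring \
    --key=my-key \
    --plaintext-file=- \
    --ciphertext-file=- | base64
Pass the resulting string as the token parameter and the key's resource name as tokenKMSEncryptionKey.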
Requirements for this pipeline:
The source Pub/Sub subscription must exist prior to running the pipeline.
The Pub/Sub unprocessed topic must exist prior to running the pipeline.
The Splunk HEC endpoint must be accessible from the Dataflow workers' network.
The Splunk HEC token must be generated and available.
Template parameters
Parameter
Description
inputSubscription
The Pub/Sub subscription from which to read the input. For example, projects/<project-id>/subscriptions/<subscription-name>.
token
(Optional) The Splunk HEC authentication token. Must be provided if the tokenSource is set to PLAINTEXT or KMS.
url
The Splunk HEC URL. This must be routable from the VPC in which the pipeline runs. For example, https://splunk-hec-host:8088.
outputDeadletterTopic
The Pub/Sub topic to which to forward undeliverable messages. For example, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
batchCount
(Optional) The batch size for sending multiple events to Splunk. Default 1 (no batching).
parallelism
(Optional) The maximum number of parallel requests. Default 1 (no parallelism).
disableCertificateValidation
(Optional) Disable SSL certificate validation. Default false (validation enabled). If true, the certificates are not validated (all certificates are trusted) and `rootCaCertificatePath` parameter is ignored.
includePubsubMessage
(Optional) Include the full Pub/Sub message in the payload. Default false
(only the data element is included in the payload).
tokenSource
Source of the token. One of PLAINTEXT, KMS, or SECRET_MANAGER. This parameter must be provided if Secret Manager is used.
If tokenSource is set to KMS, tokenKMSEncryptionKey and an encrypted token must be provided.
If tokenSource is set to SECRET_MANAGER, tokenSecretId must be provided.
If tokenSource is set to PLAINTEXT, token must be provided.
tokenKMSEncryptionKey
(Optional) The Cloud KMS key to decrypt the HEC token string. This parameter must be provided if the tokenSource is set to KMS.
If the Cloud KMS key is provided, the HEC token string must be passed in encrypted.
tokenSecretId
(Optional) The Secret Manager secret ID for the token. This parameter must be provided if the tokenSource is set to SECRET_MANAGER.
Should be in the format projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath
(Optional) The full URL to root CA certificate in Cloud Storage. For example, gs://mybucket/mycerts/privateCA.crt. The certificate provided in Cloud Storage must be DER-encoded and may be supplied in binary or printable (Base64) encoding.
If the certificate is provided in Base64 encoding, it must be bounded at the beginning by -----BEGIN CERTIFICATE-----, and must be bounded at the end by -----END CERTIFICATE-----.
If this parameter is provided, this private CA certificate file is fetched and added to the Dataflow worker's trust store in order to verify the Splunk HEC endpoint's SSL certificate.
If this parameter is not provided, the default trust store is used.
enableBatchLogs
(Optional) Specifies whether logs should be enabled for batches written to Splunk. Default: true.
enableGzipHttpCompression
(Optional) Specifies whether HTTP requests sent to Splunk HEC should be compressed (gzip content encoded). Default: true.
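A minimal gcloud sketch for running this template follows; it assumes the classic template path gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk, matching the template name declared in the code below.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
Replace the following: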
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
(for example, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
DISABLE_VALIDATION: true if you want to disable SSL certificate validation
ROOT_CA_CERTIFICATE_PATH: the path to root CA certificate in Cloud Storage (for example, gs://your-bucket/privateCA.crt)
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
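A hedged example request is sketched below. The template path and the exact body layout are assumptions; the request uses the standard templates:launch fields (jobName, parameters, environment).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "tempLocation": "TEMP_LOCATION"
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}
Replace the following: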
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
(for example, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION:
the name of the JavaScript user-defined function (UDF) that you want to use
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE:
the Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
DISABLE_VALIDATION: true if you want to disable SSL certificate validation
ROOT_CA_CERTIFICATE_PATH: the path to root CA certificate in Cloud Storage (for example, gs://your-bucket/privateCA.crt)
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.splunk.SplunkEvent;
import com.google.cloud.teleport.splunk.SplunkEventCoder;
import com.google.cloud.teleport.splunk.SplunkIO;
import com.google.cloud.teleport.splunk.SplunkWriteError;
import com.google.cloud.teleport.templates.PubSubToSplunk.PubSubToSplunkOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.templates.common.SplunkConverters;
import com.google.cloud.teleport.templates.common.SplunkConverters.SplunkOptions;
import com.google.cloud.teleport.util.TokenNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToSplunk} pipeline is a streaming pipeline which ingests data from Cloud
* Pub/Sub, executes a UDF, converts the output to {@link SplunkEvent}s and writes those records
* into Splunk's HEC endpoint. Any errors which occur in the execution of the UDF, conversion to
* {@link SplunkEvent} or writing to HEC will be streamed into a Pub/Sub topic.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The source Pub/Sub subscription exists.
* <li>HEC end-point is routable from the VPC where the Dataflow job executes.
* <li>Deadletter topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToSplunk \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --runner=${RUNNER}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-splunk-$USER-`date +"%Y%m%d-%H%M%S%z"`
* BATCH_COUNT=1
* PARALLELISM=5
*
* # Execute the templated pipeline:
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* token=my-splunk-hec-token,\
* url=http://splunk-hec-server-address:8088,\
* batchCount=${BATCH_COUNT},\
* parallelism=${PARALLELISM},\
* disableCertificateValidation=false,\
* outputDeadletterTopic=projects/${PROJECT_ID}/topics/deadletter-topic-name,\
* javascriptTextTransformGcsPath=gs://${BUCKET_NAME}/splunk/js/my-js-udf.js,\
* javascriptTextTransformFunctionName=myUdf"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Splunk",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Splunk",
description =
"A pipeline that reads from a Pub/Sub subscription and writes to Splunk's HTTP Event Collector (HEC).",
optionsClass = PubSubToSplunkOptions.class,
optionsOrder = {
PubsubReadSubscriptionOptions.class,
SplunkOptions.class,
JavascriptTextTransformerOptions.class,
PubsubWriteDeadletterTopicOptions.class
},
contactInformation = "https://cloud.google.com/support")
public class PubSubToSplunk {
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** Counter to track inbound messages from source. */
private static final Counter INPUT_MESSAGES_COUNTER =
Metrics.counter(PubSubToSplunk.class, "inbound-pubsub-messages");
/** The tag for successful {@link SplunkEvent} conversion. */
private static final TupleTag<SplunkEvent> SPLUNK_EVENT_OUT = new TupleTag<SplunkEvent>() {};
/** The tag for failed {@link SplunkEvent} conversion. */
private static final TupleTag<FailsafeElement<String, String>> SPLUNK_EVENT_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** GSON to process a {@link PubsubMessage}. */
private static final Gson GSON = new Gson();
/** Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToSplunk.class);
private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = false;
@VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
@VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* PubSubToSplunk#run(PubSubToSplunkOptions)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
PubSubToSplunkOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToSplunkOptions.class);
run(options);
}
/**
* Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(PubSubToSplunkOptions options) {
Pipeline pipeline = Pipeline.create(options);
// Register coders.
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(SplunkEvent.class, SplunkEventCoder.of());
registry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Convert message to FailsafeElement for processing.
* 3) Apply user provided UDF (if any) on the input strings.
* 4) Convert successfully transformed messages into SplunkEvent objects
* 5) Write SplunkEvents to Splunk's HEC end point.
* 5a) Wrap write failures into a FailsafeElement.
* 6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
* and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
*/
// 1) Read messages in from Pub/Sub
PCollection<String> stringMessages =
pipeline.apply(
"ReadMessages",
new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));
// 2) Convert message to FailsafeElement for processing.
PCollectionTuple transformedOutput =
stringMessages
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply user provided UDF (if any) on the input strings.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// 4) Convert successfully transformed messages into SplunkEvent objects
PCollectionTuple convertToEventTuple =
transformedOutput
.get(UDF_OUT)
.apply(
"ConvertToSplunkEvent",
SplunkConverters.failsafeStringToSplunkEvent(
SPLUNK_EVENT_OUT, SPLUNK_EVENT_DEADLETTER_OUT));
// 5) Write SplunkEvents to Splunk's HEC end point.
PCollection<SplunkWriteError> writeErrors =
convertToEventTuple
.get(SPLUNK_EVENT_OUT)
.apply(
"WriteToSplunk",
SplunkIO.writeBuilder()
.withToken(
new TokenNestedValueProvider(
options.getTokenSecretId(),
options.getTokenKMSEncryptionKey(),
options.getToken(),
options.getTokenSource()))
.withUrl(options.getUrl())
.withBatchCount(options.getBatchCount())
.withParallelism(options.getParallelism())
.withDisableCertificateValidation(options.getDisableCertificateValidation())
.withRootCaCertificatePath(options.getRootCaCertificatePath())
.withEnableBatchLogs(options.getEnableBatchLogs())
.withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
.build());
// 5a) Wrap write failures into a FailsafeElement.
PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
writeErrors.apply(
"WrapSplunkWriteErrors",
ParDo.of(
new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {
@ProcessElement
public void processElement(ProcessContext context) {
SplunkWriteError error = context.element();
FailsafeElement<String, String> failsafeElement =
FailsafeElement.of(error.payload(), error.payload());
if (error.statusMessage() != null) {
failsafeElement.setErrorMessage(error.statusMessage());
}
if (error.statusCode() != null) {
failsafeElement.setErrorMessage(
String.format("Splunk write status code: %d", error.statusCode()));
}
context.output(failsafeElement);
}
}));
// 6) Collect errors from the UDF transform (#3), SplunkEvent transform (#4)
// and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
PCollectionList.of(
ImmutableList.of(
convertToEventTuple.get(SPLUNK_EVENT_DEADLETTER_OUT),
wrappedSplunkWriteErrors,
transformedOutput.get(UDF_DEADLETTER_OUT)))
.apply("FlattenErrors", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
.setErrorRecordsTopic(options.getOutputDeadletterTopic())
.build());
return pipeline.run();
}
/**
* The {@link PubSubToSplunkOptions} class provides the custom options passed by the executor at
* the command line.
*/
public interface PubSubToSplunkOptions
extends SplunkOptions,
PubsubReadSubscriptionOptions,
PubsubWriteDeadletterTopicOptions,
JavascriptTextTransformerOptions {}
/**
* A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
* returns a {@link PCollection} of {@link String} messages.
*/
private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
private final ValueProvider<String> subscriptionName;
private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
private Boolean includePubsubMessage;
ReadMessages(
ValueProvider<String> subscriptionName,
ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
this.subscriptionName = subscriptionName;
this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
}
@Override
public PCollection<String> expand(PBegin input) {
return input
.apply(
"ReadPubsubMessage",
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
.apply(
"ExtractMessageIfRequired",
ParDo.of(
new DoFn<PubsubMessage, String>() {
@Setup
public void setup() {
if (inputIncludePubsubMessageFlag != null) {
includePubsubMessage = inputIncludePubsubMessageFlag.get();
}
includePubsubMessage =
MoreObjects.firstNonNull(
includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
}
@ProcessElement
public void processElement(ProcessContext context) {
if (includePubsubMessage) {
context.output(formatPubsubMessage(context.element()));
} else {
context.output(
new String(context.element().getPayload(), StandardCharsets.UTF_8));
}
}
}))
.apply(
"CountMessages",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_MESSAGES_COUNTER.inc();
context.output(context.element());
}
}));
}
}
/**
* Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
* to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
*
* @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
* @return JSON String that adheres to the model defined in {@link
* com.google.pubsub.v1.PubsubMessage}
*/
@VisibleForTesting
protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
JsonObject messageJson = new JsonObject();
String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
try {
JsonObject data = GSON.fromJson(payload, JsonObject.class);
messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
} catch (JsonSyntaxException e) {
messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
}
JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);
if (pubsubMessage.getMessageId() != null) {
messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
}
return messageJson.toString();
}
/**
* Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
*
* @param attributesMap {@link Map} of Pub/Sub attributes
* @return {@link JsonObject} of Pub/Sub attributes
*/
private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
JsonObject attributesJson = new JsonObject();
for (String key : attributesMap.keySet()) {
attributesJson.addProperty(key, attributesMap.get(key));
}
return attributesJson;
}
}
Pub/Sub to Avro Files on Cloud Storage
The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads
data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage
bucket.
Requirements for this pipeline:
The input Pub/Sub topic must exist prior to pipeline execution.
Template parameters
Parameter
Description
inputTopic
Pub/Sub topic to subscribe to for message consumption. The topic name must be
in the format of projects/<project-id>/topics/<topic-name>.
outputDirectory
Output directory where output Avro files are archived. Must contain / at the end.
For example: gs://example-bucket/example-directory/.
avroTempDirectory
Directory for temporary Avro files. Must contain / at the end. For example:
gs://example-bucket/example-directory/.
outputFilenamePrefix
(Optional) Output filename prefix for the Avro files.
outputFilenameSuffix
(Optional) Output filename suffix for the Avro files.
outputShardTemplate
(Optional) The shard template of the output file. It is specified as repeating sequences of
the letters S or N. For example, SSS-NNN. These are
replaced with either the shard number or the total number of shards, respectively. When this
parameter is not specified, the default template format is W-P-SS-of-NN.
Running the Pub/Sub to Cloud Storage Avro template
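A minimal gcloud sketch for running this template follows; the template path gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro is an assumption, so check the template listing for the exact name.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
avroTempDirectory=gs://BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE
Replace the following: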
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files; for
example, gs://your-bucket/temp
TOPIC_NAME: the Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
FILENAME_PREFIX: the preferred output filename prefix
FILENAME_SUFFIX: the preferred output filename suffix
SHARD_TEMPLATE: the preferred output shard template
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
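A hedged example request is sketched below. The template path and the exact body layout are assumptions; the request uses the standard templates:launch fields (jobName, parameters, environment).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "tempLocation": "TEMP_LOCATION"
   },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}
Replace the following: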
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files; for
example, gs://your-bucket/temp
TOPIC_NAME: the Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
FILENAME_PREFIX: the preferred output filename prefix
FILENAME_SUFFIX: the preferred output filename suffix
SHARD_TEMPLATE: the preferred output shard template
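The REST request these values feed is sketched below. It assumes the locations-qualified form of the templates.launch method referenced above and the same template path and parameter names as the gcloud sketch; treat both as assumptions to verify for your environment.
# Sketch only: assumes the locations-qualified templates.launch method and the
# gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro template path.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro" \
-d '{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputDirectory": "gs://BUCKET_NAME/output/",
    "avroTempDirectory": "gs://BUCKET_NAME/temp/",
    "outputFilenamePrefix": "FILENAME_PREFIX",
    "outputFilenameSuffix": "FILENAME_SUFFIX",
    "outputShardTemplate": "SHARD_TEMPLATE"
  },
  "environment": { "tempLocation": "TEMP_LOCATION" }
}'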
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToAvro.Options;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed Avro files at the specified output directory.
*
* <p>Files output will have the following schema:
*
* <pre>
* {
* "type": "record",
* "name": "AvroPubsubMessageRecord",
* "namespace": "com.google.cloud.teleport.avro",
* "fields": [
* {"name": "message", "type": {"type": "array", "items": "bytes"}},
* {"name": "attributes", "type": {"type": "map", "values": "string"}},
* {"name": "timestamp", "type": "long"}
* ]
* }
* </pre>
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToAvro
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-avro
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-avro-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.avro"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.avro"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Avro",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Avro Files on Cloud Storage",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and outputs windowed Avro files to"
+ " the specified directory.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubsubToAvro {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 7,
description = "Temporary Avro write directory",
helpText = "Directory for temporary Avro files.")
@Required
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> messages = null;
/*
* Steps:
* 1) Read messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed data into Avro files, one per window by default.
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
messages
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
AvroIO.write(AvroPubsubMessageRecord.class)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input)))
/*.withTempDirectory(FileSystems.matchNewResource(
options.getAvroTempDirectory(),
Boolean.TRUE))
*/
.withWindowedWrites()
.withNumShards(options.getNumShards()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
* copying its fields and the timestamp of the message.
*/
static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
new AvroPubsubMessageRecord(
message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
}
}
}
Pub/Sub Topic to Text Files on Cloud Storage
The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads
records from a Pub/Sub topic and saves them as a series of Cloud Storage files in
text format. The template can be used as a quick way to save data in Pub/Sub for
future use. By default, the template generates a new file every 5 minutes.
Requirements for this pipeline:
The Pub/Sub topic must exist prior to execution.
The messages published to the topic must be in text format.
The messages published to the topic must not contain any newlines. Note that each
Pub/Sub message is saved as a single line in the output file.
Template parameters
Parameter
Description
inputTopic
The Pub/Sub topic to read the input from. The topic name should be in the
format projects/<project-id>/topics/<topic-name>.
outputDirectory
The path and filename prefix for writing output files. For example,
gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix
The prefix to place on each windowed file. For example, output-.
outputFilenameSuffix
The suffix to place on each windowed file, typically a file extension such as
.txt or .csv.
outputShardTemplate
The shard template defines the dynamic portion of each windowed file. By default, the
pipeline uses a single shard for output to the file system within each window. This means
that all data outputs into a single file per window. The outputShardTemplate
defaults to W-P-SS-of-NN where W is the window date range,
P is the pane info, S is the shard number, and N is
the number of shards. In case of a single file, the SS-of-NN portion of the
outputShardTemplate is 00-of-01.
Running the Pub/Sub Topic to Text Files on Cloud Storage template
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
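The values above correspond to a gcloud command along the following lines. This is a sketch under the assumption that the template lives at gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text; JOB_NAME and PROJECT_ID are placeholders for a job name of your choice and your project ID.
# Sketch only: confirm the template path for your VERSION before running.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
--region REGION_NAME \
--staging-location TEMP_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-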
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
TOPIC_NAME: your Pub/Sub topic name
BUCKET_NAME: the name of your Cloud Storage
bucket
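A corresponding launch request is sketched below, again assuming the locations-qualified templates.launch method and the gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text path; adjust both to your environment.
# Sketch only: method, path, and parameters are assumptions to verify.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text" \
-d '{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputDirectory": "gs://BUCKET_NAME/output/",
    "outputFilenamePrefix": "output-"
  },
  "environment": { "tempLocation": "TEMP_LOCATION" }
}'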
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToText
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-text
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-text-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* windowDuration=5m,\
* numShards=1,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub and writes them to Cloud Storage, creating"
+ " a text file for each five minute window. Note that this pipeline assumes no"
+ " newlines in the body of the Pub/Sub message and thus each message becomes a single"
+ " line in the output file.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from a Pub/Sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
ValueProvider<String> getUserTempLocation();
void setUserTempLocation(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static ValueProvider<String> maybeUseUserTempLocation(
ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
return DualInputNestedValueProvider.of(
userTempLocation,
outputLocation,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
return (input.getX() != null) ? input.getX() : input.getY();
}
});
}
}
Pub/Sub Topic or Subscription to Text Files on Cloud Storage
The Pub/Sub Topic or Subscription to Cloud Storage Text template is a
streaming pipeline that reads records from Pub/Sub and saves them as a series of
Cloud Storage files in text format. The template can be used as a quick way to save data in
Pub/Sub for future use. By default, the template generates a new file every 5
minutes.
Requirements for this pipeline:
The Pub/Sub topic or subscription must exist prior to execution.
The messages published to the topic must be in text format.
The messages published to the topic must not contain any newlines. Note that each
Pub/Sub message is saved as a single line in the output file.
Template parameters
Parameter
Description
inputTopic
The Pub/Sub topic to read the input from. The topic name should be in the
format projects/<project-id>/topics/<topic-name>. If this parameter
is provided, inputSubscription should not be provided.
inputSubscription
The Pub/Sub subscription to read the input from. The subscription name should
be in the format
projects/<project-id>/subscriptions/<subscription-name>. If this
parameter is provided, inputTopic should not be provided.
outputDirectory
The path and filename prefix for writing output files. For example,
gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix
The prefix to place on each windowed file. For example, output-.
outputFilenameSuffix
The suffix to place on each windowed file, typically a file extension such as
.txt or .csv.
outputShardTemplate
The shard template defines the dynamic portion of each windowed file. By default, the
pipeline uses a single shard for output to the file system within each window. This means
that all data outputs into a single file per window. The outputShardTemplate
defaults to W-P-SS-of-NN where W is the window date range,
P is the pane info, S is the shard number, and N is
the number of shards. In case of a single file, the SS-of-NN portion of the
outputShardTemplate is 00-of-01.
windowDuration
(Optional) The window duration is the interval in which data is written to the output
directory. Configure the duration based on the pipeline's throughput. For example, a higher
throughput might require smaller window sizes so that the data fits into memory. Defaults to
5m, with a minimum of 1s. Allowed formats are: [int]s (for seconds, example: 5s), [int]m (for
minutes, example: 12m), [int]h (for hours, example: 2h).
Running the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SUBSCRIPTION_NAME: your Pub/Sub subscription name
BUCKET_NAME: the name of your Cloud Storage bucket
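Because this template is packaged as a Flex Template, it is launched with gcloud dataflow flex-template run rather than gcloud dataflow jobs run. The command below is a sketch that assumes the template file is published under gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex; check the bucket for the exact path of your version, and substitute JOB_NAME, PROJECT_ID, and REGION_NAME with values of your choice.
# Sketch only: the flex template path below is an assumption; verify it for your VERSION.
gcloud dataflow flex-template run JOB_NAME \
--region REGION_NAME \
--template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-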
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
JOB_NAME:
a unique job name of your choice
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
SUBSCRIPTION_NAME: your Pub/Sub subscription name
BUCKET_NAME: the name of your Cloud Storage bucket
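For the REST launch of a Flex Template, the flexTemplates.launch method is used instead of templates.launch; the request below is a sketch under the same path assumption as the gcloud example above.
# Sketch only: assumes the flexTemplates.launch method and the container spec path shown.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
-d '{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
    "parameters": {
      "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
      "outputDirectory": "gs://BUCKET_NAME/output/",
      "outputFilenamePrefix": "output-"
    }
  }
}'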
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* export PROJECT=<project id>
* export TEMPLATE_MODULE=googlecloud-to-googlecloud
* export TEMPLATE_NAME=pubsub-to-text
* export BUCKET_NAME=gs://<bucket name>
* export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${TEMPLATE_NAME}-image
* export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java11-template-launcher-base
* export BASE_CONTAINER_IMAGE_VERSION=latest
* export APP_ROOT=/template/${TEMPLATE_NAME}
* export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_NAME}-command-spec.json
* export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_NAME}-image-spec.json
*
* gcloud config set project ${PROJECT}
*
* # Build and push image to Google Container Repository
* mvn package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC} \
* -Djib.applicationCache=/tmp/jib-cache \
* -am -pl ${TEMPLATE_MODULE}
*
* # Create and upload image spec
* echo '{
* "image":"'${TARGET_GCR_IMAGE}'",
* "metadata":{
* "name":"Pub/Sub to text",
* "description":"Write Pub/Sub messages to GCS text files.",
* "parameters":[
* {
* "name":"inputSubscription",
* "label":"Pub/Sub subscription to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"inputTopic",
* "label":"Pub/Sub topic to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"outputDirectory",
* "label":"Directory to output files to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenamePrefix",
* "label":"The filename prefix of the files to write to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenameSuffix",
* "label":"The suffix of the files to write to",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"userTempLocation",
* "label":"The directory to output temporary files to",
* "paramType":"TEXT",
* "isOptional":true
* }
* ]
* },
* "sdk_info":{"language":"JAVA"}
* }' > image_spec.json
* gsutil cp image_spec.json ${TEMPLATE_IMAGE_SPEC}
* rm image_spec.json
*
* # Run template
* export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
* gcloud beta dataflow flex-template run ${JOB_NAME} \
* --project=${PROJECT} --region=us-central1 \
* --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
* --parameters inputTopic=<topic>,outputDirectory=<directory>,outputFilenamePrefix=<prefix>
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub Subscription or Topic and writes them to"
+ " Cloud Storage, creating a text file for each five minute window. Note that this"
+ " pipeline assumes no newlines in the body of the Pub/Sub message and thus each"
+ " message becomes a single line in the output file.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}
Pub/Sub to MongoDB
The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents.
If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). Any errors that occur due to schema mismatch,
malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages, along with the input message. If a table for unprocessed records does
not exist prior to execution, the pipeline automatically creates it.
Requirements for this pipeline:
The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.
The MongoDB cluster must exist and must be accessible from the Dataflow worker machines.
Template parameters
Parameter
Description
inputSubscription
Name of the Pub/Sub subscription. For example: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri
Comma-separated list of MongoDB servers. For example: 192.285.234.12:27017,192.287.123.11:27017
database
Database in MongoDB to store the collection. For example: my-db.
collection
Name of the collection inside MongoDB database. For example: my-collection.
deadletterTable
BigQuery table that stores messages that failed processing (mismatched schema, malformed JSON, and so on). For example: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
batchSize
(Optional) Batch size used for batch insertion of documents into MongoDB. Default: 1000.
batchSizeBytes
(Optional) Batch size in bytes. Default: 5242880.
maxConnectionIdleTime
(Optional) Maximum idle time allowed in seconds before connection timeout occurs. Default: 60000.
sslEnabled
(Optional) Boolean value indicating whether the connection to MongoDB is SSL enabled. Default: true.
ignoreSSLCertificate
(Optional) Boolean value indicating whether the SSL certificate should be ignored. Default: true.
withOrdered
(Optional) Boolean value enabling ordered bulk insertions into MongoDB. Default: true.
withSSLInvalidHostNameAllowed
(Optional) Boolean value indicating whether an invalid hostname is allowed for the SSL connection. Default: true.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
REGION_NAME:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
JOB_NAME:
a unique job name of your choice
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE: the name of the MongoDB database (for example, users)
COLLECTION: the name of the MongoDB collection (for example, profiles)
UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
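These values plug into a Flex Template launch along the following lines. This is a sketch that assumes the template file is published under gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB; verify the exact path for your version, and substitute JOB_NAME with a job name of your choice.
# Sketch only: the flex template path is an assumption; confirm it for your VERSION.
gcloud dataflow flex-template run JOB_NAME \
--project PROJECT_ID \
--region REGION_NAME \
--template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
--parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=UNPROCESSED_TABLE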
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
PROJECT_ID:
the Cloud project ID where you want to run the Dataflow job
LOCATION:
the regional endpoint where you want to
deploy your Dataflow job—for example, us-central1
JOB_NAME:
a unique job name of your choice
VERSION:
the version of the template that you want to use
You can use the following values:
latest to use the latest version of the template, which is available in the
non-dated parent folder in the bucket—
gs://dataflow-templates/latest/
the version name, like 2021-09-20-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates/
INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE: the name of the MongoDB database (for example, users)
COLLECTION: the name of the MongoDB collection (for example, profiles)
UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
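The REST counterpart is sketched below, assuming the flexTemplates.launch method and the same container spec path as the gcloud example above; adjust both to your environment.
# Sketch only: method and container spec path are assumptions to verify.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
-d '{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
    "parameters": {
      "inputSubscription": "INPUT_SUBSCRIPTION",
      "mongoDBUri": "MONGODB_URI",
      "database": "DATABASE",
      "collection": "COLLECTION",
      "deadletterTable": "UNPROCESSED_TABLE"
    }
  }
}'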
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.PubSubToMongoDB.Options;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
* from PubSub, applies a Javascript UDF if provided and inserts resulting records as Bson Document
* in MongoDB. If the element fails to be processed then it is written to a deadletter table in
* BigQuery.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The PubSub topic and subscriptions exist
* <li>The MongoDB is up and running
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_NAME=my-project
* BUCKET_NAME=my-bucket
* INPUT_SUBSCRIPTION=my-subscription
* MONGODB_DATABASE_NAME=testdb
* MONGODB_HOSTNAME=my-host:port
* MONGODB_COLLECTION_NAME=testCollection
* DEADLETTERTABLE=project:dataset.deadletter_table_name
*
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToMongoDB \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_NAME} \
* --stagingLocation=gs://${BUCKET_NAME}/staging \
* --tempLocation=gs://${BUCKET_NAME}/temp \
* --runner=DataflowRunner \
* --inputSubscription=${INPUT_SUBSCRIPTION} \
* --mongoDBUri=${MONGODB_HOSTNAME} \
* --database=${MONGODB_DATABASE_NAME} \
* --collection=${MONGODB_COLLECTION_NAME} \
* --deadletterTable=${DEADLETTERTABLE}"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_MongoDB",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to MongoDB",
description =
"Streaming pipeline that reads JSON encoded messages from a Pub/Sub subscription,"
+ " transforms them using a JavaScript user-defined function (UDF), and writes them to"
+ " a MongoDB as documents.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-mongodb",
contactInformation = "https://cloud.google.com/support")
public class PubSubToMongoDB {
/**
* Options supported by {@link PubSubToMongoDB}
*
* <p>Inherits standard configuration options.
*/
/** The tag for the main output of the json transformation. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToMongoDB.class);
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*
* <p>Inherits standard configuration options, options from {@link
* JavascriptTextTransformer.JavascriptTextTransformerOptions}.
*/
public interface Options
extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
String getInputSubscription();
void setInputSubscription(String inputSubscription);
@TemplateParameter.Text(
order = 2,
description = "MongoDB Connection URI",
helpText = "List of Mongo DB nodes separated by comma.",
example = "host1:port,host2:port,host3:port")
@Validation.Required
String getMongoDBUri();
void setMongoDBUri(String mongoDBUri);
@TemplateParameter.Text(
order = 3,
description = "MongoDB Database",
helpText = "Database in MongoDB to store the collection.",
example = "my-db")
@Validation.Required
String getDatabase();
void setDatabase(String database);
@TemplateParameter.Text(
order = 4,
description = "MongoDB collection",
helpText = "Name of the collection inside MongoDB database to put the documents to.",
example = "my-collection")
@Validation.Required
String getCollection();
void setCollection(String collection);
@TemplateParameter.BigQueryTable(
order = 5,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. If it doesn't exist, it will"
+ " be created during pipeline execution. If not specified,"
+ " \"outputTableSpec_error_records\" is used instead.",
example = "your-project-id:your-dataset.your-table-name")
@Validation.Required
String getDeadletterTable();
void setDeadletterTable(String deadletterTable);
@TemplateParameter.Long(
order = 6,
optional = true,
description = "Batch Size",
helpText = "Batch Size used for batch insertion of documents into MongoDB.")
@Default.Long(1000)
Long getBatchSize();
void setBatchSize(Long batchSize);
@TemplateParameter.Long(
order = 7,
optional = true,
description = "Batch Size in Bytes",
helpText =
"Batch Size in bytes used for batch insertion of documents into MongoDB. Default:"
+ " 5242880 (5mb)")
@Default.Long(5242880)
Long getBatchSizeBytes();
void setBatchSizeBytes(Long batchSizeBytes);
@TemplateParameter.Integer(
order = 8,
optional = true,
description = "Max Connection idle time",
helpText = "Maximum idle time allowed in seconds before connection timeout occurs.")
@Default.Integer(60000)
int getMaxConnectionIdleTime();
void setMaxConnectionIdleTime(int maxConnectionIdleTime);
@TemplateParameter.Boolean(
order = 9,
optional = true,
description = "SSL Enabled",
helpText = "Indicates whether connection to MongoDB is ssl enabled.")
@Default.Boolean(true)
Boolean getSslEnabled();
void setSslEnabled(Boolean sslEnabled);
@TemplateParameter.Boolean(
order = 10,
optional = true,
description = "Ignore SSL Certificate",
helpText = "Indicates whether SSL certificate should be ignored.")
@Default.Boolean(true)
Boolean getIgnoreSSLCertificate();
void setIgnoreSSLCertificate(Boolean ignoreSSLCertificate);
@TemplateParameter.Boolean(
order = 11,
optional = true,
description = "withOrdered",
helpText = "Enables ordered bulk insertions into MongoDB.")
@Default.Boolean(true)
Boolean getWithOrdered();
void setWithOrdered(Boolean withOrdered);
@TemplateParameter.Boolean(
order = 12,
optional = true,
description = "withSSLInvalidHostNameAllowed",
helpText = "Indicates whether invalid host name is allowed for ssl connection.")
@Default.Boolean(true)
Boolean getWithSSLInvalidHostNameAllowed();
void setWithSSLInvalidHostNameAllowed(Boolean withSSLInvalidHostNameAllowed);
}
/** DoFn that will parse the given string elements as Bson Documents. */
private static class ParseAsDocumentsFn extends DoFn<String, Document> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(Document.parse(context.element()));
}
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
// Parse the user options passed from the command-line.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coders for pipeline
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
* 2) Apply Javascript UDF if provided.
* 3) Write to MongoDB
*
*/
LOG.info("Reading from subscription: " + options.getInputSubscription());
PCollectionTuple convertedPubsubMessages =
pipeline
/*
* Step #1: Read from a PubSub subscription.
*/
.apply(
"Read PubSub Subscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
/*
* Step #2: Apply the Javascript UDF, if provided, and transform
* the PubsubMessages into Json documents.
*/
.apply(
"Apply Javascript UDF",
PubSubMessageToJsonDocument.newBuilder()
.setJavascriptTextTransformFunctionName(
options.getJavascriptTextTransformFunctionName())
.setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
.build());
/*
* Step #3a: Write Json documents into MongoDB using {@link MongoDbIO.write}.
*/
convertedPubsubMessages
.get(TRANSFORM_OUT)
.apply(
"Get Json Documents",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.apply("Parse as BSON Document", ParDo.of(new ParseAsDocumentsFn()))
.apply(
"Put to MongoDB",
MongoDbIO.write()
.withBatchSize(options.getBatchSize())
.withUri(String.format("mongodb://%s", options.getMongoDBUri()))
.withDatabase(options.getDatabase())
.withCollection(options.getCollection())
.withIgnoreSSLCertificate(options.getIgnoreSSLCertificate())
.withMaxConnectionIdleTime(options.getMaxConnectionIdleTime())
.withOrdered(options.getWithOrdered())
.withSSLEnabled(options.getSslEnabled())
.withSSLInvalidHostNameAllowed(options.getWithSSLInvalidHostNameAllowed()));
/*
* Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
*/
convertedPubsubMessages
.get(TRANSFORM_DEADLETTER_OUT)
.apply(
"Write Transform Failures To BigQuery",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(options.getDeadletterTable())
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* The {@link PubSubMessageToJsonDocument} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into JSON objects for insertion into MongoDB while applying an
* optional UDF to the input. The execution of the UDF and the transformation to JSON objects are done
* in a fail-safe way by wrapping the element with its original payload inside the {@link
* FailsafeElement} class. The {@link PubSubMessageToJsonDocument} transform will output a {@link
* PCollectionTuple} which contains all output and dead-letter {@link PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToMongoDB#TRANSFORM_OUT} - Contains all records successfully converted to
* JSON objects.
* <li>{@link PubSubToMongoDB#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to JSON objects.
* </ul>
*/
@AutoValue
public abstract static class PubSubMessageToJsonDocument
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
public static Builder newBuilder() {
return new AutoValue_PubSubToMongoDB_PubSubMessageToJsonDocument.Builder();
}
@Nullable
public abstract String javascriptTextTransformGcsPath();
@Nullable
public abstract String javascriptTextTransformFunctionName();
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));
// If a UDF is supplied, then use it to parse the PubsubMessages.
if (javascriptTextTransformGcsPath() != null) {
return failsafeElements.apply(
"InvokeUDF",
JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(javascriptTextTransformGcsPath())
.setFunctionName(javascriptTextTransformFunctionName())
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
} else {
return failsafeElements.apply(
"ProcessPubSubMessages",
ParDo.of(new ProcessFailsafePubSubFn())
.withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
}
}
/** Builder for {@link PubSubMessageToJsonDocument}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setJavascriptTextTransformGcsPath(
String javascriptTextTransformGcsPath);
public abstract Builder setJavascriptTextTransformFunctionName(
String javascriptTextTransformFunctionName);
public abstract PubSubMessageToJsonDocument build();
}
}
/**
* The {@link ProcessFailsafePubSubFn} class processes a {@link FailsafeElement} containing a
* {@link PubsubMessage} and a String of the message's payload {@link PubsubMessage#getPayload()}
* into a {@link FailsafeElement} of the original {@link PubsubMessage} and a JSON string that has
* been processed with {@link Gson}.
*
* <p>If {@link PubsubMessage#getAttributeMap()} is not empty then the message attributes will be
* serialized along with the message payload.
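*
* <p>For example, a payload of {"k1":"v1"} with an attribute {"source":"app"} is output as the
* JSON string {"k1":"v1","source":"app"}.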
*/
static class ProcessFailsafePubSubFn
extends DoFn<FailsafeElement<PubsubMessage, String>, FailsafeElement<PubsubMessage, String>> {
private static final Counter successCounter =
Metrics.counter(PubSubMessageToJsonDocument.class, "successful-json-conversion");
private static Gson gson = new Gson();
private static final Counter failedCounter =
Metrics.counter(PubSubMessageToJsonDocument.class, "failed-json-conversion");
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage pubsubMessage = context.element().getOriginalPayload();
JsonObject messageObject = new JsonObject();
try {
if (pubsubMessage.getPayload().length > 0) {
messageObject =
gson.fromJson(new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8), JsonObject.class);
}
// If message attributes are present they will be serialized along with the message payload
if (pubsubMessage.getAttributeMap() != null) {
pubsubMessage.getAttributeMap().forEach(messageObject::addProperty);
}
context.output(FailsafeElement.of(pubsubMessage, messageObject.toString()));
successCounter.inc();
} catch (JsonSyntaxException e) {
context.output(
TRANSFORM_DEADLETTER_OUT,
FailsafeElement.of(context.element())
.setErrorMessage(e.getMessage())
.setStacktrace(Throwables.getStackTraceAsString(e)));
failedCounter.inc();
}
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to an error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
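The template is typically launched with the gcloud CLI. The following is a minimal sketch, assuming the Flex Template is staged at the Cloud Storage path shown; the parameter names mirror the pipeline options used in run() above (inputSubscription, mongoDBUri, database, collection, deadletterTable), and all placeholder values are illustrative.
# NOTE: the template GCS path below is an assumption; verify the exact location under gs://dataflow-templates/.
gcloud dataflow flex-template run JOB_NAME \
--region REGION_NAME \
--template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
mongoDBUri=MONGODB_HOSTNAME:27017,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Because the pipeline prefixes the URI with mongodb:// before writing, mongoDBUri is passed as host:port rather than as a full connection string.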
Pub/Sub to Elasticsearch
The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. The Dataflow template uses Elasticsearch's data streams feature to store time series data across multiple indices while giving you a single named resource for requests. Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.
Requirements for this pipeline
The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.
A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud, running Elasticsearch version 7.0 or above. See Google Cloud Integration for Elastic for more details.
A Pub/Sub topic for error output.
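For orientation, the following is a minimal sketch of launching this template with the gcloud CLI, assuming the Flex Template Cloud Storage path shown; the required parameters are described in the table that follows, and all placeholder values are illustrative.
# NOTE: the template GCS path below is an assumption; verify the exact location under gs://dataflow-templates/.
gcloud dataflow flex-template run JOB_NAME \
--region REGION_NAME \
--template-file-gcs-location gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
apiKey=BASE64_ENCODED_API_KEY,\
errorOutputTopic=projects/PROJECT_ID/topics/ERROR_TOPIC_NAME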
Template parameters
Parameter
Description
inputSubscription
The Pub/Sub subscription to consume from. The name should be in the format of projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl
The Elasticsearch URL in the format https://hostname:[port], or a Cloud ID if using Elastic Cloud.
apiKey
The Base64-encoded API key used for authentication.
errorOutputTopic
The Pub/Sub output topic for publishing failed records, in the format of projects/<project-id>/topics/<topic-name>.
dataset
(Optional) The type of logs sent via Pub/Sub, for which an out-of-the-box dashboard is available. Known log type values are audit, vpcflow, and firewall. Default: pubsub.
namespace
(Optional) An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default: default.
batchSize
(Optional) Batch size in number of documents. Default: 1000.
batchSizeBytes
(Optional) Batch size in number of bytes. Default: 5242880 (5 MB).
maxRetryAttempts
(Optional) Max retry attempts, must be > 0. Default: no retries.
maxRetryDuration
(Optional) Max retry duration in milliseconds, must be > 0. Default: no retries.
javascriptTextTransformGcsPath
(Optional)
The Cloud Storage URI of the .js file that defines the JavaScript user-defined
function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Optional)
The name of the JavaScript user-defined function (UDF) that you want to use.
For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name is
myTransform. For sample JavaScript UDFs, see
UDF Examples.
propertyAsIndex
(Optional) A property in the document being indexed whose value specifies the _index metadata to include with the document in bulk requests (takes precedence over an _index UDF). Default: none.
propertyAsId
(Optional) A property in the document being indexed whose value specifies the _id metadata to include with the document in bulk requests (takes precedence over an _id UDF). Default: none.
javaScriptIndexFnGcsPath
(Optional) The Cloud Storage path to the JavaScript UDF source for a function that specifies the _index metadata to include with the document in bulk requests. Default: none.
javaScriptIndexFnName
(Optional) The name of the JavaScript UDF function that specifies the _index metadata to include with the document in bulk requests. Default: none.
javaScriptIdFnGcsPath
(Optional) The Cloud Storage path to the JavaScript UDF source for a function that specifies the _id metadata to include with the document in bulk requests. Default: none.
javaScriptIdFnName
(Optional) The name of the JavaScript UDF function that specifies the _id metadata to include with the document in bulk requests. Default: none.
javaScriptTypeFnGcsPath
(Optional) The Cloud Storage path to the JavaScript UDF source for a function that specifies the _type metadata to include with the document in bulk requests. Default: none.
javaScriptTypeFnName
(Optional) The name of the JavaScript UDF function that specifies the _type metadata to include with the document in bulk requests. Default: none.
javaScriptIsDeleteFnGcsPath
(Optional) The Cloud Storage path to the JavaScript UDF source for a function that determines whether the document should be deleted rather than inserted or updated. The function should return the string value "true" or "false". Default: none.
javaScriptIsDeleteFnName
(Optional) The name of the JavaScript UDF function that determines whether the document should be deleted rather than inserted or updated. The function should return the string value "true" or "false". Default: none.
usePartialUpdate
(Optional) Whether to use partial updates (update rather than create or index, allowing partial docs) with Elasticsearch requests. Default: false.
bulkInsertMethod
(Optional) Whether to use INDEX (index, allows upserts) or CREATE (create, errors on duplicate _id) with Elasticsearch bulk requests. Default: CREATE.