Plantillas de transmisión de Dataflow que proporciona Google
Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
Google proporciona un conjunto de plantillas de código abierto de Dataflow.
Estas plantillas de Dataflow pueden ayudarte a resolver grandes tareas de datos, incluidas la importación, la exportación, la copia de seguridad y el restablecimiento de datos, y las operaciones de API masivas, todo sin el uso de un entorno de desarrollo dedicado. Las plantillas se compilan en Apache Beam y usan Dataflow para transformar los datos.
En esta guía, se documentan las plantillas de transmisión.
Suscripción de Pub/Sub a BigQuery
La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.
Requisitos para esta canalización:
El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON.
Por ejemplo, los mensajes con valores en el campo data con formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de string.
La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.
Parámetros de la plantilla
Parámetro
Descripción
inputSubscription
Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputTableSpec
Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable
La tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato <my-project>:<my-dataset>.<my-table>.
Si no existe, se crea durante la ejecución de la canalización.
Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records en su lugar.
javascriptTextTransformGcsPath
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.
Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Subscription to BigQuery template.
En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
--region REGION_NAME \
--staging-location STAGING_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Reemplaza lo siguiente:
JOB_NAME:
Es el nombre del trabajo que elijas
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
DATASET: Es el conjunto de datos de BigQuery.
TABLE_NAME: Es el nombre de la tabla de BigQuery.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
@Template(
name = "PubSub_Subscription_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub subscription, transforms"
+ " them using a JavaScript user-defined function (UDF), and writes them to a"
+ " pre-existing BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputTopic",
contactInformation = "https://cloud.google.com/support")
@Template(
name = "PubSub_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Topic to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them"
+ " using a JavaScript user-defined function (UDF), and writes them to a pre-existing"
+ " BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The table’s schema must match the "
+ "input JSON objects.")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Input Pub/Sub topic",
helpText = "The Pub/Sub topic to read the input from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateParameter.PubsubSubscription(
order = 3,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
@TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
@Description(
"This determines whether the template reads from a Pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.BigQueryTable(
order = 5,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. It should be in the format"
+ " of \"your-project-id:your-dataset.your-table-name\". If it doesn't exist, it"
+ " will be created during pipeline execution. If not specified,"
+ " \"{outputTableSpec}_error_records\" is used instead.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Tema de Pub/Sub a BigQuery
La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.
Requisitos para esta canalización:
El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON.
Por ejemplo, los mensajes con valores en el campo data con formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de string.
La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.
Parámetros de la plantilla
Parámetro
Descripción
inputTopic
El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.
outputTableSpec
Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable
La tabla de BigQuery para los mensajes que no llegaron a la tabla de resultados. Debe estar en formato <my-project>:<my-dataset>.<my-table>.
Si no existe, se crea durante la ejecución de la canalización.
Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.
javascriptTextTransformGcsPath
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
Ejecuta la plantilla del tema de Pub/Sub a BigQuery
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.
Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic to BigQuery template.
En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
--region REGION_NAME \
--staging-location STAGING_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Reemplaza lo siguiente:
JOB_NAME:
Es el nombre del trabajo que elijas
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
DATASET: Es el conjunto de datos de BigQuery.
TABLE_NAME: Es el nombre de la tabla de BigQuery.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
@Template(
name = "PubSub_Subscription_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub subscription, transforms"
+ " them using a JavaScript user-defined function (UDF), and writes them to a"
+ " pre-existing BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputTopic",
contactInformation = "https://cloud.google.com/support")
@Template(
name = "PubSub_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Topic to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them"
+ " using a JavaScript user-defined function (UDF), and writes them to a pre-existing"
+ " BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The table’s schema must match the "
+ "input JSON objects.")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Input Pub/Sub topic",
helpText = "The Pub/Sub topic to read the input from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateParameter.PubsubSubscription(
order = 3,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
@TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
@Description(
"This determines whether the template reads from a Pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.BigQueryTable(
order = 5,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. It should be in the format"
+ " of \"your-project-id:your-dataset.your-table-name\". If it doesn't exist, it"
+ " will be created during pipeline execution. If not specified,"
+ " \"{outputTableSpec}_error_records\" is used instead.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub Avro a BigQuery
La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery.
Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.
Requisitos para esta canalización
La suscripción de entrada de Pub/Sub debe existir.
El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
El tema de Pub/Sub sin procesar debe existir. ¿
El conjunto de datos de salida de BigQuery debe existir.
Parámetros de la plantilla
Parámetro
Descripción
schemaPath
Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc.
inputSubscription
Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic
El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec
Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table>.
Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario.
writeDisposition
La WriteDisposition de BigQuery (opcional).
Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Predeterminada: WRITE_APPEND.
createDisposition
La CreateDisposition de BigQuery (opcional).
Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Predeterminada: CREATE_IF_NEEDED.
Ejecuta la plantilla de Pub/Sub Avro a BigQuery
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubAvroToBigQuery.PubsubAvroToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;
/**
* A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
* Pub/Sub into a BigQuery table.
*
* <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
* topic.
*/
@Template(
name = "PubSub_Avro_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Avro to BigQuery",
description =
"A streaming pipeline which inserts Avro records from a Pub/Sub subscription into a"
+ " BigQuery table.",
optionsClass = PubsubAvroToBigQueryOptions.class,
flexContainerName = "pubsub-avro-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubAvroToBigQuery {
/**
* Validates input flags and executes the Dataflow pipeline.
*
* @param args command line arguments to the pipeline
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
PubsubAvroToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubAvroToBigQueryOptions.class);
run(options);
}
/**
* Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
* {@link PubsubAvroToBigQuery} pipeline.
*/
public interface PubsubAvroToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage path to the Avro schema file",
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
@Required
String getSchemaPath();
void setSchemaPath(String schemaPath);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options execution parameters to the pipeline
* @return result of the pipeline execution as a {@link PipelineResult}
*/
private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());
WriteResult writeResults =
pipeline
.apply(
"Read Avro records",
PubsubIO.readAvroGenericRecords(schema)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic()))
// Workaround for BEAM-12256. Eagerly convert to rows to avoid
// the RowToGenericRecord function that doesn't handle all data
// types.
// TODO: Remove this workaround when a fix for BEAM-12256 is
// released.
.apply(Convert.toRows())
.apply(
"Write to BigQuery",
BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResults, options)
.apply(
"Create error payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
.setPayloadCoder(AvroCoder.of(schema))
.setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
.build())
.apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
Proto de Pub/Sub a BigQuery
La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere
datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery.
Cualquier error que
ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de
Pub/Sub sin procesar.
Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.
Requisitos para esta canalización:
La suscripción de entrada de Pub/Sub debe existir.
El archivo de esquema de los registros proto debe existir en Cloud Storage.
El tema de Pub/Sub de salida debe existir.
El conjunto de datos de salida de BigQuery debe existir.
Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor createDisposition.
Parámetros de la plantilla
Parámetro
Descripción
protoSchemaPath
La ubicación de Cloud Storage del archivo de esquema proto autónomo. Por ejemplo, gs://path/to/my/file.pb.
Este archivo se puede generar con la marca --descriptor_set_out del comando protoc.
La marca --include_imports garantiza que el archivo sea autónomo.
fullMessageName
El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName, en el que package.name es el valor proporcionado para la declaración package y no la declaración java_package.
inputSubscription
Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic
El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec
Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table.
Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el archivo de esquema de entrada.
preserveProtoFieldNames
true para conservar el nombre del campo Proto original en JSON (opcional). false para usar nombres JSON más estándar.
Por ejemplo, false cambiaría field_name a fieldName. (Default: false)
bigQueryTableSchemaPath
Ruta de Cloud Storage a la ruta del esquema de BigQuery (opcional). Por ejemplo, gs://path/to/my/schema.json. Si no se proporciona, entonces el esquema se infiere a partir del esquema Proto.
javascriptTextTransformGcsPath
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
udfOutputTopic
El tema de Pub/Sub que almacena los errores de las UDF (opcional). Por ejemplo, projects/<project-id>/topics/<topic-name>. Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic.
writeDisposition
La WriteDisposition de BigQuery (opcional).
Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Valor predeterminado: WRITE_APPEND.
createDisposition
La CreateDisposition de BigQuery (opcional).
Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Configuración predeterminada: CREATE_IF_NEEDED.
Ejecuta la plantilla de proto de Pub/Sub a BigQuery
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;
/**
* A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
* records from Pub/Sub to BigQuery.
*
* <p>Persistent failures are written to a Pub/Sub unprocessed topic.
*/
@Template(
name = "PubSub_Proto_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Proto to BigQuery",
description =
"A streaming pipeline that reads Protobuf messages from a Pub/Sub subscription and writes"
+ " them to a BigQuery table.",
optionsClass = PubSubProtoToBigQueryOptions.class,
flexContainerName = "pubsub-proto-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubProtoToBigQuery {
private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
public static void main(String[] args) {
UncaughtExceptionLogger.register();
run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
}
/** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
public interface PubSubProtoToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
JavascriptTextTransformerOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage Path to the Proto Schema File",
helpText =
"Cloud Storage path to a self-contained descriptor set file. Example:"
+ " gs://MyBucket/schema.pb. `schema.pb` can be generated by adding"
+ " `--descriptor_set_out=schema.pb` to the `protoc` command that compiles the"
+ " protos. The `--include_imports` flag can be used to guarantee that the file is"
+ " self-contained.")
@Required
String getProtoSchemaPath();
void setProtoSchemaPath(String value);
@TemplateParameter.Text(
order = 2,
regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
description = "Full Proto Message Name",
helpText =
"The full message name (example: package.name.MessageName). If the message is nested"
+ " inside of another message, then include all messages with the '.' delimiter"
+ " (example: package.name.OuterMessage.InnerMessage). 'package.name' should be"
+ " from the `package` statement, not the `java_package` statement.")
@Required
String getFullMessageName();
void setFullMessageName(String value);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Preserve Proto Field Names",
helpText =
"Flag to control whether proto field names should be kept or converted to"
+ " lowerCamelCase. If the table already exists, this should be based on what"
+ " matches the table's schema. Otherwise, it will determine the column names of"
+ " the created table. True to preserve proto snake_case. False will convert fields"
+ " to lowerCamelCase. (Default: false)")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();
void setPreserveProtoFieldNames(Boolean value);
@TemplateParameter.GcsReadFile(
order = 4,
optional = true,
description = "BigQuery Table Schema Path",
helpText =
"Cloud Storage path to the BigQuery schema JSON file. "
+ "If this is not set, then the schema is inferred "
+ "from the Proto schema.",
example = "gs://MyBucket/bq_schema.json")
String getBigQueryTableSchemaPath();
void setBigQueryTableSchemaPath(String value);
@TemplateParameter.PubsubTopic(
order = 5,
optional = true,
description = "Pub/Sub output topic for UDF failures",
helpText =
"An optional output topic to send UDF failures to. If this option is not set, then"
+ " failures will be written to the same topic as the BigQuery failures.",
example = "projects/your-project-id/topics/your-topic-name")
String getUdfOutputTopic();
void setUdfOutputTopic(String udfOutputTopic);
}
/** Runs the pipeline and returns the results. */
private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
Pipeline pipeline = Pipeline.create(options);
Descriptor descriptor = getDescriptor(options);
PCollection<String> maybeForUdf =
pipeline
.apply("Read From Pubsub", readPubsubMessages(options, descriptor))
.apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
WriteResult writeResult =
runUdf(maybeForUdf, options)
.apply("Write to BigQuery", writeToBigQuery(options, descriptor));
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"Create Error Payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
.setPayloadCoder(StringUtf8Coder.of())
.setTranslateFunction(BigQueryConverters::tableRowToJson)
.build())
.apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
return pipeline.run();
}
/** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
@VisibleForTesting
static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
String schemaPath = options.getProtoSchemaPath();
String messageName = options.getFullMessageName();
Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
if (descriptor == null) {
throw new IllegalArgumentException(
messageName + " is not a recognized message in " + schemaPath);
}
return descriptor;
}
/** Returns the {@link PTransform} for reading Pub/Sub messages. */
private static Read<DynamicMessage> readPubsubMessages(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
return PubsubIO.readProtoDynamicMessages(descriptor)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic());
}
/**
* Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
*
* <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
* specified in {@code options}.
*/
@VisibleForTesting
static Write<String> writeToBigQuery(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
Write<String> write =
BigQueryConverters.<String>createWriteTransform(options)
.withFormatFunction(BigQueryConverters::convertJsonToTableRow);
String schemaPath = options.getBigQueryTableSchemaPath();
if (Strings.isNullOrEmpty(schemaPath)) {
return write.withSchema(
SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
} else {
return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
}
}
/** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
private static class ConvertDynamicProtoMessageToJson
extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
private final boolean preserveProtoName;
private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
this.preserveProtoName = options.getPreserveProtoFieldNames();
}
@Override
public PCollection<String> expand(PCollection<DynamicMessage> input) {
return input.apply(
"Map to JSON",
MapElements.into(TypeDescriptors.strings())
.via(
message -> {
try {
JsonFormat.Printer printer = JsonFormat.printer();
return preserveProtoName
? printer.preservingProtoFieldNames().print(message)
: printer.print(message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}));
}
}
/**
* Handles running the UDF.
*
* <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
*
* <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
* topic.
*
* @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
* @param options the options containing info on running the UDF
* @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
* called
*/
@VisibleForTesting
static PCollection<String> runUdf(
PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
// In order to avoid generating a graph that makes it look like a UDF was called when none was
// intended, simply return the input as "success" output.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) {
return jsonCollection;
}
// For testing purposes, we need to do this check before creating the PTransform rather than
// in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
// a value.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
throw new IllegalArgumentException(
"JavaScript function name cannot be null or empty if file is set");
}
PCollectionTuple maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
maybeSuccess
.get(UDF_FAILURE_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Failures",
ConvertFailsafeElementToPubsubMessage.<String, String>builder()
.setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
.setErrorMessageAttributeKey("udfErrorMessage")
.build())
.apply("Write Failed UDF", writeUdfFailures(options));
return maybeSuccess
.get(UDF_SUCCESS_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Output",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
}
/** {@link PTransform} that calls a UDF and returns both success and failure output. */
private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
private final PubSubProtoToBigQueryOptions options;
RunUdf(PubSubProtoToBigQueryOptions options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<String> input) {
return input
.apply("Prepare Failsafe UDF", makeFailsafe())
.setCoder(FAILSAFE_CODER)
.apply(
"Call UDF",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_SUCCESS_TAG)
.setFailureTag(UDF_FAILURE_TAG)
.build());
}
private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
.via((String json) -> FailsafeElement.of(json, json));
}
}
/**
* Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
* topic.
*/
private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
PubSubProtoToBigQueryOptions options) {
PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
return Strings.isNullOrEmpty(options.getUdfOutputTopic())
? write.to(options.getOutputTopic())
: write.to(options.getUdfOutputTopic());
}
}
Pub/Sub a Pub/Sub
La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.
Requisitos para esta canalización:
La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento.
Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento.
Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A template that copies messages from one Pubsub subscription to another Pubsub topic. */
@Template(
name = "Cloud_PubSub_to_Cloud_PubSub",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Pub/Sub",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and writes to a Pub/Sub topic. ",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToPubsub {
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription. 2) Apply any
* filters if an attribute=value pair is provided. 3) Write each PubSubMessage to output PubSub
* topic.
*/
pipeline
.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(
"Filter Events If Enabled",
ParDo.of(
ExtractAndFilterEventsFn.newBuilder()
.withFilterKey(options.getFilterKey())
.withFilterValue(options.getFilterValue())
.build()))
.apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Options supported by {@link PubsubToPubsub}.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> inputSubscription);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Output Pub/Sub topic",
helpText =
"The name of the topic to which data should published, in the format of 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Event filter key",
helpText =
"Attribute key by which events are filtered. No filters are applied if no key is specified.")
ValueProvider<String> getFilterKey();
void setFilterKey(ValueProvider<String> filterKey);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "Event filter value",
helpText =
"Filter attribute value to use if an event filter key is provided. Accepts a valid "
+ "Java Regex string as an event filter value. In case a regex is provided, the complete "
+ "expression should match in order for the message to be filtered. Partial matches (e.g. "
+ "substring) will not be filtered. A null event filter value is used by default.")
ValueProvider<String> getFilterValue();
void setFilterValue(ValueProvider<String> filterValue);
}
/**
* DoFn that will determine if events are to be filtered. If filtering is enabled, it will only
* publish events that pass the filter else, it will publish all input events.
*/
@AutoValue
public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);
// Counter tracking the number of incoming Pub/Sub messages.
private static final Counter INPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");
// Counter tracking the number of output Pub/Sub messages after the user provided filter
// is applied.
private static final Counter OUTPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");
private Boolean doFilter;
private String inputFilterKey;
private Pattern inputFilterValueRegex;
private Boolean isNullFilterValue;
public static Builder newBuilder() {
return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
}
@Nullable
abstract ValueProvider<String> filterKey();
@Nullable
abstract ValueProvider<String> filterValue();
@Setup
public void setup() {
if (this.doFilter != null) {
return; // Filter has been evaluated already
}
inputFilterKey = (filterKey() == null ? null : filterKey().get());
if (inputFilterKey == null) {
// Disable input message filtering.
this.doFilter = false;
} else {
this.doFilter = true; // Enable filtering.
String inputFilterValue = (filterValue() == null ? null : filterValue().get());
if (inputFilterValue == null) {
LOG.warn(
"User provided a NULL for filterValue. Only messages with a value of NULL for the"
+ " filterKey: {} will be filtered forward",
inputFilterKey);
// For backward compatibility, we are allowing filtering by null filterValue.
this.isNullFilterValue = true;
this.inputFilterValueRegex = null;
} else {
this.isNullFilterValue = false;
try {
inputFilterValueRegex = getFilterPattern(inputFilterValue);
} catch (PatternSyntaxException e) {
LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
throw new RuntimeException(e);
}
}
LOG.info(
"Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
}
}
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_COUNTER.inc();
if (!this.doFilter) {
// Filter is not enabled
writeOutput(context, context.element());
} else {
PubsubMessage message = context.element();
String extractedValue = message.getAttribute(this.inputFilterKey);
if (this.isNullFilterValue) {
if (extractedValue == null) {
// If we are filtering for null and the extracted value is null, we forward
// the message.
writeOutput(context, message);
}
} else {
if (extractedValue != null
&& this.inputFilterValueRegex.matcher(extractedValue).matches()) {
// If the extracted value is not null and it matches the filter,
// we forward the message.
writeOutput(context, message);
}
}
}
}
/**
* Write a {@link PubsubMessage} and increment the output counter.
*
* @param context {@link ProcessContext} to write {@link PubsubMessage} to.
* @param message {@link PubsubMessage} output.
*/
private void writeOutput(ProcessContext context, PubsubMessage message) {
OUTPUT_COUNTER.inc();
context.output(message);
}
/**
* Return a {@link Pattern} based on a user provided regex string.
*
* @param regex Regex string to compile.
* @return {@link Pattern}
* @throws PatternSyntaxException If the string is an invalid regex.
*/
private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
checkNotNull(regex, "Filter regex cannot be null.");
return Pattern.compile(regex);
}
/** Builder class for {@link ExtractAndFilterEventsFn}. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilterKey(ValueProvider<String> filterKey);
abstract Builder setFilterValue(ValueProvider<String> filterValue);
abstract ExtractAndFilterEventsFn build();
/**
* Method to set the filterKey used for filtering messages.
*
* @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterKey(ValueProvider<String> filterKey) {
checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
return setFilterKey(filterKey);
}
/**
* Method to set the filterValue used for filtering messages.
*
* @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterValue(ValueProvider<String> filterValue) {
checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
return setFilterValue(filterValue);
}
}
}
}
Pub/Sub a Splunk
La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. El caso de uso más común de esta plantilla es exportar registros a Splunk. Para ver un ejemplo del flujo de trabajo subyacente, consulta Implementa exportaciones de registros listas para la producción a Splunk mediante Dataflow.
Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.
Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS.
Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.
Requisitos para esta canalización:
La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
El tema sin procesar de Pub/Sub debe existir antes de ejecutar la canalización.
Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
El token HEC de Splunk se debe generar y estar disponible.
Parámetros de la plantilla
Parámetro
Descripción
inputSubscription
La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
token
(Opcional) El token de autenticación HEC de Splunk. Se debe proporcionar si tokenSource está configurado como PLAINTEXT o KMS.
url
La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088.
outputDeadletterTopic
El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchCount
El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes).
parallelism
La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo).
disableCertificateValidation
Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada). Si es verdadero, los certificados no se validan (todos los certificados son de confianza) y se ignora el parámetro “rootCaCertificatePath”.
includePubsubMessage
Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil).
tokenSource
Fuente del token. Puede ser PLAINTEXT, KMS o SECRET_MANAGER. Este parámetro se debe proporcionar si se usa Secret Manager.
Si tokenSource se configura como KMS, tokenKMSEncryptionKey y el token encriptado se deben proporcionar.
Si tokenSource se configura como SECRET_MANAGER, tokenSecretIdse debe proporcionar.
Si tokenSource se configura como PLAINTEXT, tokense debe proporcionar.
tokenKMSEncryptionKey
La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Este parámetro se debe proporcionar si tokenSource se configura como KMS.
Si se proporciona la clave de Cloud KMS, la string del token HEC debe pasarse encriptada.
tokenSecretId
(Opcional) El ID del Secret de Secret Manager para el token. Este parámetro debe proporcionarse si el tokenSource está configurado como SECRET_MANAGER.
Debe tener el formato projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath
La URL completa al certificado de CA raíz en Cloud Storage (opcional). Por ejemplo, gs://mybucket/mycerts/privateCA.crt. El certificado provisto en Cloud Storage debe estar codificado en DER y puede proporcionarse en codificación binaria o imprimible (Base64).
Si el certificado se proporciona en codificación Base64, debe estar delimitado al comienzo por -----BEGIN CERTIFICATE----- y debe estar limitado al final por -----END CERTIFICATE-----. Si se proporciona este parámetro, este archivo de certificado de CA privado se recupera y se agrega al almacén de confianza del trabajador de Dataflow para verificar el certificado SSL del extremo del HEC de Splunk.
Si no se proporciona este parámetro, se usa el almacén de confianza predeterminado.
enableBatchLogs
(Opcional) Especifica si se deben habilitar los registros para los lotes escritos en Splunk. Valor predeterminado: true.
enableGzipHttpCompression
(Opcional) Especifica si las solicitudes HTTP enviadas a HEC de Splunk deben comprimirse (contenido gzip codificado). Valor predeterminado: true.
Ejecuta la plantilla de Pub/Sub a Splunk
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.splunk.SplunkEvent;
import com.google.cloud.teleport.splunk.SplunkEventCoder;
import com.google.cloud.teleport.splunk.SplunkIO;
import com.google.cloud.teleport.splunk.SplunkWriteError;
import com.google.cloud.teleport.templates.PubSubToSplunk.PubSubToSplunkOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.templates.common.SplunkConverters;
import com.google.cloud.teleport.templates.common.SplunkConverters.SplunkOptions;
import com.google.cloud.teleport.util.TokenNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToSplunk} pipeline is a streaming pipeline which ingests data from Cloud
* Pub/Sub, executes a UDF, converts the output to {@link SplunkEvent}s and writes those records
* into Splunk's HEC endpoint. Any errors which occur in the execution of the UDF, conversion to
* {@link SplunkEvent} or writing to HEC will be streamed into a Pub/Sub topic.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The source Pub/Sub subscription exists.
* <li>HEC end-point is routable from the VPC where the Dataflow job executes.
* <li>Deadletter topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToSplunk \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --runner=${RUNNER}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-splunk-$USER-`date +"%Y%m%d-%H%M%S%z"`
* BATCH_COUNT=1
* PARALLELISM=5
*
* # Execute the templated pipeline:
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* token=my-splunk-hec-token,\
* url=http://splunk-hec-server-address:8088,\
* batchCount=${BATCH_COUNT},\
* parallelism=${PARALLELISM},\
* disableCertificateValidation=false,\
* outputDeadletterTopic=projects/${PROJECT_ID}/topics/deadletter-topic-name,\
* javascriptTextTransformGcsPath=gs://${BUCKET_NAME}/splunk/js/my-js-udf.js,\
* javascriptTextTransformFunctionName=myUdf"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Splunk",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Splunk",
description =
"A pipeline that reads from a Pub/Sub subscription and writes to Splunk's HTTP Event Collector (HEC).",
optionsClass = PubSubToSplunkOptions.class,
optionsOrder = {
PubsubReadSubscriptionOptions.class,
SplunkOptions.class,
JavascriptTextTransformerOptions.class,
PubsubWriteDeadletterTopicOptions.class
},
contactInformation = "https://cloud.google.com/support")
public class PubSubToSplunk {
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** Counter to track inbound messages from source. */
private static final Counter INPUT_MESSAGES_COUNTER =
Metrics.counter(PubSubToSplunk.class, "inbound-pubsub-messages");
/** The tag for successful {@link SplunkEvent} conversion. */
private static final TupleTag<SplunkEvent> SPLUNK_EVENT_OUT = new TupleTag<SplunkEvent>() {};
/** The tag for failed {@link SplunkEvent} conversion. */
private static final TupleTag<FailsafeElement<String, String>> SPLUNK_EVENT_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** GSON to process a {@link PubsubMessage}. */
private static final Gson GSON = new Gson();
/** Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToSplunk.class);
private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = false;
@VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
@VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToSplunk#run(PubSubToSplunkOptions)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
PubSubToSplunkOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToSplunkOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(PubSubToSplunkOptions options) {
Pipeline pipeline = Pipeline.create(options);
// Register coders.
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(SplunkEvent.class, SplunkEventCoder.of());
registry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Convert message to FailsafeElement for processing.
* 3) Apply user provided UDF (if any) on the input strings.
* 4) Convert successfully transformed messages into SplunkEvent objects
* 5) Write SplunkEvents to Splunk's HEC end point.
* 5a) Wrap write failures into a FailsafeElement.
* 6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
* and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
*/
// 1) Read messages in from Pub/Sub
PCollection<String> stringMessages =
pipeline.apply(
"ReadMessages",
new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));
// 2) Convert message to FailsafeElement for processing.
PCollectionTuple transformedOutput =
stringMessages
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply user provided UDF (if any) on the input strings.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// 4) Convert successfully transformed messages into SplunkEvent objects
PCollectionTuple convertToEventTuple =
transformedOutput
.get(UDF_OUT)
.apply(
"ConvertToSplunkEvent",
SplunkConverters.failsafeStringToSplunkEvent(
SPLUNK_EVENT_OUT, SPLUNK_EVENT_DEADLETTER_OUT));
// 5) Write SplunkEvents to Splunk's HEC end point.
PCollection<SplunkWriteError> writeErrors =
convertToEventTuple
.get(SPLUNK_EVENT_OUT)
.apply(
"WriteToSplunk",
SplunkIO.writeBuilder()
.withToken(
new TokenNestedValueProvider(
options.getTokenSecretId(),
options.getTokenKMSEncryptionKey(),
options.getToken(),
options.getTokenSource()))
.withUrl(options.getUrl())
.withBatchCount(options.getBatchCount())
.withParallelism(options.getParallelism())
.withDisableCertificateValidation(options.getDisableCertificateValidation())
.withRootCaCertificatePath(options.getRootCaCertificatePath())
.withEnableBatchLogs(options.getEnableBatchLogs())
.withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
.build());
// 5a) Wrap write failures into a FailsafeElement.
PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
writeErrors.apply(
"WrapSplunkWriteErrors",
ParDo.of(
new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {
@ProcessElement
public void processElement(ProcessContext context) {
SplunkWriteError error = context.element();
FailsafeElement<String, String> failsafeElement =
FailsafeElement.of(error.payload(), error.payload());
if (error.statusMessage() != null) {
failsafeElement.setErrorMessage(error.statusMessage());
}
if (error.statusCode() != null) {
failsafeElement.setErrorMessage(
String.format("Splunk write status code: %d", error.statusCode()));
}
context.output(failsafeElement);
}
}));
// 6) Collect errors from UDF transform (#4), SplunkEvent transform (#5)
// and writing to Splunk HEC (#6) and stream into a Pub/Sub deadletter topic.
PCollectionList.of(
ImmutableList.of(
convertToEventTuple.get(SPLUNK_EVENT_DEADLETTER_OUT),
wrappedSplunkWriteErrors,
transformedOutput.get(UDF_DEADLETTER_OUT)))
.apply("FlattenErrors", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
.setErrorRecordsTopic(options.getOutputDeadletterTopic())
.build());
return pipeline.run();
}
/**
* The {@link PubSubToSplunkOptions} class provides the custom options passed by the executor at
* the command line.
*/
public interface PubSubToSplunkOptions
extends SplunkOptions,
PubsubReadSubscriptionOptions,
PubsubWriteDeadletterTopicOptions,
JavascriptTextTransformerOptions {}
/**
* A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
* returns a {@link PCollection} of {@link String} messages.
*/
private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
private final ValueProvider<String> subscriptionName;
private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
private Boolean includePubsubMessage;
ReadMessages(
ValueProvider<String> subscriptionName,
ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
this.subscriptionName = subscriptionName;
this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
}
@Override
public PCollection<String> expand(PBegin input) {
return input
.apply(
"ReadPubsubMessage",
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
.apply(
"ExtractMessageIfRequired",
ParDo.of(
new DoFn<PubsubMessage, String>() {
@Setup
public void setup() {
if (inputIncludePubsubMessageFlag != null) {
includePubsubMessage = inputIncludePubsubMessageFlag.get();
}
includePubsubMessage =
MoreObjects.firstNonNull(
includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
}
@ProcessElement
public void processElement(ProcessContext context) {
if (includePubsubMessage) {
context.output(formatPubsubMessage(context.element()));
} else {
context.output(
new String(context.element().getPayload(), StandardCharsets.UTF_8));
}
}
}))
.apply(
"CountMessages",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_MESSAGES_COUNTER.inc();
context.output(context.element());
}
}));
}
}
/**
* Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
* to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
*
* @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
* @return JSON String that adheres to the model defined in {@link
* com.google.pubsub.v1.PubsubMessage}
*/
@VisibleForTesting
protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
JsonObject messageJson = new JsonObject();
String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
try {
JsonObject data = GSON.fromJson(payload, JsonObject.class);
messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
} catch (JsonSyntaxException e) {
messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
}
JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);
if (pubsubMessage.getMessageId() != null) {
messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
}
return messageJson.toString();
}
/**
* Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
*
* @param attributesMap {@link Map} of Pub/Sub attributes
* @return {@link JsonObject} of Pub/Sub attributes
*/
private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
JsonObject attributesJson = new JsonObject();
for (String key : attributesMap.keySet()) {
attributesJson.addProperty(key, attributesMap.get(key));
}
return attributesJson;
}
}
Pub/Sub a archivos Avro en Cloud Storage
La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.
Requisitos para esta canalización:
El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.
Parámetros de la plantilla
Parámetro
Descripción
inputTopic
Tema de Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory
Directorio de salida en el que se archivan los archivos Avro de salida. Debe contener / al final.
Por ejemplo: gs://example-bucket/example-directory/.
avroTempDirectory
Directorio para los archivos de Avro temporales. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
outputFilenamePrefix
Prefijo de nombre de archivo de salida para los archivos Avro (opcional).
outputFilenameSuffix
[Opcional] Sufijo de nombre de archivo de salida para los archivos Avro.
outputShardTemplate
[Opcional] Plantilla de fragmentación del archivo de salida. Se especifica como secuencias repetidas de las letras S o N. Por ejemplo, SSS-NNN. Estas se reemplazan por el número de fragmento o la cantidad total de fragmentos, respectivamente. Si no se especifica este parámetro, el formato de plantilla predeterminado es W-P-SS-of-NN.
Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToAvro.Options;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed Avro files at the specified output directory.
*
* <p>Files output will have the following schema:
*
* <pre>
* {
* "type": "record",
* "name": "AvroPubsubMessageRecord",
* "namespace": "com.google.cloud.teleport.avro",
* "fields": [
* {"name": "message", "type": {"type": "array", "items": "bytes"}},
* {"name": "attributes", "type": {"type": "map", "values": "string"}},
* {"name": "timestamp", "type": "long"}
* ]
* }
* </pre>
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToAvro
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-avro
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.avro"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Avro",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Avro Files on Cloud Storage",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and outputs windowed Avro files to"
+ " the specified directory.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubsubToAvro {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 7,
description = "Temporary Avro write directory",
helpText = "Directory for temporary Avro files.")
@Required
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> messages = null;
/*
* Steps:
* 1) Read messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed data into Avro files, one per window by default.
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
messages
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
AvroIO.write(AvroPubsubMessageRecord.class)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input)))
/*.withTempDirectory(FileSystems.matchNewResource(
options.getAvroTempDirectory(),
Boolean.TRUE))
*/
.withWindowedWrites()
.withNumShards(options.getNumShards()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
* copying it's fields and the timestamp of the message.
*/
static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
new AvroPubsubMessageRecord(
message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
}
}
}
Tema de Pub/Sub a archivos de texto en Cloud Storage
La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.
Requisitos para esta canalización:
El tema de Pub/Sub debe existir antes de la ejecución.
Los mensajes publicados en el tema deben tener formato de texto.
Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.
Parámetros de la plantilla
Parámetro
Descripción
inputTopic
El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory
La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix
El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-.
outputFilenameSuffix
El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate
La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
TOPIC_NAME: Es el nombre del tema de Pub/Sub.
BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToText
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-text
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* windowDuration=5m,\
* numShards=1,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub and writes them to Cloud Storage, creating"
+ " a text file for each five minute window. Note that this pipeline assumes no"
+ " newlines in the body of the Pub/Sub message and thus each message becomes a single"
+ " line in the output file.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from a Pub/Sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
ValueProvider<String> getUserTempLocation();
void setUserTempLocation(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static ValueProvider<String> maybeUseUserTempLocation(
ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
return DualInputNestedValueProvider.of(
userTempLocation,
outputLocation,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
return (input.getX() != null) ? input.getX() : input.getY();
}
});
}
}
Tema de Pub/Sub o suscripción a archivos de texto en Cloud Storage
El tema de Pub/Sub o la suscripción a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.
Requisitos para esta canalización:
El tema o la suscripción a Pub/Sub deben existir antes de la ejecución.
Los mensajes publicados en el tema deben tener formato de texto.
Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.
Parámetros de la plantilla
Parámetro
Descripción
inputTopic
El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>. Si se proporciona este parámetro, no se debe proporcionar inputSubscription.
inputSubscription
El tema de Pub/Sub desde el que se lee la entrada. El nombre de la suscripción debe tener el formato projects/<project-id>/subscription/<subscription-name>. Si se proporciona este parámetro, no se debe proporcionar inputTopic.
outputDirectory
La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix
El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-.
outputFilenameSuffix
El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate
La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
windowDuration
La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h).
Ejecuta la plantilla del tema o suscripción de Pub/Sub a archivos de texto en Cloud Storage
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
JOB_NAME:
Es el nombre del trabajo que elijas
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* export PROJECT={project id}
* export TEMPLATE_MODULE=googlecloud-to-googlecloud
* export TEMPLATE_NAME=pubsub-to-text
* export BUCKET_NAME=gs://{bucket name}
* export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${TEMPLATE_NAME}-image
* export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java11-template-launcher-base
* export BASE_CONTAINER_IMAGE_VERSION=latest
* export APP_ROOT=/template/${TEMPLATE_NAME}
* export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_NAME}-command-spec.json
* export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_NAME}-image-spec.json
*
* gcloud config set project ${PROJECT}
*
* # Build and push image to Google Container Repository
* mvn package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC} \
* -Djib.applicationCache=/tmp/jib-cache \
* -am -pl ${TEMPLATE_MODULE}
*
* # Create and upload image spec
* echo '{
* "image":"'${TARGET_GCR_IMAGE}'",
* "metadata":{
* "name":"Pub/Sub to text",
* "description":"Write Pub/Sub messages to GCS text files.",
* "parameters":[
* {
* "name":"inputSubscription",
* "label":"Pub/Sub subscription to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"inputTopic",
* "label":"Pub/Sub topic to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"outputDirectory",
* "label":"Directory to output files to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenamePrefix",
* "label":"The filename prefix of the files to write to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenameSuffix",
* "label":"The suffix of the files to write to",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"userTempLocation",
* "label":"The directory to output temporary files to",
* "paramType":"TEXT",
* "isOptional":true
* }
* ]
* },
* "sdk_info":{"language":"JAVA"}
* }' > image_spec.json
* gsutil cp image_spec.json ${TEMPLATE_IMAGE_SPEC}
* rm image_spec.json
*
* # Run template
* export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
* gcloud beta dataflow flex-template run ${JOB_NAME} \
* --project=${PROJECT} --region=us-central1 \
* --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
* --parameters inputTopic={topic},outputDirectory={directory},outputFilenamePrefix={prefix}
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub Subscription or Topic and writes them to"
+ " Cloud Storage, creating a text file for each five minute window. Note that this"
+ " pipeline assumes no newlines in the body of the Pub/Sub message and thus each"
+ " message becomes a single line in the output file.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}
Pub/Sub a MongoDB
La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos.
Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produjo debido a una falta de coincidencia del esquema, JSON con errores o mientras se ejecutaban transformaciones se registra en una tabla de BigQuery para mensajes no procesados junto con un mensaje de entrada. Si no existe una tabla para los registros no procesados antes de la ejecución, la canalización crea esta tabla de forma automática.
Requisitos para esta canalización:
La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.
Parámetros de la plantilla
Parámetro
Descripción
inputSubscription
Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
mongoDBUri
Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database
La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection
Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable
La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchSize
El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes
El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime
El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled
El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate
El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered
El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed
El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.
Ejecuta la plantilla de Pub/Sub a MongoDB
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
REGION_NAME:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
JOB_NAME:
Es el nombre del trabajo que elijas
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
PROJECT_ID:
El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
LOCATION:
El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
JOB_NAME:
Es el nombre del trabajo que elijas
VERSION:
Es la versión de la plantilla que deseas usar.
Puedes usar los siguientes valores:
latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.PubSubToMongoDB.Options;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
* from PubSub, applies a Javascript UDF if provided and inserts resulting records as Bson Document
* in MongoDB. If the element fails to be processed then it is written to a deadletter table in
* BigQuery.
*
* <p><b>Pipeline Requirements</b