Modèles Dataflow de traitement par flux fournis par Google
Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Google fournit un ensemble de modèles Dataflow Open Source.
Ces modèles Dataflow peuvent vous aider à traiter des tâches de données volumineuses, y compris l'importation, l'exportation, la sauvegarde et la restauration de données, ainsi que les opérations d'API groupées, le tout sans avoir à utiliser un environnement de développement dédié. Les modèles sont basés sur Apache Beam et exploitent Dataflow pour transformer les données.
Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.
Conditions requises pour ce pipeline :
Le champ data des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON.
Par exemple, vous pouvez insérer des messages contenant les valeurs du champ data au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant un type de données de chaîne ("string").
La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.
Paramètres de modèle
Paramètres
Description
inputSubscription
Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription>.
outputTableSpec
Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable
Table BigQuery des messages qui n'ont pas pu atteindre la table de sortie, au format <my-project>:<my-dataset>.<my-table>.
Si elle n'existe pas, elle est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, OUTPUT_TABLE_SPEC_error_records est utilisé à la place.
javascriptTextTransformGcsPath
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
Exécuter le modèle Abonnement Pub/Sub vers BigQuery
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Subscription to BigQuery template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
--region REGION_NAME \
--staging-location STAGING_LOCATION \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Remplacez les éléments suivants :
JOB_NAME : nom de la tâche de votre choix
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
DATASET : votre ensemble de données BigQuery.
TABLE_NAME : nom de votre table BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
@Template(
name = "PubSub_Subscription_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub subscription, transforms"
+ " them using a JavaScript user-defined function (UDF), and writes them to a"
+ " pre-existing BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputTopic",
contactInformation = "https://cloud.google.com/support")
@Template(
name = "PubSub_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Topic to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them"
+ " using a JavaScript user-defined function (UDF), and writes them to a pre-existing"
+ " BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The table’s schema must match the "
+ "input JSON objects.")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Input Pub/Sub topic",
helpText = "The Pub/Sub topic to read the input from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateParameter.PubsubSubscription(
order = 3,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
@TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
@Description(
"This determines whether the template reads from a Pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.BigQueryTable(
order = 5,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. It should be in the format"
+ " of \"your-project-id:your-dataset.your-table-name\". If it doesn't exist, it"
+ " will be created during pipeline execution. If not specified,"
+ " \"{outputTableSpec}_error_records\" is used instead.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Sujet Pub/Sub vers BigQuery
Le modèle Sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.
Conditions requises pour ce pipeline :
Le champ data des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON.
Par exemple, vous pouvez insérer des messages contenant les valeurs du champ data au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant un type de données de chaîne ("string").
La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.
Paramètres de modèle
Paramètres
Description
inputTopic
Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic>.
outputTableSpec
Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable
La table BigQuery des messages n'ayant pas pu atteindre la table de sortie Elle doit être au format <my-project>:<my-dataset>.<my-table>.
Si elle n'existe pas, elle est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.
javascriptTextTransformGcsPath
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
Exécuter le modèle Sujet Pub/Sub vers BigQuery
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic to BigQuery template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
--region REGION_NAME \
--staging-location STAGING_LOCATION \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Remplacez les éléments suivants :
JOB_NAME : nom de la tâche de votre choix
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
TOPIC_NAME : nom de votre sujet Pub/Sub
DATASET : votre ensemble de données BigQuery.
TABLE_NAME : nom de votre table BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
* </pre>
*/
@Template(
name = "PubSub_Subscription_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub subscription, transforms"
+ " them using a JavaScript user-defined function (UDF), and writes them to a"
+ " pre-existing BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputTopic",
contactInformation = "https://cloud.google.com/support")
@Template(
name = "PubSub_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Topic to BigQuery",
description =
"Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them"
+ " using a JavaScript user-defined function (UDF), and writes them to a pre-existing"
+ " BigQuery table as BigQuery elements.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The table’s schema must match the "
+ "input JSON objects.")
ValueProvider<String> getOutputTableSpec();
void setOutputTableSpec(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Input Pub/Sub topic",
helpText = "The Pub/Sub topic to read the input from.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateParameter.PubsubSubscription(
order = 3,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
@TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
@Description(
"This determines whether the template reads from a Pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.BigQueryTable(
order = 5,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. It should be in the format"
+ " of \"your-project-id:your-dataset.your-table-name\". If it doesn't exist, it"
+ " will be created during pipeline execution. If not specified,"
+ " \"{outputTableSpec}_error_records\" is used instead.")
ValueProvider<String> getOutputDeadletterTable();
void setOutputDeadletterTable(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
* defaultDeadLetterTableSuffix is returned instead.
*/
private static ValueProvider<String> maybeUseDefaultDeadletterTable(
ValueProvider<String> deadletterTable,
ValueProvider<String> outputTableSpec,
String defaultDeadLetterTableSuffix) {
return DualInputNestedValueProvider.of(
deadletterTable,
outputTableSpec,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
String userProvidedTable = input.getX();
String outputTableSpec = input.getY();
if (userProvidedTable == null) {
return outputTableSpec + defaultDeadLetterTableSuffix;
}
return userProvidedTable;
}
});
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The executions of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
PCollectionTuple udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
Pub/Sub Avro vers BigQuery
Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery.
Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.
Conditions requises pour ce pipeline
L'abonnement Pub/Sub d'entrée doit exister.
Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
Le sujet Pub/Sub non traité doit exister.
L'ensemble de données BigQuery de sortie doit exister.
Paramètres de modèle
Paramètres
Description
schemaPath
Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc.
inputSubscription
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec
Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table>.
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur.
writeDisposition
(Facultatif) La propriété WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Par défaut, WRITE_APPEND.
createDisposition
(Facultatif) La propriété CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Par défaut, CREATE_IF_NEEDED.
Exécuter le modèle Pub/Sub Avro vers BigQuery
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
BIGQUERY_TABLE : nom de la table de sortie BigQuery
DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
BIGQUERY_TABLE : nom de la table de sortie BigQuery
DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubAvroToBigQuery.PubsubAvroToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;
/**
* A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
* Pub/Sub into a BigQuery table.
*
* <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
* topic.
*/
@Template(
name = "PubSub_Avro_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Avro to BigQuery",
description =
"A streaming pipeline which inserts Avro records from a Pub/Sub subscription into a"
+ " BigQuery table.",
optionsClass = PubsubAvroToBigQueryOptions.class,
flexContainerName = "pubsub-avro-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubAvroToBigQuery {
/**
* Validates input flags and executes the Dataflow pipeline.
*
* @param args command line arguments to the pipeline
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
PubsubAvroToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubAvroToBigQueryOptions.class);
run(options);
}
/**
* Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
* {@link PubsubAvroToBigQuery} pipeline.
*/
public interface PubsubAvroToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage path to the Avro schema file",
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
@Required
String getSchemaPath();
void setSchemaPath(String schemaPath);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options execution parameters to the pipeline
* @return result of the pipeline execution as a {@link PipelineResult}
*/
private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());
WriteResult writeResults =
pipeline
.apply(
"Read Avro records",
PubsubIO.readAvroGenericRecords(schema)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic()))
// Workaround for BEAM-12256. Eagerly convert to rows to avoid
// the RowToGenericRecord function that doesn't handle all data
// types.
// TODO: Remove this workaround when a fix for BEAM-12256 is
// released.
.apply(Convert.toRows())
.apply(
"Write to BigQuery",
BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResults, options)
.apply(
"Create error payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
.setPayloadCoder(AvroCoder.of(schema))
.setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
.build())
.apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
Proto Pub/Sub vers BigQuery
Le modèle proto Pub/Sub vers BigQuery est un pipeline de streaming qui ingère les données proto d'un abonnement Pub/Sub dans une table BigQuery.
Les erreurs qui se produisent lors de l'écriture dans la table BigQuery sont insérées en flux continu dans un sujet Pub/Sub non traité.
Une fonction définie par l'utilisateur (UDF) JavaScript peut être fournie pour transformer les données. Les erreurs lors de l'exécution de l'UDF peuvent être envoyées à un sujet Pub/Sub distinct ou au même sujet non traité que les erreurs BigQuery.
Conditions requises pour ce pipeline :
L'abonnement Pub/Sub d'entrée doit exister.
Le fichier de schéma des enregistrements proto doit exister dans Cloud Storage.
Le sujet Pub/Sub de sortie doit exister.
L'ensemble de données BigQuery de sortie doit exister.
Si la table BigQuery existe, elle doit posséder un schéma correspondant aux données proto, quelle que soit la valeur de createDisposition.
Paramètres de modèle
Paramètres
Description
protoSchemaPath
Emplacement Cloud Storage du fichier de schéma proto autonome. Par exemple, gs://path/to/my/file.pb.
Ce fichier peut être généré avec l'option --descriptor_set_out de la commande protoc.
L'option --include_imports garantit que le fichier est autonome.
fullMessageName
Nom complet du message proto. Par exemple, package.name.MessageName, où package.name est la valeur fournie pour l'instruction package, et non pour l'instruction java_package.
inputSubscription
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec
Emplacement de la table de sortie BigQuery. Par exemple, my-project:my_dataset.my_table.
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du fichier de schéma d'entrée.
preserveProtoFieldNames
(Facultatif) true pour conserver le nom du champ Proto d'origine au format JSON. false pour utiliser des noms JSON plus standards.
Par exemple, false remplace field_name par fieldName. (Par défaut : false)
bigQueryTableSchemaPath
(Facultatif) Chemin d'accès Cloud Storage vers le chemin d'accès du schéma BigQuery. Par exemple, gs://path/to/my/schema.json. S'il n'est pas fourni, le schéma est obtenu à partir du schéma Proto.
javascriptTextTransformGcsPath
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
udfOutputTopic
(Facultatif) Sujet Pub/Sub stockant les erreurs UDF. Par exemple : projects/<project-id>/topics/<topic-name> Si cet élément n'est pas fourni, les erreurs UDF sont envoyées au même sujet que outputTopic.
writeDisposition
(Facultatif) La disposition WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Valeur par défaut : WRITE_APPEND.
createDisposition
(Facultatif) La disposition CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Valeur par défaut : CREATE_IF_NEEDED.
Exécuter le modèle Proto Pub/Sub vers BigQuery
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Proto to BigQuery template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME : nom du message Proto (par exemple, package.name.MessageName)
SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
BIGQUERY_TABLE : nom de la table de sortie BigQuery
UNPROCESSED_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple, gs://MyBucket/file.pb)
PROTO_MESSAGE_NAME : nom du message Proto (par exemple, package.name.MessageName)
SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
BIGQUERY_TABLE : nom de la table de sortie BigQuery
UNPROCESSED_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;
/**
* A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
* records from Pub/Sub to BigQuery.
*
* <p>Persistent failures are written to a Pub/Sub unprocessed topic.
*/
@Template(
name = "PubSub_Proto_to_BigQuery",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Proto to BigQuery",
description =
"A streaming pipeline that reads Protobuf messages from a Pub/Sub subscription and writes"
+ " them to a BigQuery table.",
optionsClass = PubSubProtoToBigQueryOptions.class,
flexContainerName = "pubsub-proto-to-bigquery",
contactInformation = "https://cloud.google.com/support")
public final class PubsubProtoToBigQuery {
private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
public static void main(String[] args) {
UncaughtExceptionLogger.register();
run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
}
/** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
public interface PubSubProtoToBigQueryOptions
extends ReadSubscriptionOptions,
WriteOptions,
WriteTopicOptions,
JavascriptTextTransformerOptions,
BigQueryStorageApiStreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage Path to the Proto Schema File",
helpText =
"Cloud Storage path to a self-contained descriptor set file. Example:"
+ " gs://MyBucket/schema.pb. `schema.pb` can be generated by adding"
+ " `--descriptor_set_out=schema.pb` to the `protoc` command that compiles the"
+ " protos. The `--include_imports` flag can be used to guarantee that the file is"
+ " self-contained.")
@Required
String getProtoSchemaPath();
void setProtoSchemaPath(String value);
@TemplateParameter.Text(
order = 2,
regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
description = "Full Proto Message Name",
helpText =
"The full message name (example: package.name.MessageName). If the message is nested"
+ " inside of another message, then include all messages with the '.' delimiter"
+ " (example: package.name.OuterMessage.InnerMessage). 'package.name' should be"
+ " from the `package` statement, not the `java_package` statement.")
@Required
String getFullMessageName();
void setFullMessageName(String value);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Preserve Proto Field Names",
helpText =
"Flag to control whether proto field names should be kept or converted to"
+ " lowerCamelCase. If the table already exists, this should be based on what"
+ " matches the table's schema. Otherwise, it will determine the column names of"
+ " the created table. True to preserve proto snake_case. False will convert fields"
+ " to lowerCamelCase. (Default: false)")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();
void setPreserveProtoFieldNames(Boolean value);
@TemplateParameter.GcsReadFile(
order = 4,
optional = true,
description = "BigQuery Table Schema Path",
helpText =
"Cloud Storage path to the BigQuery schema JSON file. "
+ "If this is not set, then the schema is inferred "
+ "from the Proto schema.",
example = "gs://MyBucket/bq_schema.json")
String getBigQueryTableSchemaPath();
void setBigQueryTableSchemaPath(String value);
@TemplateParameter.PubsubTopic(
order = 5,
optional = true,
description = "Pub/Sub output topic for UDF failures",
helpText =
"An optional output topic to send UDF failures to. If this option is not set, then"
+ " failures will be written to the same topic as the BigQuery failures.",
example = "projects/your-project-id/topics/your-topic-name")
String getUdfOutputTopic();
void setUdfOutputTopic(String udfOutputTopic);
}
/** Runs the pipeline and returns the results. */
private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
Pipeline pipeline = Pipeline.create(options);
Descriptor descriptor = getDescriptor(options);
PCollection<String> maybeForUdf =
pipeline
.apply("Read From Pubsub", readPubsubMessages(options, descriptor))
.apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
WriteResult writeResult =
runUdf(maybeForUdf, options)
.apply("Write to BigQuery", writeToBigQuery(options, descriptor));
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"Create Error Payload",
ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
.setPayloadCoder(StringUtf8Coder.of())
.setTranslateFunction(BigQueryConverters::tableRowToJson)
.build())
.apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
return pipeline.run();
}
/** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
@VisibleForTesting
static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
String schemaPath = options.getProtoSchemaPath();
String messageName = options.getFullMessageName();
Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
if (descriptor == null) {
throw new IllegalArgumentException(
messageName + " is not a recognized message in " + schemaPath);
}
return descriptor;
}
/** Returns the {@link PTransform} for reading Pub/Sub messages. */
private static Read<DynamicMessage> readPubsubMessages(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
return PubsubIO.readProtoDynamicMessages(descriptor)
.fromSubscription(options.getInputSubscription())
.withDeadLetterTopic(options.getOutputTopic());
}
/**
* Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
*
* <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
* specified in {@code options}.
*/
@VisibleForTesting
static Write<String> writeToBigQuery(
PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
Write<String> write =
BigQueryConverters.<String>createWriteTransform(options)
.withFormatFunction(BigQueryConverters::convertJsonToTableRow);
String schemaPath = options.getBigQueryTableSchemaPath();
if (Strings.isNullOrEmpty(schemaPath)) {
return write.withSchema(
SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
} else {
return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
}
}
/** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
private static class ConvertDynamicProtoMessageToJson
extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
private final boolean preserveProtoName;
private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
this.preserveProtoName = options.getPreserveProtoFieldNames();
}
@Override
public PCollection<String> expand(PCollection<DynamicMessage> input) {
return input.apply(
"Map to JSON",
MapElements.into(TypeDescriptors.strings())
.via(
message -> {
try {
JsonFormat.Printer printer = JsonFormat.printer();
return preserveProtoName
? printer.preservingProtoFieldNames().print(message)
: printer.print(message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}));
}
}
/**
* Handles running the UDF.
*
* <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
*
* <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
* topic.
*
* @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
* @param options the options containing info on running the UDF
* @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
* called
*/
@VisibleForTesting
static PCollection<String> runUdf(
PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
// In order to avoid generating a graph that makes it look like a UDF was called when none was
// intended, simply return the input as "success" output.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) {
return jsonCollection;
}
// For testing purposes, we need to do this check before creating the PTransform rather than
// in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
// a value.
if (Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
throw new IllegalArgumentException(
"JavaScript function name cannot be null or empty if file is set");
}
PCollectionTuple maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
maybeSuccess
.get(UDF_FAILURE_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Failures",
ConvertFailsafeElementToPubsubMessage.<String, String>builder()
.setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
.setErrorMessageAttributeKey("udfErrorMessage")
.build())
.apply("Write Failed UDF", writeUdfFailures(options));
return maybeSuccess
.get(UDF_SUCCESS_TAG)
.setCoder(FAILSAFE_CODER)
.apply(
"Get UDF Output",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
}
/** {@link PTransform} that calls a UDF and returns both success and failure output. */
private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
private final PubSubProtoToBigQueryOptions options;
RunUdf(PubSubProtoToBigQueryOptions options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<String> input) {
return input
.apply("Prepare Failsafe UDF", makeFailsafe())
.setCoder(FAILSAFE_CODER)
.apply(
"Call UDF",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setSuccessTag(UDF_SUCCESS_TAG)
.setFailureTag(UDF_FAILURE_TAG)
.build());
}
private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
.via((String json) -> FailsafeElement.of(json, json));
}
}
/**
* Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
* topic.
*/
private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
PubSubProtoToBigQueryOptions options) {
PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
return Strings.isNullOrEmpty(options.getUdfOutputTopic())
? write.to(options.getOutputTopic())
: write.to(options.getUdfOutputTopic());
}
}
Pub/Sub vers Pub/Sub
Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.
Conditions requises pour ce pipeline :
L'abonnement Pub/Sub source doit exister avant l'exécution.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Pub/Sub template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
TOPIC_NAME : nom du sujet Pub/Sub
FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie.
Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
TOPIC_NAME : nom du sujet Pub/Sub
FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie.
Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A template that copies messages from one Pubsub subscription to another Pubsub topic. */
@Template(
name = "Cloud_PubSub_to_Cloud_PubSub",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Pub/Sub",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and writes to a Pub/Sub topic. ",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToPubsub {
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription. 2) Apply any
* filters if an attribute=value pair is provided. 3) Write each PubSubMessage to output PubSub
* topic.
*/
pipeline
.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(
"Filter Events If Enabled",
ParDo.of(
ExtractAndFilterEventsFn.newBuilder()
.withFilterKey(options.getFilterKey())
.withFilterValue(options.getFilterValue())
.build()))
.apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Options supported by {@link PubsubToPubsub}.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> inputSubscription);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Output Pub/Sub topic",
helpText =
"The name of the topic to which data should published, in the format of 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
@TemplateParameter.Text(
order = 3,
optional = true,
description = "Event filter key",
helpText =
"Attribute key by which events are filtered. No filters are applied if no key is specified.")
ValueProvider<String> getFilterKey();
void setFilterKey(ValueProvider<String> filterKey);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "Event filter value",
helpText =
"Filter attribute value to use if an event filter key is provided. Accepts a valid "
+ "Java Regex string as an event filter value. In case a regex is provided, the complete "
+ "expression should match in order for the message to be filtered. Partial matches (e.g. "
+ "substring) will not be filtered. A null event filter value is used by default.")
ValueProvider<String> getFilterValue();
void setFilterValue(ValueProvider<String> filterValue);
}
/**
* DoFn that will determine if events are to be filtered. If filtering is enabled, it will only
* publish events that pass the filter else, it will publish all input events.
*/
@AutoValue
public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);
// Counter tracking the number of incoming Pub/Sub messages.
private static final Counter INPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");
// Counter tracking the number of output Pub/Sub messages after the user provided filter
// is applied.
private static final Counter OUTPUT_COUNTER =
Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");
private Boolean doFilter;
private String inputFilterKey;
private Pattern inputFilterValueRegex;
private Boolean isNullFilterValue;
public static Builder newBuilder() {
return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
}
@Nullable
abstract ValueProvider<String> filterKey();
@Nullable
abstract ValueProvider<String> filterValue();
@Setup
public void setup() {
if (this.doFilter != null) {
return; // Filter has been evaluated already
}
inputFilterKey = (filterKey() == null ? null : filterKey().get());
if (inputFilterKey == null) {
// Disable input message filtering.
this.doFilter = false;
} else {
this.doFilter = true; // Enable filtering.
String inputFilterValue = (filterValue() == null ? null : filterValue().get());
if (inputFilterValue == null) {
LOG.warn(
"User provided a NULL for filterValue. Only messages with a value of NULL for the"
+ " filterKey: {} will be filtered forward",
inputFilterKey);
// For backward compatibility, we are allowing filtering by null filterValue.
this.isNullFilterValue = true;
this.inputFilterValueRegex = null;
} else {
this.isNullFilterValue = false;
try {
inputFilterValueRegex = getFilterPattern(inputFilterValue);
} catch (PatternSyntaxException e) {
LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
throw new RuntimeException(e);
}
}
LOG.info(
"Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
}
}
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_COUNTER.inc();
if (!this.doFilter) {
// Filter is not enabled
writeOutput(context, context.element());
} else {
PubsubMessage message = context.element();
String extractedValue = message.getAttribute(this.inputFilterKey);
if (this.isNullFilterValue) {
if (extractedValue == null) {
// If we are filtering for null and the extracted value is null, we forward
// the message.
writeOutput(context, message);
}
} else {
if (extractedValue != null
&& this.inputFilterValueRegex.matcher(extractedValue).matches()) {
// If the extracted value is not null and it matches the filter,
// we forward the message.
writeOutput(context, message);
}
}
}
}
/**
* Write a {@link PubsubMessage} and increment the output counter.
*
* @param context {@link ProcessContext} to write {@link PubsubMessage} to.
* @param message {@link PubsubMessage} output.
*/
private void writeOutput(ProcessContext context, PubsubMessage message) {
OUTPUT_COUNTER.inc();
context.output(message);
}
/**
* Return a {@link Pattern} based on a user provided regex string.
*
* @param regex Regex string to compile.
* @return {@link Pattern}
* @throws PatternSyntaxException If the string is an invalid regex.
*/
private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
checkNotNull(regex, "Filter regex cannot be null.");
return Pattern.compile(regex);
}
/** Builder class for {@link ExtractAndFilterEventsFn}. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilterKey(ValueProvider<String> filterKey);
abstract Builder setFilterValue(ValueProvider<String> filterValue);
abstract ExtractAndFilterEventsFn build();
/**
* Method to set the filterKey used for filtering messages.
*
* @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterKey(ValueProvider<String> filterKey) {
checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
return setFilterKey(filterKey);
}
/**
* Method to set the filterValue used for filtering messages.
*
* @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
* @return {@link Builder}
*/
public Builder withFilterValue(ValueProvider<String> filterValue) {
checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
return setFilterValue(filterValue);
}
}
}
}
Pub/Sub vers Splunk
Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk via la solution HEC (HTTP Event Collector) de Splunk. Le cas d'utilisation le plus courant de ce modèle est l'exportation de journaux vers Splunk. Pour découvrir un exemple du workflow sous-jacent, consultez la section Déployer des exportations de journaux prêtes pour la production vers Splunk à l'aide de Dataflow.
Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript définie par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub non traité en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.
Pour ajouter une couche de protection à votre jeton HEC, vous pouvez également transmettre une clé Cloud KMS ainsi que le paramètre de jeton HEC encodé en base64 chiffré avec cette clé.
Pour en savoir plus sur le chiffrement des paramètres du jeton HEC, consultez la page sur le point de terminaison du chiffrement de l'API Cloud KMS.
Conditions requises pour ce pipeline :
L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
Le sujet Pub/Sub non traité doit exister avant l'exécution du pipeline.
Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
Le jeton de la solution HEC de Splunk doit être généré et disponible.
Paramètres de modèle
Paramètres
Description
inputSubscription
Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
token
(Facultatif) Jeton d'authentification HEC Splunk. Doit être spécifié si tokenSource est défini sur PLAINTEXT ou KMS.
url
URL HEC Splunk. Il doit être routable depuis le VPC dans lequel le pipeline est exécuté. Par exemple, https://splunk-hec-host:8088.
outputDeadletterTopic
Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
batchCount
(Facultatif) Taille de lot pour l'envoi de plusieurs événements vers Splunk. Valeur par défaut 1 (pas de traitement par lots).
parallelism
(Facultatif) Nombre maximal de demandes en parallèle. Valeur par défaut 1 (aucun parallélisme).
disableCertificateValidation
(Facultatif) Désactiver la validation du certificat SSL. Valeur par défaut "false" (validation activée). Si la valeur est "true", les certificats ne sont pas validés (tous les certificats sont approuvés) et le paramètre "rootCaCertificatePath" est ignoré.
includePubsubMessage
(Facultatif) Inclure le message Pub/Sub complet dans la charge utile. Valeur "false" par défaut (seul l'élément de données est inclus dans la charge utile).
tokenSource
Source du jeton. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager est utilisé.
Si tokenSource est défini sur KMS, tokenKMSEncryptionKey et le token chiffré doivent être spécifiés.
Si tokenSource est défini sur SECRET_MANAGER, tokenSecretIddoit être spécifié.
Si tokenSource est défini sur PLAINTEXT, tokendoit être spécifié.
tokenKMSEncryptionKey
(Facultatif) Clé Cloud KMS permettant de déchiffrer la chaîne du jeton HEC. Ce paramètre doit être spécifié si tokenSource est défini sur KMS.
Si la clé Cloud KMS est fournie, la chaîne du jeton HEC doit être transmise sous forme chiffrée.
tokenSecretId
(Facultatif) ID du secret fourni par Secret Manager pour le jeton. Ce paramètre doit être spécifié si tokenSource est défini sur SECRET_MANAGER.
Format requis : projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath
(Facultatif) URL complète du certificat CA racine dans Cloud Storage. Par exemple, gs://mybucket/mycerts/privateCA.crt. Le certificat fourni dans Cloud Storage doit être encodé au format DER et peut être fourni en encodage binaire ou imprimable (base64).
Si le certificat est fourni avec un encodage en base64, il doit être délimité par "------BEGIN CERTIFICATE-----" au début et par "-----END CERTIFICATE-----" à la fin. Si ce paramètre est fourni, ce fichier de certificat CA privé est extrait et ajouté au trust store du nœud de calcul Dataflow pour vérifier le certificat SSL du point de terminaison HEC de Splunk.
Si ce paramètre n'est pas fourni, le trust store par défaut est utilisé.
enableBatchLogs
(Facultatif) Spécifie si les journaux doivent être activés pour les lots écrits dans Splunk. Valeur par défaut : true
enableGzipHttpCompression
(Facultatif) Indique si les requêtes HTTP envoyées à la solution HEC de Splunk doivent être compressées (codage de contenu gzip). Valeur par défaut : true
Exécuter le modèle Pub/Sub vers Splunk
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Splunk template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
TOKEN : jeton HTTP Event Collector de Splunk
URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
PATH_TO_JAVASCRIPT_UDF_FILE :
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
TOKEN : jeton HTTP Event Collector de Splunk
URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
PATH_TO_JAVASCRIPT_UDF_FILE :
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.splunk.SplunkEvent;
import com.google.cloud.teleport.splunk.SplunkEventCoder;
import com.google.cloud.teleport.splunk.SplunkIO;
import com.google.cloud.teleport.splunk.SplunkWriteError;
import com.google.cloud.teleport.templates.PubSubToSplunk.PubSubToSplunkOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.templates.common.SplunkConverters;
import com.google.cloud.teleport.templates.common.SplunkConverters.SplunkOptions;
import com.google.cloud.teleport.util.TokenNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToSplunk} pipeline is a streaming pipeline which ingests data from Cloud
* Pub/Sub, executes a UDF, converts the output to {@link SplunkEvent}s and writes those records
* into Splunk's HEC endpoint. Any errors which occur in the execution of the UDF, conversion to
* {@link SplunkEvent} or writing to HEC will be streamed into a Pub/Sub topic.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The source Pub/Sub subscription exists.
* <li>HEC end-point is routable from the VPC where the Dataflow job executes.
* <li>Deadletter topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToSplunk \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --runner=${RUNNER}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-splunk-$USER-`date +"%Y%m%d-%H%M%S%z"`
* BATCH_COUNT=1
* PARALLELISM=5
*
* # Execute the templated pipeline:
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template/PubSubToSplunk \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* token=my-splunk-hec-token,\
* url=http://splunk-hec-server-address:8088,\
* batchCount=${BATCH_COUNT},\
* parallelism=${PARALLELISM},\
* disableCertificateValidation=false,\
* outputDeadletterTopic=projects/${PROJECT_ID}/topics/deadletter-topic-name,\
* javascriptTextTransformGcsPath=gs://${BUCKET_NAME}/splunk/js/my-js-udf.js,\
* javascriptTextTransformFunctionName=myUdf"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Splunk",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Splunk",
description =
"A pipeline that reads from a Pub/Sub subscription and writes to Splunk's HTTP Event Collector (HEC).",
optionsClass = PubSubToSplunkOptions.class,
optionsOrder = {
PubsubReadSubscriptionOptions.class,
SplunkOptions.class,
JavascriptTextTransformerOptions.class,
PubsubWriteDeadletterTopicOptions.class
},
contactInformation = "https://cloud.google.com/support")
public class PubSubToSplunk {
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** Counter to track inbound messages from source. */
private static final Counter INPUT_MESSAGES_COUNTER =
Metrics.counter(PubSubToSplunk.class, "inbound-pubsub-messages");
/** The tag for successful {@link SplunkEvent} conversion. */
private static final TupleTag<SplunkEvent> SPLUNK_EVENT_OUT = new TupleTag<SplunkEvent>() {};
/** The tag for failed {@link SplunkEvent} conversion. */
private static final TupleTag<FailsafeElement<String, String>> SPLUNK_EVENT_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the main output for the UDF. */
private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** The tag for the dead-letter output of the udf. */
private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<String, String>>() {};
/** GSON to process a {@link PubsubMessage}. */
private static final Gson GSON = new Gson();
/** Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToSplunk.class);
private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = false;
@VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
@VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* PubSubToSplunk#run(PubSubToSplunkOptions)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
PubSubToSplunkOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToSplunkOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(PubSubToSplunkOptions options) {
Pipeline pipeline = Pipeline.create(options);
// Register coders.
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(SplunkEvent.class, SplunkEventCoder.of());
registry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Convert message to FailsafeElement for processing.
* 3) Apply user provided UDF (if any) on the input strings.
* 4) Convert successfully transformed messages into SplunkEvent objects
* 5) Write SplunkEvents to Splunk's HEC end point.
* 5a) Wrap write failures into a FailsafeElement.
* 6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
* and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
*/
// 1) Read messages in from Pub/Sub
PCollection<String> stringMessages =
pipeline.apply(
"ReadMessages",
new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));
// 2) Convert message to FailsafeElement for processing.
PCollectionTuple transformedOutput =
stringMessages
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply user provided UDF (if any) on the input strings.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
// 4) Convert successfully transformed messages into SplunkEvent objects
PCollectionTuple convertToEventTuple =
transformedOutput
.get(UDF_OUT)
.apply(
"ConvertToSplunkEvent",
SplunkConverters.failsafeStringToSplunkEvent(
SPLUNK_EVENT_OUT, SPLUNK_EVENT_DEADLETTER_OUT));
// 5) Write SplunkEvents to Splunk's HEC end point.
PCollection<SplunkWriteError> writeErrors =
convertToEventTuple
.get(SPLUNK_EVENT_OUT)
.apply(
"WriteToSplunk",
SplunkIO.writeBuilder()
.withToken(
new TokenNestedValueProvider(
options.getTokenSecretId(),
options.getTokenKMSEncryptionKey(),
options.getToken(),
options.getTokenSource()))
.withUrl(options.getUrl())
.withBatchCount(options.getBatchCount())
.withParallelism(options.getParallelism())
.withDisableCertificateValidation(options.getDisableCertificateValidation())
.withRootCaCertificatePath(options.getRootCaCertificatePath())
.withEnableBatchLogs(options.getEnableBatchLogs())
.withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
.build());
// 5a) Wrap write failures into a FailsafeElement.
PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
writeErrors.apply(
"WrapSplunkWriteErrors",
ParDo.of(
new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {
@ProcessElement
public void processElement(ProcessContext context) {
SplunkWriteError error = context.element();
FailsafeElement<String, String> failsafeElement =
FailsafeElement.of(error.payload(), error.payload());
if (error.statusMessage() != null) {
failsafeElement.setErrorMessage(error.statusMessage());
}
if (error.statusCode() != null) {
failsafeElement.setErrorMessage(
String.format("Splunk write status code: %d", error.statusCode()));
}
context.output(failsafeElement);
}
}));
// 6) Collect errors from UDF transform (#4), SplunkEvent transform (#5)
// and writing to Splunk HEC (#6) and stream into a Pub/Sub deadletter topic.
PCollectionList.of(
ImmutableList.of(
convertToEventTuple.get(SPLUNK_EVENT_DEADLETTER_OUT),
wrappedSplunkWriteErrors,
transformedOutput.get(UDF_DEADLETTER_OUT)))
.apply("FlattenErrors", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
.setErrorRecordsTopic(options.getOutputDeadletterTopic())
.build());
return pipeline.run();
}
/**
* The {@link PubSubToSplunkOptions} class provides the custom options passed by the executor at
* the command line.
*/
public interface PubSubToSplunkOptions
extends SplunkOptions,
PubsubReadSubscriptionOptions,
PubsubWriteDeadletterTopicOptions,
JavascriptTextTransformerOptions {}
/**
* A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
* returns a {@link PCollection} of {@link String} messages.
*/
private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
private final ValueProvider<String> subscriptionName;
private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
private Boolean includePubsubMessage;
ReadMessages(
ValueProvider<String> subscriptionName,
ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
this.subscriptionName = subscriptionName;
this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
}
@Override
public PCollection<String> expand(PBegin input) {
return input
.apply(
"ReadPubsubMessage",
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
.apply(
"ExtractMessageIfRequired",
ParDo.of(
new DoFn<PubsubMessage, String>() {
@Setup
public void setup() {
if (inputIncludePubsubMessageFlag != null) {
includePubsubMessage = inputIncludePubsubMessageFlag.get();
}
includePubsubMessage =
MoreObjects.firstNonNull(
includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
}
@ProcessElement
public void processElement(ProcessContext context) {
if (includePubsubMessage) {
context.output(formatPubsubMessage(context.element()));
} else {
context.output(
new String(context.element().getPayload(), StandardCharsets.UTF_8));
}
}
}))
.apply(
"CountMessages",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
INPUT_MESSAGES_COUNTER.inc();
context.output(context.element());
}
}));
}
}
/**
* Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
* to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
*
* @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
* @return JSON String that adheres to the model defined in {@link
* com.google.pubsub.v1.PubsubMessage}
*/
@VisibleForTesting
protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
JsonObject messageJson = new JsonObject();
String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
try {
JsonObject data = GSON.fromJson(payload, JsonObject.class);
messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
} catch (JsonSyntaxException e) {
messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
}
JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);
if (pubsubMessage.getMessageId() != null) {
messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
}
return messageJson.toString();
}
/**
* Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
*
* @param attributesMap {@link Map} of Pub/Sub attributes
* @return {@link JsonObject} of Pub/Sub attributes
*/
private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
JsonObject attributesJson = new JsonObject();
for (String key : attributesMap.keySet()) {
attributesJson.addProperty(key, attributesMap.get(key));
}
return attributesJson;
}
}
Pub/Sub vers fichiers Avro dans Cloud Storage
Le modèle Pub/Sub vers fichiers Avro dans Cloud Storage est un pipeline de streaming qui lit les données d'un sujet Pub/Sub et écrit des fichiers Avro dans le bucket Cloud Storage spécifié.
Conditions requises pour ce pipeline :
Le sujet Pub/Sub d'entrée doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres
Description
inputTopic
Sujet Pub/Sub permettant de s'abonner à la consultation de messages. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory
Répertoire de sortie dans lequel les fichiers de sortie Avro seront archivés. Doit inclure / à la fin.
Exemple : gs://example-bucket/example-directory/.
avroTempDirectory
Répertoire des fichiers Avro temporaires. Doit inclure / à la fin. Par exemple : gs://example-bucket/example-directory/.
outputFilenamePrefix
(Facultatif) Préfixe du nom de fichier de sortie pour les fichiers Avro.
outputFilenameSuffix
(Facultatif) Suffixe du nom de fichier de sortie pour les fichiers Avro.
outputShardTemplate
[Facultatif] Modèle de partition du fichier de sortie. Spécifié en tant que séquences répétées des lettres S ou N. Exemple : SSS-NNN. Celles-ci sont remplacées par le numéro de partition ou par le nombre total de partitions, respectivement. Si ce paramètre n'est pas spécifié, le format du modèle par défaut est W-P-SS-of-NN.
Exécuter le modèle Pub/Sub vers Cloud Storage Avro
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Avro Files on Cloud Storage template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
TOPIC_NAME : nom du sujet Pub/Sub
BUCKET_NAME : nom du bucket Cloud Storage
FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
SHARD_TEMPLATE : modèle de partition de sortie préféré
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
TOPIC_NAME : nom du sujet Pub/Sub
BUCKET_NAME : nom du bucket Cloud Storage
FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
SHARD_TEMPLATE : modèle de partition de sortie préféré
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToAvro.Options;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed Avro files at the specified output directory.
*
* <p>Files output will have the following schema:
*
* <pre>
* {
* "type": "record",
* "name": "AvroPubsubMessageRecord",
* "namespace": "com.google.cloud.teleport.avro",
* "fields": [
* {"name": "message", "type": {"type": "array", "items": "bytes"}},
* {"name": "attributes", "type": {"type": "map", "values": "string"}},
* {"name": "timestamp", "type": "long"}
* ]
* }
* </pre>
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToAvro
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-avro
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.avro"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_Avro",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Avro Files on Cloud Storage",
description =
"Streaming pipeline. Reads from a Pub/Sub subscription and outputs windowed Avro files to"
+ " the specified directory.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
contactInformation = "https://cloud.google.com/support")
public class PubsubToAvro {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 7,
description = "Temporary Avro write directory",
helpText = "Directory for temporary Avro files.")
@Required
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> messages = null;
/*
* Steps:
* 1) Read messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed data into Avro files, one per window by default.
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
messages
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
AvroIO.write(AvroPubsubMessageRecord.class)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input)))
/*.withTempDirectory(FileSystems.matchNewResource(
options.getAvroTempDirectory(),
Boolean.TRUE))
*/
.withWindowedWrites()
.withNumShards(options.getNumShards()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
* copying it's fields and the timestamp of the message.
*/
static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
new AvroPubsubMessageRecord(
message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
}
}
}
Sujet Pub/Sub vers des fichiers texte dans Cloud Storage
Le modèle Cloud Pub/Sub vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Cloud Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.
Conditions requises pour ce pipeline :
Le sujet Pub/Sub doit exister avant l'exécution.
Les messages publiés sur le thème doivent être au format texte.
Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.
Paramètres de modèle
Paramètres
Description
inputTopic
Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory
Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix
Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output-.
outputFilenameSuffix
Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate
Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NN où W correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01.
Exécuter le modèle Pub/Sub vers des fichiers texte dans Cloud Storage
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Text Files on Cloud Storage template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
TOPIC_NAME : nom de votre sujet Pub/Sub
BUCKET_NAME : nom du bucket Cloud Storage
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* PIPELINE_NAME=PubsubToText
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_BUCKET=TEMPLATE STORAGE BUCKET NAME HERE
* OUTPUT_BUCKET=JOB OUTPUT BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${PIPELINE_BUCKET}/dataflow/pipelines/pubsub-to-gcs-text
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.${PIPELINE_NAME} \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER} \
* --useSubscription=${USE_SUBSCRIPTION}"
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* windowDuration=5m,\
* numShards=1,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* windowDuration=5m,\
* numShards=1,\
* userTempLocation=gs://${OUTPUT_BUCKET}/tmp/,\
* outputDirectory=gs://${OUTPUT_BUCKET}/output/,\
* outputFilenamePrefix=windowed-file,\
* outputFilenameSuffix=.txt"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub and writes them to Cloud Storage, creating"
+ " a text file for each five minute window. Note that this pipeline assumes no"
+ " newlines in the body of the Pub/Sub message and thus each message becomes a single"
+ " line in the output file.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from a Pub/Sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
ValueProvider<String> getUserTempLocation();
void setUserTempLocation(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static ValueProvider<String> maybeUseUserTempLocation(
ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
return DualInputNestedValueProvider.of(
userTempLocation,
outputLocation,
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
return (input.getX() != null) ? input.getX() : input.getY();
}
});
}
}
Sujet Pub/Sub ou abonnement vers des fichiers texte dans Cloud Storage
Le sujet Pub/Sub ou l'abonnement vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.
Conditions requises pour ce pipeline :
Le sujet Pub/Sub ou l'abonnement doivent exister avant l'exécution.
Les messages publiés sur le thème doivent être au format texte.
Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.
Paramètres de modèle
Paramètres
Description
inputTopic
Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>. Si ce paramètre est fourni, inputSubscription ne doit pas être fourni.
inputSubscription
Abonnement Pub/Sub à partir duquel lire l'entrée. Le nom de l'abonnement doit être au format projects/<project-id>/subscription/<subscription-name>. Si ce paramètre est fourni, inputTopic ne doit pas être fourni.
outputDirectory
Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix
Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output-.
outputFilenameSuffix
Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate
Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NN où W correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01.
windowDuration
(Facultatif) La durée de fenêtre correspond à l'intervalle au cours duquel les données sont écrites dans le répertoire de sortie. Configurez la durée en fonction du débit du pipeline. Par exemple, un débit plus élevé peut nécessiter des tailles de fenêtre plus petites pour que les données s'intègrent à la mémoire. La valeur par défaut est "5m", avec une durée minimale de 1 s. Les formats autorisés sont les suivants : [int]s (pour les secondes, exemple : 5s), [int]m (pour les minutes, exemple : 12m), [int]h (pour les heures, exemple : 2h).
Exécuter le modèle Sujet Pub/Sub ou l'abonnement vers les fichiers texte dans Cloud Storage
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
BUCKET_NAME : nom de votre bucket Cloud Storage
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Example Usage:
*
* <pre>
* # Set the pipeline vars
* export PROJECT={project id}
* export TEMPLATE_MODULE=googlecloud-to-googlecloud
* export TEMPLATE_NAME=pubsub-to-text
* export BUCKET_NAME=gs://{bucket name}
* export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${TEMPLATE_NAME}-image
* export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java11-template-launcher-base
* export BASE_CONTAINER_IMAGE_VERSION=latest
* export APP_ROOT=/template/${TEMPLATE_NAME}
* export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_NAME}-command-spec.json
* export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_NAME}-image-spec.json
*
* gcloud config set project ${PROJECT}
*
* # Build and push image to Google Container Repository
* mvn package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC} \
* -Djib.applicationCache=/tmp/jib-cache \
* -am -pl ${TEMPLATE_MODULE}
*
* # Create and upload image spec
* echo '{
* "image":"'${TARGET_GCR_IMAGE}'",
* "metadata":{
* "name":"Pub/Sub to text",
* "description":"Write Pub/Sub messages to GCS text files.",
* "parameters":[
* {
* "name":"inputSubscription",
* "label":"Pub/Sub subscription to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"inputTopic",
* "label":"Pub/Sub topic to read from",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"outputDirectory",
* "label":"Directory to output files to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenamePrefix",
* "label":"The filename prefix of the files to write to",
* "paramType":"TEXT",
* "isOptional":false
* },
* {
* "name":"outputFilenameSuffix",
* "label":"The suffix of the files to write to",
* "paramType":"TEXT",
* "isOptional":true
* },
* {
* "name":"userTempLocation",
* "label":"The directory to output temporary files to",
* "paramType":"TEXT",
* "isOptional":true
* }
* ]
* },
* "sdk_info":{"language":"JAVA"}
* }' > image_spec.json
* gsutil cp image_spec.json ${TEMPLATE_IMAGE_SPEC}
* rm image_spec.json
*
* # Run template
* export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
* gcloud beta dataflow flex-template run ${JOB_NAME} \
* --project=${PROJECT} --region=us-central1 \
* --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
* --parameters inputTopic={topic},outputDirectory={directory},outputFilenamePrefix={prefix}
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"Streaming pipeline. Reads records from Pub/Sub Subscription or Topic and writes them to"
+ " Cloud Storage, creating a text file for each five minute window. Note that this"
+ " pipeline assumes no newlines in the body of the Pub/Sub message and thus each"
+ " message becomes a single line in the output file.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
contactInformation = "https://cloud.google.com/support")
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}
Pub/Sub vers MongoDB
Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents.
Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur. Toute erreur survenue en raison d'une non-concordance du schéma ou d'un format JSON non valide, ou pendant l'exécution de transformations, est enregistrée avec le message d'entrée dans une table BigQuery destinée aux messages non traités. Si la table des enregistrements non traités n'existe pas avant l'exécution, le pipeline la crée automatiquement.
Conditions requises pour ce pipeline :
L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.
Paramètres de modèle
Paramètres
Description
inputSubscription
Nom de l'abonnement Pub/Sub. Par exemple : projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri
Liste de serveurs MongoDB séparés par une virgule. Par exemple : 192.285.234.12:27017,192.287.123.11:27017
database
Base de données dans MongoDB pour stocker la collection. Exemple : my-db.
collection
Nom de la collection dans la base de données MongoDB. Exemple : my-collection.
deadletterTable
Table BigQuery qui stocke les messages en raison d'échecs (schéma non correspondant, format JSON non valide, etc.). Exemple : project-id:dataset-name.table-name.
javascriptTextTransformGcsPath
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
batchSize
(Facultatif) Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. Valeur par défaut : 1000
batchSizeBytes
(Facultatif) Taille du lot en octets. Valeur par défaut : 5242880
maxConnectionIdleTime
(Facultatif) Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. Valeur par défaut : 60000
sslEnabled
(Facultatif) Valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. Valeur par défaut : true
ignoreSSLCertificate
(Facultatif) Valeur booléenne indiquant si le certificat SSL doit être ignoré. Valeur par défaut : true
withOrdered
(Facultatif) Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. Valeur par défaut : true
withSSLInvalidHostNameAllowed
(Facultatif) Valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. Valeur par défaut : true
Exécuter le modèle Pub/Sub vers MongoDB
Console
Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.
Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB template.
Dans les champs fournis, saisissez vos valeurs de paramètres.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
JOB_NAME : nom de la tâche de votre choix
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE : nom de la base de données MongoDB (par exemple, users)
COLLECTION : nom de la collection MongoDB (par exemple, profiles)
UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
VERSION : version du modèle que vous souhaitez utiliser
Vous pouvez utiliser les valeurs suivantes :
latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
DATABASE : nom de la base de données MongoDB (par exemple, users)
COLLECTION : nom de la collection MongoDB (par exemple, profiles)
UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.PubSubToMongoDB.Options;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
* from PubSub, applies a Javascript UDF if provided and inserts resulting records as Bson Document
* in MongoDB. If the element fails to be processed then it is written to a deadletter table in
* BigQuery.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The PubSub topic and subscriptions exist
* <li>The MongoDB is up and running
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_NAME=my-project
* BUCKET_NAME=my-bucket
* INPUT_SUBSCRIPTION=my-subscription
* MONGODB_DATABASE_NAME=testdb
* MONGODB_HOSTNAME=my-host:port
* MONGODB_COLLECTION_NAME=testCollection
* DEADLETTERTABLE=project:dataset.deadletter_table_name
*
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToMongoDB \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_NAME} \
* --stagingLocation=gs://${BUCKET_NAME}/staging \
* --tempLocation=gs://${BUCKET_NAME}/temp \
* --runner=DataflowRunner \
* --inputSubscription=${INPUT_SUBSCRIPTION} \
* --mongoDBUri=${MONGODB_HOSTNAME} \
* --database=${MONGODB_DATABASE_NAME} \
* --collection=${MONGODB_COLLECTION_NAME} \
* --deadletterTable=${DEADLETTERTABLE}"
* </pre>
*/
@Template(
name = "Cloud_PubSub_to_MongoDB",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to MongoDB",
description =
"Streaming pipeline that reads JSON encoded messages from a Pub/Sub subscription,"
+ " transforms them using a JavaScript user-defined function (UDF), and writes them to"
+ " a MongoDB as documents.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-mongodb",
contactInformation = "https://cloud.google.com/support")
public class PubSubToMongoDB {
/**
* Options supported by {@link PubSubToMongoDB}
*
* <p>Inherits standard configuration options.
*/
/** The tag for the main output of the json transformation. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToMongoDB.class);
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*
* <p>Inherits standard configuration options, options from {@link
* JavascriptTextTransformer.JavascriptTextTransformerOptions}.
*/
public interface Options
extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
@Validation.Required
String getInputSubscription();
void setInputSubscription(String inputSubscription);
@TemplateParameter.Text(
order = 2,
description = "MongoDB Connection URI",
helpText = "List of Mongo DB nodes separated by comma.",
example = "host1:port,host2:port,host3:port")
@Validation.Required
String getMongoDBUri();
void setMongoDBUri(String mongoDBUri);
@TemplateParameter.Text(
order = 3,
description = "MongoDB Database",
helpText = "Database in MongoDB to store the collection.",
example = "my-db")
@Validation.Required
String getDatabase();
void setDatabase(String database);
@TemplateParameter.Text(
order = 4,
description = "MongoDB collection",
helpText = "Name of the collection inside MongoDB database to put the documents to.",
example = "my-collection")
@Validation.Required
String getCollection();
void setCollection(String collection);
@TemplateParameter.BigQueryTable(
order = 5,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
+ " schema, malformed json) are written to this table. If it doesn't exist, it will"
+ " be created during pipeline execution. If not specified,"
+ " \"outputTableSpec_error_records\" is used instead.",
example = "your-project-id:your-dataset.your-table-name")
@Validation.Required
String getDeadletterTable();
void setDeadletterTable(String deadletterTable);
@TemplateParameter.Long(
order = 6,
optional = true,
description = "Batch Size",
helpText = "Batch Size used for batch insertion of documents into MongoDB.")
@Default.Long(1000)
Long getBatchSize();
void setBatchSize(Long batchSize);
@TemplateParameter.Long(
order = 7,
optional = true,
description = "Batch Size in Bytes",
helpText =
"Batch Size in bytes used for batch insertion of documents into MongoDB. Default:"
+ " 5242880 (5mb)")
@Default.Long(5242880)
Long getBatchSizeBytes();
void setBatchSizeBytes(Long batchSizeBytes);
@TemplateParameter.Integer(
order = 8,
optional = true,
description = "Max Connection idle time",
helpText = "Maximum idle time allowed in seconds before connection timeout occurs.")
@Default.Integer(60000)
int getMaxConnectionIdleTime();
void setMaxConnectionIdleTime(int maxConnectionIdleTime);
@TemplateParameter.Boolean(
order = 9,
optional = true,
description = "SSL Enabled",
helpText = "Indicates whether connection to MongoDB is ssl enabled.")
@Default.Boolean(true)
Boolean getSslEnabled();
void setSslEnabled(Boolean sslEnabled);
@TemplateParameter.Boolean(
order = 10,
optional = true,
description = "Ignore SSL Certificate",
helpText = "Indicates whether SSL certificate should be ignored.")
@Default.Boolean(true)
Boolean getIgnoreSSLCertificate();
void setIgnoreSSLCertificate(Boolean ignoreSSLCertificate);
@TemplateParameter.Boolean(
order = 11,
optional = true,
description = "withOrdered",
helpText = "Enables ordered bulk insertions into MongoDB.")
@Default.Boolean(true)
Boolean getWithOrdered();
void setWithOrdered(Boolean withOrdered);
@TemplateParameter.Boolean(
order = 12,
optional = true,
description = "withSSLInvalidHostNameAllowed",
helpText = "Indicates whether invalid host name is allowed for ssl connection.")
@Default.Boolean(true)
Boolean getWithSSLInvalidHostNameAllowed();
void setWithSSLInvalidHostNameAllowed(Boolean withSSLInvalidHostNameAllowed);
}
/** DoFn that will parse the given string elements as Bson Documents. */
private static class ParseAsDocumentsFn extends DoFn<String, Document> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(Document.parse(context.element()));
}
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
// Parse the user options passed from the command-line.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Register the coders for pipeline
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
* 2) Apply Javascript UDF if provided.
* 3) Write to MongoDB
*
*/
LOG.info("Reading from subscription: " + options.getInputSubscription());
PCollectionTuple convertedPubsubMessages =
pipeline
/*
* Step #1: Read from a PubSub subscription.
*/
.apply(
"Read PubSub Subscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
/*
* Step #2: Apply Javascript Transform and transform, if provided and transform
* the PubsubMessages into Json documents.
*/
.apply(
"Apply Javascript UDF",
PubSubMessageToJsonDocument.newBuilder()
.setJavascriptTextTransformFunctionName(
options.getJavascriptTextTransformFunctionName())
.setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
.build());
/*
* Step #3a: Write Json documents into MongoDB using {@link MongoDbIO.write}.
*/
convertedPubsubMessages
.get(TRANSFORM_OUT)
.apply(
"Get Json Documents",
MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::g