Modèle Pub/Sub vers MongoDB

Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur.

Si des erreurs se produisent lors du traitement des enregistrements, le modèle les écrit dans une table BigQuery, avec le message d'entrée. Par exemple, des erreurs peuvent se produire en raison d'une non-concordance du schéma, d'un format JSON non valide ou lors de l'exécution de transformations. Indiquez le nom de la table dans le paramètre deadletterTable. Si la table n'existe pas, le pipeline la crée automatiquement.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètres obligatoires

  • inputSubscription: nom de l'abonnement Pub/Sub. Exemple :projects/your-project-id/subscriptions/your-subscription-name
  • mongoDBUri: liste de serveurs MongoDB séparés par une virgule. Exemple :host1:port,host2:port,host3:port
  • database: base de données dans MongoDB pour stocker la collection. Exemple :my-db
  • collection: nom de la collection dans la base de données MongoDB. Exemple :my-collection
  • deadletterTable: table BigQuery qui stocke les messages causés par des échecs, tels qu'un schéma non correspondant, un format JSON non valide, etc. Exemple :your-project-id:your-dataset.your-table-name

Paramètres facultatifs

  • batchSize: taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. La valeur par défaut est 1000.
  • batchSizeBytes: taille du lot en octets. La valeur par défaut est 5242880.
  • maxConnectionIdleTime: durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. La valeur par défaut est 60000.
  • sslEnabled: valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. La valeur par défaut est "true".
  • ignoreSSLCertificate: valeur booléenne indiquant si le certificat SSL doit être ignoré. La valeur par défaut est "true".
  • withOrdered: valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. La valeur par défaut est "true".
  • withSSLInvalidHostNameAllowed: valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. La valeur par défaut est "true".
  • javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : une ligne provenant d'un fichier CSV d'entrée.
  • Sortie : document JSON concaténé à insérer dans MongoDB.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.PubSubToMongoDB.Options;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
 * from PubSub, applies a Javascript UDF if provided and inserts resulting records as Bson Document
 * in MongoDB. If the element fails to be processed then it is written to a deadletter table in
 * BigQuery.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The PubSub topic and subscriptions exist
 *   <li>The MongoDB is up and running
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-to-mongodb/README_Cloud_PubSub_to_MongoDB.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "Cloud_PubSub_to_MongoDB",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to MongoDB",
      description =
          "The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. "
              + "If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). "
              + "Any errors occurred due to schema mismatch, malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages along with input message. "
              + "If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates this table.",
      skipOptions = {
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = Options.class,
      flexContainerName = "pubsub-to-mongodb",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-mongodb",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.",
        "The MongoDB cluster must exist and should be accessible from the Dataflow worker machines."
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "Cloud_PubSub_to_MongoDB_Xlang",
      category = TemplateCategory.STREAMING,
      type = Template.TemplateType.XLANG,
      displayName = "Pub/Sub to MongoDB with Python UDFs",
      description =
          "The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. "
              + "If required, this pipeline supports additional transforms that can be included using a Python user-defined function (UDF). "
              + "Any errors occurred due to schema mismatch, malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages along with input message. "
              + "If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates this table.",
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = Options.class,
      flexContainerName = "pubsub-to-mongodb-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-mongodb",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.",
        "The MongoDB cluster must exist and should be accessible from the Dataflow worker machines."
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class PubSubToMongoDB {
  /**
   * Options supported by {@link PubSubToMongoDB}
   *
   * <p>Inherits standard configuration options.
   */

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToMongoDB.class);

  /**
   * The {@link Options} class provides the custom execution options passed by the executor at the
   * command-line.
   *
   * <p>Inherits standard configuration options, options from {@link
   * PythonExternalTextTransformer.PythonExternalTextTransformerOptions}.
   */
  public interface Options
      extends PythonExternalTextTransformer.PythonExternalTextTransformerOptions, PipelineOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "Name of the Pub/Sub subscription.",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    @Validation.Required
    String getInputSubscription();

    void setInputSubscription(String inputSubscription);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        description = "MongoDB Connection URI",
        helpText = "Comma separated list of MongoDB servers.",
        example = "host1:port,host2:port,host3:port")
    @Validation.Required
    String getMongoDBUri();

    void setMongoDBUri(String mongoDBUri);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Target",
        description = "MongoDB Database",
        helpText = "Database in MongoDB to store the collection.",
        example = "my-db")
    @Validation.Required
    String getDatabase();

    void setDatabase(String database);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        description = "MongoDB collection",
        helpText = "Name of the collection in the MongoDB database.",
        example = "my-collection")
    @Validation.Required
    String getCollection();

    void setCollection(String collection);

    @TemplateParameter.BigQueryTable(
        order = 5,
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "The BigQuery table that stores messages caused by failures, such as mismatched schema, malformed JSON, and so on.",
        example = "your-project-id:your-dataset.your-table-name")
    @Validation.Required
    String getDeadletterTable();

    void setDeadletterTable(String deadletterTable);

    @TemplateParameter.Long(
        order = 6,
        optional = true,
        description = "Batch Size",
        helpText = "Batch size used for batch insertion of documents into MongoDB.")
    @Default.Long(1000)
    Long getBatchSize();

    void setBatchSize(Long batchSize);

    @TemplateParameter.Long(
        order = 7,
        optional = true,
        description = "Batch Size in Bytes",
        helpText = "Batch size in bytes.")
    @Default.Long(5242880)
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long batchSizeBytes);

    @TemplateParameter.Integer(
        order = 8,
        optional = true,
        description = "Max Connection idle time",
        helpText = "Maximum idle time allowed in seconds before connection timeout occurs.")
    @Default.Integer(60000)
    int getMaxConnectionIdleTime();

    void setMaxConnectionIdleTime(int maxConnectionIdleTime);

    @TemplateParameter.Boolean(
        order = 9,
        optional = true,
        description = "SSL Enabled",
        helpText = "Boolean value indicating whether the connection to MongoDB is SSL enabled.")
    @Default.Boolean(true)
    Boolean getSslEnabled();

    void setSslEnabled(Boolean sslEnabled);

    @TemplateParameter.Boolean(
        order = 10,
        optional = true,
        description = "Ignore SSL Certificate",
        helpText = "Boolean value indicating whether to ignore the SSL certificate.")
    @Default.Boolean(true)
    Boolean getIgnoreSSLCertificate();

    void setIgnoreSSLCertificate(Boolean ignoreSSLCertificate);

    @TemplateParameter.Boolean(
        order = 11,
        optional = true,
        description = "withOrdered",
        helpText = "Boolean value enabling ordered bulk insertions into MongoDB.")
    @Default.Boolean(true)
    Boolean getWithOrdered();

    void setWithOrdered(Boolean withOrdered);

    @TemplateParameter.Boolean(
        order = 12,
        optional = true,
        description = "withSSLInvalidHostNameAllowed",
        helpText =
            "Boolean value indicating whether an invalid hostname is allowed for the SSL connection.")
    @Default.Boolean(true)
    Boolean getWithSSLInvalidHostNameAllowed();

    void setWithSSLInvalidHostNameAllowed(Boolean withSSLInvalidHostNameAllowed);
  }

  /** DoFn that will parse the given string elements as Bson Documents. */
  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();

    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());

    if (usePythonUdf && useJavascriptUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }
    /*
     * Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
     *        2) Apply Javascript or Python UDF if provided.
     *        3) Write to MongoDB
     *
     */

    LOG.info("Reading from subscription: " + options.getInputSubscription());

    PCollection<PubsubMessage> readMessagesFromPubsub =
        pipeline
            /*
             * Step #1: Read from a PubSub subscription.
             */
            .apply(
            "Read PubSub Subscription",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()));

    PCollectionTuple convertedPubsubMessages;
    if (usePythonUdf) {
      convertedPubsubMessages =
          readMessagesFromPubsub
              /*
               * Step #2: Apply Python Transform and transform, if provided and transform
               *          the PubsubMessages into Json documents.
               */
              .apply(
              "Apply Python UDF",
              PubSubMessageToJsonDocument.newBuilder()
                  .setPythonExternalTextTransformFunctionName(
                      options.getPythonExternalTextTransformFunctionName())
                  .setPythonExternalTextTransformGcsPath(
                      options.getPythonExternalTextTransformGcsPath())
                  .build());
    } else {
      convertedPubsubMessages =
          readMessagesFromPubsub
              /*
               * Step #2: Apply Javascript Transform and transform, if provided and transform
               *          the PubsubMessages into Json documents.
               */
              .apply(
              "Apply Javascript UDF",
              PubSubMessageToJsonDocument.newBuilder()
                  .setJavascriptTextTransformFunctionName(
                      options.getJavascriptTextTransformFunctionName())
                  .setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
                  .setJavascriptTextTransformReloadIntervalMinutes(
                      options.getJavascriptTextTransformReloadIntervalMinutes())
                  .build());
    }

    /*
     * Step #3a: Write Json documents into MongoDB using {@link MongoDbIO.write}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_OUT)
        .apply(
            "Get Json Documents",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply("Parse as BSON Document", ParDo.of(new ParseAsDocumentsFn()))
        .apply(
            "Put to MongoDB",
            MongoDbIO.write()
                .withBatchSize(options.getBatchSize())
                .withUri(prefixMongoDb(options.getMongoDBUri()))
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection())
                .withIgnoreSSLCertificate(options.getIgnoreSSLCertificate())
                .withMaxConnectionIdleTime(options.getMaxConnectionIdleTime())
                .withOrdered(options.getWithOrdered())
                .withSSLEnabled(options.getSslEnabled())
                .withSSLInvalidHostNameAllowed(options.getWithSSLInvalidHostNameAllowed()));

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_DEADLETTER_OUT)
        .apply(
            "Write Transform Failures To BigQuery",
            ErrorConverters.WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /** Add the MongoDB protocol prefix only if the given uri doesn't have it. */
  private static String prefixMongoDb(String mongoDBUri) {
    if (mongoDBUri.startsWith("mongodb://") || mongoDBUri.startsWith("mongodb+srv://")) {
      return mongoDBUri;
    }
    return String.format("mongodb://%s", mongoDBUri);
  }

  /**
   * The {@link PubSubMessageToJsonDocument} class is a {@link PTransform} which transforms incoming
   * {@link PubsubMessage} objects into JSON objects for insertion into MongoDB while applying an
   * optional UDF to the input. The executions of the UDF and transformation to Json objects is done
   * in a fail-safe way by wrapping the element with it's original payload inside the {@link
   * FailsafeElement} class. The {@link PubSubMessageToJsonDocument} transform will output a {@link
   * PCollectionTuple} which contains all output and dead-letter {@link PCollection}.
   *
   * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
   *
   * <ul>
   *   <li>{@link PubSubToMongoDB#TRANSFORM_OUT} - Contains all records successfully converted to
   *       JSON objects.
   *   <li>{@link PubSubToMongoDB#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
   *       records which couldn't be converted to table rows.
   * </ul>
   */
  @AutoValue
  public abstract static class PubSubMessageToJsonDocument
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    public static Builder newBuilder() {
      return new AutoValue_PubSubToMongoDB_PubSubMessageToJsonDocument.Builder();
    }

    @Nullable
    public abstract String javascriptTextTransformGcsPath();

    @Nullable
    public abstract String javascriptTextTransformFunctionName();

    @Nullable
    public abstract String pythonExternalTextTransformGcsPath();

    @Nullable
    public abstract String pythonExternalTextTransformFunctionName();

    @Nullable
    public abstract Integer javascriptTextTransformReloadIntervalMinutes();

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {

      // Check for python UDF first. If the pipeline is not using Python UDF, proceed as normal.
      if (pythonExternalTextTransformGcsPath() != null) {
        PCollection<Row> failsafeElements =
            input
                // Map the incoming messages into FailsafeElements, so we can recover from failures
                // across multiple transforms.
                .apply(
                    "MapToRecord",
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                        .pubSubMappingFunction())
                .setRowSchema(
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA);
        return failsafeElements
            .apply(
                "InvokeUDF",
                PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                    .setFileSystemPath(pythonExternalTextTransformGcsPath())
                    .setFunctionName(pythonExternalTextTransformFunctionName())
                    .build())
            .apply(
                "MapRowsToFailsafeElements",
                ParDo.of(
                        new PythonExternalTextTransformer.RowToPubSubFailsafeElementFn(
                            TRANSFORM_OUT, TRANSFORM_DEADLETTER_OUT))
                    .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
      }
      // If we don't have Python UDF, we proceed as normal checking for Javascript UDF.
      // Map the incoming messages into FailsafeElements so we can recover from failures
      // across multiple transforms.
      PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
          input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

      // If a Udf is supplied then use it to parse the PubSubMessages.
      if (javascriptTextTransformGcsPath() != null) {
        return failsafeElements.apply(
            "InvokeUDF",
            JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                .setFileSystemPath(javascriptTextTransformGcsPath())
                .setFunctionName(javascriptTextTransformFunctionName())
                .setReloadIntervalMinutes(javascriptTextTransformReloadIntervalMinutes())
                .setSuccessTag(TRANSFORM_OUT)
                .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                .build());
      } else {
        return failsafeElements.apply(
            "ProcessPubSubMessages",
            ParDo.of(new ProcessFailsafePubSubFn())
                .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
      }
    }

    /** Builder for {@link PubSubMessageToJsonDocument}. */
    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setJavascriptTextTransformGcsPath(
          String javascriptTextTransformGcsPath);

      public abstract Builder setJavascriptTextTransformFunctionName(
          String javascriptTextTransformFunctionName);

      public abstract Builder setPythonExternalTextTransformGcsPath(
          String pythonExternalTextTransformGcsPath);

      public abstract Builder setPythonExternalTextTransformFunctionName(
          String pythonExternalTextTransformFunctionName);

      public abstract Builder setJavascriptTextTransformReloadIntervalMinutes(
          Integer javascriptTextTransformReloadIntervalMinutes);

      public abstract PubSubMessageToJsonDocument build();
    }
  }

  /**
   * The {@link ProcessFailsafePubSubFn} class processes a {@link FailsafeElement} containing a
   * {@link PubsubMessage} and a String of the message's payload {@link PubsubMessage#getPayload()}
   * into a {@link FailsafeElement} of the original {@link PubsubMessage} and a JSON string that has
   * been processed with {@link Gson}.
   *
   * <p>If {@link PubsubMessage#getAttributeMap()} is not empty then the message attributes will be
   * serialized along with the message payload.
   */
  static class ProcessFailsafePubSubFn
      extends DoFn<FailsafeElement<PubsubMessage, String>, FailsafeElement<PubsubMessage, String>> {

    private static final Counter successCounter =
        Metrics.counter(PubSubMessageToJsonDocument.class, "successful-json-conversion");

    private static Gson gson = new Gson();

    private static final Counter failedCounter =
        Metrics.counter(PubSubMessageToJsonDocument.class, "failed-json-conversion");

    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage pubsubMessage = context.element().getOriginalPayload();

      JsonObject messageObject = new JsonObject();

      try {
        if (pubsubMessage.getPayload().length > 0) {
          messageObject =
              gson.fromJson(
                  new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8), JsonObject.class);
        }

        // If message attributes are present they will be serialized along with the message payload
        if (pubsubMessage.getAttributeMap() != null) {
          pubsubMessage.getAttributeMap().forEach(messageObject::addProperty);
        }

        context.output(FailsafeElement.of(pubsubMessage, messageObject.toString()));
        successCounter.inc();

      } catch (JsonSyntaxException e) {
        context.output(
            TRANSFORM_DEADLETTER_OUT,
            FailsafeElement.of(context.element())
                .setErrorMessage(e.getMessage())
                .setStacktrace(Throwables.getStackTraceAsString(e)));
        failedCounter.inc();
      }
    }
  }

  /**
   * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
   * {@link FailsafeElement} class so errors can be recovered from and the original message can be
   * output to a error records table.
   */
  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }
}

Étape suivante