Plantilla de MongoDB a BigQuery (transmisión)

Esta plantilla crea una canalización de transmisión que funciona con las transmisiones de cambio de MongoDB. Para usar esta plantilla, publica los datos de flujos de cambios en Pub/Sub. Luego, la canalización lee los registros JSON de Pub/Sub y los escribe en BigQuery. Los registros escritos en BigQuery tienen el mismo formato que la plantilla por lotes de MongoDB a BigQuery.

Requisitos de la canalización

  • El conjunto de datos de destino de BigQuery debe existir.
  • Se debe poder acceder a la instancia de origen de MongoDB desde las máquinas de trabajador de Dataflow.
  • Debes crear un tema de Pub/Sub para leer el flujo de cambios. Mientras se ejecuta la canalización, detecta eventos de captura de datos modificados (CDC) en el flujo de cambios de MongoDB y publícalos en Pub/Sub como registros JSON. Para obtener más información sobre la publicación de mensajes en Pub/Sub, consulta Publica mensajes en temas.
  • Esta plantilla usa los flujos de cambios de MongoDB. No es compatible con la captura de datos modificados de BigQuery.

Parámetros de la plantilla

Parámetros obligatorios

  • mongoDbUri: Es el URI de conexión de MongoDB con el formato mongodb+srv://:@..
  • database: La base de datos en MongoDB en la que se debe leer la colección. Por ejemplo, my-db
  • collection: Es el nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo, my-collection
  • userOption: FLATTEN, JSON o NONE. FLATTEN aplana los documentos al nivel único. JSON almacena el documento en formato JSON de BigQuery. NONE almacena todo el documento como una cadena con formato JSON. La configuración predeterminada es NONE.
  • inputTopic: El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: La tabla de BigQuery en la que se escribirá. Por ejemplo, bigquery-project:dataset.output_table

Parámetros opcionales

  • useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura el parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
  • KMSEncryptionKey: Clave de encriptación de Cloud KMS para desencriptar la cadena de conexión del URI de MongoDB. Si se pasa la clave de Cloud KMS, la string de conexión de URI de MongoDB debe pasarse encriptada. Por ejemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • filter: Filtro Bson en formato JSON. Por ejemplo, { "val": { $gt: 0, $lt: 9 }}
  • useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.
  • bigQuerySchemaPath: La ruta de Cloud Storage para el esquema JSON de BigQuery. Por ejemplo, gs://your-bucket/your-schema.json
  • javascriptDocumentTransformGcsPath: El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por ejemplo, transform.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF) en JavaScript. La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON.

Para usar una UDF, sube el archivo JavaScript a Cloud Storage y establece los siguientes parámetros de plantilla:

ParámetroDescripción
javascriptDocumentTransformGcsPath Ubicación de Cloud Storage del archivo JavaScript.
javascriptDocumentTransformFunctionName Es el nombre de la función de JavaScript.

Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: un documento de MongoDB.
  • Resultado: Un objeto serializado como una cadena JSON.
  • Ejecuta la plantilla

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the MongoDB (CDC) to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Haga clic en Ejecutar trabajo.

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • OUTPUT_TABLE_SPEC: Es el nombre de la tabla de BigQuery de destino.
    • MONGO_DB_URI: Es el URI de MongoDB.
    • DATABASE: Es tu base de datos de MongoDB.
    • COLLECTION: Es tu colección de MongoDB.
    • USER_OPTION: FLATTEN, JSON o NONE.
    • INPUT_TOPIC: Es el tema de entrada de Pub/Sub.

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • OUTPUT_TABLE_SPEC: Es el nombre de la tabla de BigQuery de destino.
    • MONGO_DB_URI: Es el URI de MongoDB.
    • DATABASE: Es tu base de datos de MongoDB.
    • COLLECTION: Es tu colección de MongoDB.
    • USER_OPTION: FLATTEN, JSON o NONE.
    • INPUT_TOPIC: Es el tema de entrada de Pub/Sub.
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbCdcToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link MongoDbCdcToBigQuery} pipeline is a streaming pipeline which reads data pushed to
     * PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery_CDC.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery_CDC",
        category = TemplateCategory.STREAMING,
        displayName = "MongoDB (CDC) to BigQuery",
        description =
            "The MongoDB CDC (Change Data Capture) to BigQuery template is a streaming pipeline that works together with MongoDB change streams. "
                + "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery-cdc",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-change-stream-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines.",
          "The change stream pushing changes from MongoDB to Pub/Sub should be running."
        },
        streaming = true,
        supportsAtLeastOnce = true)
    public class MongoDbCdcToBigQuery {
    
      private static final Logger LOG = LoggerFactory.getLogger(MongoDbCdcToBigQuery.class);
    
      /** Options interface. */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              PubSubOptions,
              BigQueryWriteOptions,
              JavascriptDocumentTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 1,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. To"
                    + " use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-"
                    + " once semantics, set the parameter to `false`. This parameter applies only when"
                    + " `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** class ParseAsDocumentsFn. */
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
    
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      /**
       * Main entry point for pipeline execution.
       *
       * @param args Command line arguments to the pipeline.
       */
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        run(options);
      }
    
      /** Pipeline to read data from PubSub and write to MongoDB. */
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
        String inputOption = options.getInputTopic();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        pipeline
            .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(inputOption))
            .apply(
                "RTransform string to document",
                ParDo.of(
                    new DoFn<String, Document>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = Document.parse(c.element());
                        c.output(document);
                      }
                    }))
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Read and transform data",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    ¿Qué sigue?