Modelo do MongoDB para BigQuery (Stream)

Este modelo cria um pipeline de streaming que funciona com fluxos de alterações do MongoDB. Para usar esse modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registros JSON do Pub/Sub e os grava no BigQuery. Os registros gravados no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para BigQuery.

Requisitos de pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.
  • É preciso criar um tópico do Pub/Sub para ler o fluxo de alterações. Enquanto o pipeline estiver em execução, detecte eventos de captura de dados alterados (CDC) no fluxo de alterações do MongoDB e publique-os no Pub/Sub como registros JSON. Para mais informações sobre como publicar mensagens no Pub/Sub, consulte Publicar mensagens em tópicos.
  • Este modelo usa fluxos de alterações do MongoDB. Não é compatível com a captura de dados de mudança do BigQuery.

Parâmetros do modelo

Parâmetros obrigatórios

  • mongoDbUri: o URI de conexão do MongoDB no formato mongodb+srv://:@..
  • database: banco de dados no MongoDB para leitura da coleção. Por exemplo, my-db.
  • collection: nome da coleção no banco de dados do MongoDB. Por exemplo, my-collection.
  • userOption: FLATTEN, JSON ou NONE. FLATTEN nivela a linha para um único nível. JSON armazena o documento no formato JSON do BigQuery. NONE armazena todo o documento como uma STRING formatada em JSON. O padrão é: NENHUM.
  • inputTopic: o tópico de entrada do Pub/Sub que será lido, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: a tabela do BigQuery em que será feita a gravação. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • KMSEncryptionKey: chave de criptografia do Cloud KMS para descriptografar a string de conexão uri do Mongodb. Se a chave do Cloud KMS for transmitida, a string de conexão uri do mongodb precisará ser transmitida de forma criptografada. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson no formato JSON. Por exemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform.

Função definida pelo usuário

Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath O local do arquivo JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: um documento do MongoDB.
  • Saída: um objeto serializado como uma string JSON.
  • Executar o modelo

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the MongoDB (CDC) to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbCdcToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link MongoDbCdcToBigQuery} pipeline is a streaming pipeline which reads data pushed to
     * PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery_CDC.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery_CDC",
        category = TemplateCategory.STREAMING,
        displayName = "MongoDB (CDC) to BigQuery",
        description =
            "The MongoDB CDC (Change Data Capture) to BigQuery template is a streaming pipeline that works together with MongoDB change streams. "
                + "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery-cdc",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-change-stream-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines.",
          "The change stream pushing changes from MongoDB to Pub/Sub should be running."
        },
        streaming = true,
        supportsAtLeastOnce = true)
    public class MongoDbCdcToBigQuery {
    
      private static final Logger LOG = LoggerFactory.getLogger(MongoDbCdcToBigQuery.class);
    
      /** Options interface. */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              PubSubOptions,
              BigQueryWriteOptions,
              JavascriptDocumentTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 1,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. To"
                    + " use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-"
                    + " once semantics, set the parameter to `false`. This parameter applies only when"
                    + " `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** class ParseAsDocumentsFn. */
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
    
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      /**
       * Main entry point for pipeline execution.
       *
       * @param args Command line arguments to the pipeline.
       */
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        run(options);
      }
    
      /** Pipeline to read data from PubSub and write to MongoDB. */
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
        String inputOption = options.getInputTopic();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        pipeline
            .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(inputOption))
            .apply(
                "RTransform string to document",
                ParDo.of(
                    new DoFn<String, Document>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = Document.parse(c.element());
                        c.output(document);
                      }
                    }))
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Read and transform data",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    A seguir