Modelo do MongoDB para BigQuery

Esse modelo cria um pipeline em lote que lê documentos do MongoDB e os grava no BigQuery.

Se você quiser capturar dados de fluxo de alterações do MongoDB, use o modelo do MongoDB para BigQuery (CDC).

Requisitos de pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.

Formato da saída

O formato dos registros de saída depende do valor do parâmetro userOption. Se userOption for NONE, a saída terá o esquema a seguir. O campo source_data contém o documento no formato JSON.

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for FLATTEN, o pipeline nivelará os documentos e gravará os campos de nível superior como colunas de tabela. Por exemplo, suponha que os documentos na coleção do MongoDB contenham os seguintes campos:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

Usando FLATTEN, a saída tem o esquema a seguir. O campo timestamp é adicionado pelo modelo.

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for JSON, o pipeline vai armazenar o documento no formato JSON do BigQuery. O BigQuery tem suporte integrado a dados JSON usando o tipo de dados JSON. Para mais informações, consulte Como trabalhar com dados JSON no GoogleSQL.

Parâmetros do modelo

Parâmetros obrigatórios

  • mongoDbUri: o URI de conexão do MongoDB no formato mongodb+srv://:@..
  • database: banco de dados no MongoDB para leitura da coleção. Por exemplo, my-db.
  • collection: nome da coleção no banco de dados do MongoDB. Por exemplo, my-collection.
  • userOption: FLATTEN, JSON ou NONE. FLATTEN nivela a linha para um único nível. JSON armazena o documento no formato JSON do BigQuery. NONE armazena todo o documento como uma STRING formatada em JSON. O padrão é: NENHUM.
  • outputTableSpec: a tabela do BigQuery em que será feita a gravação. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • KMSEncryptionKey: chave de criptografia do Cloud KMS para descriptografar a string de conexão uri do Mongodb. Se a chave do Cloud KMS for transmitida, a string de conexão uri do mongodb precisará ser transmitida de forma criptografada. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson no formato JSON. Por exemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: se true, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform.

Função definida pelo usuário

Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath O local do arquivo JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: um documento do MongoDB.
  • Saída: um objeto serializado como uma string JSON. Se userOption for NONE, o objeto JSON precisará incluir uma propriedade chamada _id que contenha o ID do documento.
  • Executar o modelo

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the MongoDB to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.client.json.gson.GsonFactory;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.common.base.Strings;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.mongodb.FindQuery;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.BsonDocument;
    import org.bson.Document;
    
    /**
     * The {@link MongoDbToBigQuery} pipeline is a batch pipeline which ingests data from MongoDB and
     * outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery",
        category = TemplateCategory.BATCH,
        displayName = "MongoDB to BigQuery",
        description =
            "The MongoDB to BigQuery template is a batch pipeline that reads documents from MongoDB and writes them to "
                + "BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines."
        })
    public class MongoDbToBigQuery {
      /**
       * Options supported by {@link MongoDbToBigQuery}
       *
       * <p>Inherits standard configuration options.
       */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              BigQueryWriteOptions,
              BigQueryStorageApiBatchOptions,
              JavascriptDocumentTransformerOptions {}
    
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    
        run(options);
      }
    
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri plain text or base64 encrypted with a specific KMS encryption key
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getBigQuerySchemaPath() != null) {
          // initialize FileSystem to read from GCS
          FileSystems.setDefaultPipelineOptions(options);
          String jsonSchema = getGcsFileAsString(options.getBigQuerySchemaPath());
          GsonFactory gf = new GsonFactory();
          bigquerySchema = gf.fromString(jsonSchema, TableSchema.class);
        } else if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        MongoDbIO.Read readDocuments =
            MongoDbIO.read()
                .withUri(mongoDbUri)
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection());
    
        String filterJson = options.getFilter();
        BsonDocument filter;
        if (!Strings.isNullOrEmpty(filterJson)
            && !(filter = BsonDocument.parse(filterJson)).isEmpty()) {
          readDocuments = readDocuments.withQueryFn(FindQuery.create().withFilters(filter));
        }
    
        pipeline
            .apply("Read Documents", readDocuments)
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Transform to TableRow",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
    
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                "Write to Bigquery",
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    A seguir