Modelo do MongoDB para BigQuery

Esse modelo cria um pipeline em lote que lê documentos do MongoDB e os grava no BigQuery.

Se você quiser capturar dados de fluxo de alterações do MongoDB, use o modelo do MongoDB para BigQuery (CDC).

Requisitos de pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.

Formato da saída

O formato dos registros de saída depende do valor do parâmetro userOption. Se userOption for NONE, a saída terá o esquema a seguir. O campo source_data contém o documento no formato JSON.

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for FLATTEN, o pipeline nivelará os documentos e gravará os campos de nível superior como colunas de tabela. Por exemplo, suponha que os documentos na coleção do MongoDB contenham os seguintes campos:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

Usando FLATTEN, a saída tem o esquema a seguir. O campo timestamp é adicionado pelo modelo.

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for JSON, o pipeline vai armazenar o documento no formato JSON do BigQuery. O BigQuery tem suporte integrado a dados JSON usando o tipo de dados JSON. Para mais informações, consulte Como trabalhar com dados JSON no GoogleSQL.

Parâmetros do modelo

Parâmetros obrigatórios

  • mongoDbUri: o URI de conexão do MongoDB no formato mongodb+srv://:@..
  • database: banco de dados no MongoDB para leitura da coleção. Por exemplo, my-db.
  • collection: nome da coleção no banco de dados do MongoDB. Por exemplo, my-collection.
  • userOption: FLATTEN, JSON ou NONE. FLATTEN nivela a linha para um único nível. JSON armazena o documento no formato JSON do BigQuery. NONE armazena todo o documento como uma string formatada em JSON. O padrão é: NENHUM.
  • outputTableSpec: a tabela do BigQuery em que será feita a gravação. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • KMSEncryptionKey: chave de criptografia do Cloud KMS para descriptografar a string de conexão uri do Mongodb. Se a chave do Cloud KMS for transmitida, a string de conexão uri do mongodb precisará ser transmitida de forma criptografada. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson no formato JSON. Por exemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: se true, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform.

Função definida pelo usuário

Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath O local do arquivo JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: um documento do MongoDB.
  • Saída: um objeto serializado como uma string JSON. Se userOption for NONE, o objeto JSON precisará incluir uma propriedade chamada _id que contenha o ID do documento.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the MongoDB to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    A seguir