Modelo do MongoDB para o BigQuery (CDC)

Este modelo cria um pipeline de streaming que funciona com fluxos de alterações do MongoDB. Para usar esse modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registros JSON do Pub/Sub e os grava no BigQuery. Os registros gravados no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para BigQuery.

Requisitos de pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.
  • É preciso criar um tópico do Pub/Sub para ler o fluxo de alterações. Enquanto o pipeline estiver em execução, detecte eventos de captura de dados alterados (CDC) no fluxo de alterações do MongoDB e publique-os no Pub/Sub como registros JSON. Para mais informações sobre como publicar mensagens no Pub/Sub, consulte Publicar mensagens em tópicos.

Parâmetros do modelo

Parâmetro Descrição
mongoDbUri URI de conexão do MongoDB no formato mongodb+srv://:@.
database Banco de dados no MongoDB para leitura da coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
outputTableSpec Tabela do BigQuery a ser gravada. Por exemplo, bigquery-project:dataset.output_table.
userOption FLATTEN ou NONE. FLATTEN nivela os documentos no primeiro nível. NONE armazena todo o documento como uma string JSON.
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.
javascriptDocumentTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptDocumentTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
useStorageWriteApi (Opcional) Se true, o pipeline usará a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce (Opcional) Ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
numStorageWriteApiStreams (Opcional) Ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
storageWriteApiTriggeringFrequencySec (Opcional) Ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Função definida pelo usuário

Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath O local do arquivo JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: um documento do MongoDB.
  • Saída: um objeto serializado como uma string JSON.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the MongoDB to BigQuery (CDC) template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC
    

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN ou NENHUM.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN ou NENHUM.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

    A seguir