Modelo do MongoDB para BigQuery (Stream)

Este modelo cria um pipeline de streaming que funciona com fluxos de alterações do MongoDB. Para usar esse modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registros JSON do Pub/Sub e os grava no BigQuery. Os registros gravados no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para BigQuery.

Requisitos de pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.
  • É preciso criar um tópico do Pub/Sub para ler o fluxo de alterações. Enquanto o pipeline estiver em execução, detecte eventos de captura de dados alterados (CDC) no fluxo de alterações do MongoDB e publique-os no Pub/Sub como registros JSON. Para mais informações sobre como publicar mensagens no Pub/Sub, consulte Publicar mensagens em tópicos.
  • Este modelo usa fluxos de alterações do MongoDB. Não é compatível com a captura de dados alterados do BigQuery.

Parâmetros do modelo

Parâmetros obrigatórios

  • mongoDbUri: o URI de conexão do MongoDB no formato mongodb+srv://:@..
  • database: banco de dados no MongoDB para leitura da coleção. Exemplo: my-db.
  • collection: nome da coleção dentro do banco de dados MongoDB. Exemplo: my-collection.
  • userOption : FLATTEN, JSON ou NONE. FLATTEN nivela a linha para um único nível. JSON armazena o documento no formato JSON do BigQuery. NONE armazena todo o documento como uma STRING formatada em JSON. O padrão é: NENHUM.
  • inputTopic: o tópico de entrada do Pub/Sub que que será lido, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: a tabela do BigQuery a ser gravada. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • KMSEncryptionKey: chave de criptografia do Cloud KMS para descriptografar a string de conexão uri do Mongodb. Se a chave do Cloud KMS for transmitida, a string de conexão uri do mongodb precisará ser transmitida de forma criptografada. (Exemplo: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
  • filter : filtro Bson no formato JSON. (Exemplo: { "val": { $gt: 0, $lt: 9 }}).
  • useStorageWriteApi: se verdadeiro, o pipeline usará a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro.
  • bigQuerySchemaPath : o caminho do Cloud Storage para o esquema JSON do BigQuery. (Exemplo: gs://your-bucket/your-schema.json).
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Exemplo: gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Exemplo: transform.

Função definida pelo usuário

Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath O local do arquivo JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: um documento do MongoDB.
  • Saída: um objeto serializado como uma string JSON.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the MongoDB (CDC) to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
    • MONGO_DB_URI: o URI do MongoDB.
    • DATABASE: o banco de dados do MongoDB.
    • COLLECTION: sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

    A seguir