Este modelo cria um pipeline de streaming que funciona com fluxos de alterações do MongoDB. Para usar esse modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registros JSON do Pub/Sub e os grava no BigQuery. Os registros gravados no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para BigQuery.
Requisitos de pipeline
- O conjunto de dados de destino do BigQuery precisa existir.
- A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.
- É preciso criar um tópico do Pub/Sub para ler o fluxo de alterações. Enquanto o pipeline estiver em execução, detecte eventos de captura de dados alterados (CDC) no fluxo de alterações do MongoDB e publique-os no Pub/Sub como registros JSON. Para mais informações sobre como publicar mensagens no Pub/Sub, consulte Publicar mensagens em tópicos.
- Este modelo usa fluxos de alterações do MongoDB. Não é compatível com a captura de dados alterados do BigQuery.
Parâmetros do modelo
Parâmetros obrigatórios
- mongoDbUri: o URI de conexão do MongoDB no formato
mongodb+srv://:@.
. - database: banco de dados no MongoDB para leitura da coleção. Exemplo: my-db.
- collection: nome da coleção dentro do banco de dados MongoDB. Exemplo: my-collection.
- userOption :
FLATTEN
,JSON
ouNONE
.FLATTEN
nivela a linha para um único nível.JSON
armazena o documento no formato JSON do BigQuery.NONE
armazena todo o documento como uma STRING formatada em JSON. O padrão é: NENHUM. - inputTopic: o tópico de entrada do Pub/Sub que que será lido, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
- outputTableSpec: a tabela do BigQuery a ser gravada. Por exemplo,
bigquery-project:dataset.output_table
.
Parâmetros opcionais
- useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como
true
. Para usar semântica exatamente uma vez, defina o parâmetro comofalse
. Esse parâmetro se aplica apenas quandouseStorageWriteApi
étrue
. O valor padrão éfalse
. - KMSEncryptionKey: chave de criptografia do Cloud KMS para descriptografar a string de conexão uri do Mongodb. Se a chave do Cloud KMS for transmitida, a string de conexão uri do mongodb precisará ser transmitida de forma criptografada. (Exemplo: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
- filter : filtro Bson no formato JSON. (Exemplo: { "val": { $gt: 0, $lt: 9 }}).
- useStorageWriteApi: se verdadeiro, o pipeline usará a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é
false
. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. Padrão: 0. - storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. - bigQuerySchemaPath : o caminho do Cloud Storage para o esquema JSON do BigQuery. (Exemplo: gs://your-bucket/your-schema.json).
- javascriptDocumentTransformGcsPath: o URI do Cloud Storage do arquivo
.js
que define a função JavaScript definida pelo usuário (UDF) a ser usada. Exemplo: gs://your-bucket/your-transforms/*.js. - javascriptDocumentTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Exemplo: transform.
Função definida pelo usuário
Se quiser, estenda esse modelo gravando uma função definida pelo usuário (UDF) em JavaScript. O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.
Para usar uma UDF, faça upload do arquivo JavaScript no Cloud Storage e defina os seguintes parâmetros de modelo:
Parâmetro | Descrição |
---|---|
javascriptDocumentTransformGcsPath |
O local do arquivo JavaScript no Cloud Storage. |
javascriptDocumentTransformFunctionName |
O nome da função JavaScript. |
Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Especificação da função
A UDF tem a seguinte especificação:
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the MongoDB (CDC) to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
: o nome da tabela de destino do BigQuery.MONGO_DB_URI
: o URI do MongoDB.DATABASE
: o banco de dados do MongoDB.COLLECTION
: sua coleção do MongoDB.USER_OPTION
: FLATTEN, JSON ou NONE.INPUT_TOPIC
: o tópico de entrada do Pub/Sub.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
: o nome da tabela de destino do BigQuery.MONGO_DB_URI
: o URI do MongoDB.DATABASE
: o banco de dados do MongoDB.COLLECTION
: sua coleção do MongoDB.USER_OPTION
: FLATTEN, JSON ou NONE.INPUT_TOPIC
: o tópico de entrada do Pub/Sub.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.