Modelos de streaming do Dataflow fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow.

Esses modelos do Dataflow ajudam você a resolver tarefas grandes de dados, incluindo importação e exportação de dados, backup e restauração de dados, além de operações em massa da API. Tudo isso sem o uso de um ambiente de desenvolvimento dedicado. Os modelos são criados no Apache Beam e usam o Dataflow para transformar os dados.

Para informações gerais sobre modelos, consulte Modelos do Dataflow. Para uma lista de todos os modelos fornecidos pelo Google, consulte Primeiros passos com os modelos fornecidos pelo Google.

Neste guia, você verá os modelos de streaming.

Assinatura do Pub/Sub para BigQuery

A inscrição Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • O campo data das mensagens do Pub/Sub precisa usar o formato JSON, conforme descrito neste guia JSON. Por exemplo, mensagens com valores no campo data formatados como {"k1":"v1", "k2":"v2"} podem ser inseridos em uma tabela do BigQuery com duas colunas, k1 e k2, com um tipo de dados de string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada OUTPUT_TABLE_SPEC_error_records.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

Como executar a inscrição Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Subscription to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Tópico do Pub/Sub para BigQuery

O tópico do Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de um tópico do Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • O campo data das mensagens do Pub/Sub precisa usar o formato JSON, conforme descrito neste guia JSON. Por exemplo, mensagens com valores no campo data formatados como {"k1":"v1", "k2":"v2"} podem ser inseridos em uma tabela do BigQuery com duas colunas, k1 e k2, com um tipo de dados de string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não chegaram à tabela de saída. É preciso usar o formato <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

Como executar o tópico do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Avro do Pub/Sub para BigQuery

O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Requisitos para esse pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
schemaPath O local do Cloud Storage do arquivo de esquema do Avro. Por exemplo, gs://path/to/my/schema.avsc.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>.
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, <my-project>:<my-dataset>.<my-table>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.
writeDisposition (Opcional) O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND
createDisposition (Opcional) O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED

Como executar o Avro do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Avro to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

Buffer de protocolo do Pub/Sub para BigQuery

O modelo de buffer de protocolo do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do buffer de protocolo de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Uma função definida pelo usuário (UDF) do JavaScript pode ser fornecida para transformar dados. Erros ao executar a UDF podem ser enviados para um tópico separado do Pub/Sub ou para o mesmo tópico não processado como os erros do BigQuery.

Requisitos para este pipeline:

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema dos registros do buffer de protocolo precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.
  • Se a tabela do BigQuery existir, ela precisará ter um esquema que corresponda aos dados proto, independentemente do valor createDisposition.

Parâmetros do modelo

Parâmetro Descrição
protoSchemaPath O local do Cloud Storage do arquivo de esquema proto independente. Por exemplo, gs://path/to/my/file.pb Esse arquivo pode ser gerado com a sinalização --descriptor_set_out do comando protoc. A sinalização --include_imports garante que o arquivo seja independente.
fullMessageName O nome completo da mensagem proto. Por exemplo, package.name.MessageName, em que package.name é o valor fornecido para a instrução package, e não para a instrução java_package.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>.
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, my-project:my_dataset.my_table Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o arquivo de esquema de entrada.
preserveProtoFieldNames (Opcional) true para preservar o nome do campo do Proto original no JSON. false para usar mais nomes JSON padrão. Por exemplo, false mudaria field_name para fieldName. (Padrão: false)
bigQueryTableSchemaPath Opcional: caminho do Cloud Storage para o caminho do esquema do BigQuery. Por exemplo, gs://path/to/my/schema.json Se isso não for fornecido, o esquema será inferido do esquema Proto.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
udfOutputTopic (Opcional) O tópico do Pub/Sub que armazena os erros da UDF. Por exemplo, projects/<project-id>/topics/<topic-name> Se isso não for fornecido, os erros de UDF serão enviados para o mesmo tópico que outputTopic.
writeDisposition (Opcional) O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND.
createDisposition (Opcional) O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED.

Como executar o tópico do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Proto to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Proto (por exemplo, gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: o nome da mensagem do Proto (por exemplo, package.name.MessageName)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • UNPROCESSED_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Proto (por exemplo, gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: o nome da mensagem do Proto (por exemplo, package.name.MessageName)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • UNPROCESSED_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

Pub/Sub para Pub/Sub

O modelo do Pub/Sub para Pub/Sub é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava as mensagens em outro tópico do Pub/Sub. O pipeline também aceita uma chave de atributo de mensagem opcional e um valor que pode ser usado para filtrar as mensagens que precisam ser gravadas no tópico do Pub/Sub. Use esse modelo para copiar mensagens de uma assinatura do Pub/Sub para outro tópico do Pub/Sub com um filtro de mensagem opcional.

Requisitos para este pipeline:

  • A inscrição do Pub/Sub de origem precisa ser criada antes da execução.
  • A assinatura de origem do Pub/Sub precisa ser uma assinatura de pull.
  • O tópico do Pub/Sub de destino precisa ser criado antes da execução.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Assinatura do Pub/Sub em que será lida a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tópico do Cloud Pub/Sub onde será gravada a saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
filterKey (Opcional) Filtra eventos com base em uma chave de atributo. Nenhum filtro será aplicado se filterKey não for especificado.
filterValue (Opcional) Filtra o valor do atributo a ser usado no caso de um filterKey ser fornecido. Um filterValue nulo é usado por padrão.

Como executar o modelo do Pub/Sub para Pub/Sub

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. Correspondências parciais (como substrings) não são filtradas. Um valor de filtro de evento nulo é usado por padrão.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. Correspondências parciais (como substrings) não são filtradas. Um valor de filtro de evento nulo é usado por padrão.

Pub/Sub para Splunk

O modelo do Pub/Sub para Splunk é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava o payload da mensagem no Splunk pelo Coletor de eventos HTTP (HEC, na sigla em inglês) do Splunk. O caso de uso mais comum desse modelo é exportar registros para o Splunk. Para ver um exemplo do fluxo de trabalho subjacente, consulte Como implantar exportações de registro prontas para produção no Splunk usando o Dataflow.

Antes de gravar no Splunk, também é possível aplicar uma função JavaScript definida pelo usuário ao payload da mensagem. Todas as mensagens que apresentam falhas de processamento são encaminhadas para um tópico não processado do Pub/Sub para posterior resolução de problemas e reprocessamento.

Como uma camada extra de proteção para o token HEC, é possível passar uma chave do Cloud KMS com o parâmetro de token HEC codificado em base64 criptografado com a chave do Cloud KMS. Consulte o endpoint de criptografia da API Cloud KMS para saber mais detalhes sobre como criptografar o parâmetro de token do HEC.

Requisitos para este pipeline:

  • A inscrição do Pub/Sub de origem precisa existir antes da execução do pipeline.
  • O tópico do Pub/Sub precisa existir antes de o pipeline ser executado.
  • O endpoint HEC de Splunk precisa ser acessível pela rede dos workers do Dataflow.
  • O token Splunk de HEC precisa ser gerado e estar disponível.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A inscrição do Pub/Sub a partir da qual ler a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
token (Opcional) O token de autenticação HEC do Splunk. Precisará ser fornecido se tokenSource estiver definido como PLAINTEXT ou KMS.
url O URL de HEC do Splunk. Precisa ser roteável a partir da VPC na qual o pipeline é executado. Por exemplo, https://splunk-hec-host:8088.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
batchCount (Opcional) O tamanho do lote para enviar vários eventos para o Splunk. Padrão 1 (sem loteamento).
parallelism (Opcional) O número máximo de solicitações paralelas. Padrão 1 (sem paralelismo).
disableCertificateValidation (Opcional) Desativar a validação do certificado SSL. Falso padrão (validação ativada). Se for verdadeiro, os certificados não serão validados (todos os certificados são confiáveis) e o parâmetro `rootCaCertificatePath` será ignorado.
includePubsubMessage (Opcional) Inclua a mensagem completa do Pub/Sub no payload. Falso padrão (somente o elemento de dados está incluído no payload).
tokenSource Origem do token. Um de PLAINTEXT, KMS ou SECRET_MANAGER. Esse parâmetro precisa ser fornecido se o Secret Manager for usado. Se tokenSource estiver definido como KMS, tokenKMSEncryptionKey e a criptografia token precisarão ser fornecidas. Se tokenSource for definido como SECRET_MANAGER, tokenSecretId precisará ser fornecido. Se tokenSource for definido como PLAINTEXT, token precisará ser fornecido.
tokenKMSEncryptionKey (Opcional) A chave do Cloud KMS para descriptografar a string do token HEC. Esse parâmetro precisará ser fornecido se o tokenSource estiver definido como KMS. Se a chave do Cloud KMS for fornecida, a string do token HEC precisará ser transmitida de forma criptografada.
tokenSecretId (Opcional) O ID do secret do Gerenciador de secrets do token. Esse parâmetro precisará ser fornecido se o tokenSource estiver definido como SECRET_MANAGER. É necessário seguir este formato projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath (Opcional) O URL completo do certificado de CA raiz no Cloud Storage. Por exemplo, gs://mybucket/mycerts/privateCA.crt O certificado fornecido no Cloud Storage precisa ser codificado em DER e pode ser fornecido em codificação binária ou imprimível (Base64). Se o certificado for fornecido na codificação Base64, ele precisará ser delimitado no início por -----BEGIN CERTIFICATE----- e no final por -----END CERTIFICATE-----. Se esse parâmetro for fornecido, esse arquivo de certificado de CA particular é buscado e adicionado ao armazenamento de confiança do worker do Dataflow para verificar o certificado SSL do endpoint do Splunk HEC. Se esse parâmetro não for fornecido, o armazenamento de confiança padrão será usado.
enableBatchLogs (Opcional) Especifica se os registros devem ser ativados para lotes gravados no Splunk. Padrão: true.
enableGzipHttpCompression (Opcional) Especifica se as solicitações HTTP enviadas ao HEC do Splunk serão compactadas (conteúdo gzip codificado). Padrão: true.

Como executar o modelo do Pub/Sub para o Splunk

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Splunk template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
  • ROOT_CA_CERTIFICATE_PATH: o caminho para o certificado de CA raiz no Cloud Storage (por exemplo, gs://your-bucket/privateCA.crt)

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
  • ROOT_CA_CERTIFICATE_PATH: o caminho para o certificado de CA raiz no Cloud Storage (por exemplo, gs://your-bucket/privateCA.crt)

Pub/Sub para arquivos Avro no Cloud Storage

O modelo do Pub/Sub para arquivos do Avro no Cloud Storage é um pipeline de streaming que lê dados de um tópico do Pub/Sub e grava arquivos Avro no bucket especificado do Cloud Storage.

Requisitos para este pipeline:

  • O tópico de entrada do Pub/Sub precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputTopic Tópico do Pub/Sub que será assinado para consumo de mensagens. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory Diretório em que os arquivos Avro de saída são arquivados. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
avroTempDirectory Diretório dos arquivos Avro temporários. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída para os arquivos Avro.
outputFilenameSuffix [Opcional] Sufixo do nome do arquivo de saída para os arquivos Avro.
outputShardTemplate [Opcional] O modelo de fragmento do arquivo de saída. Ela é especificada como sequências repetidas das letras S ou N. Por exemplo, SSS-NNN. Eles são substituídos pelo número do fragmento ou pelo número total de fragmentos, respectivamente. Quando esse parâmetro não for especificado, o formato de modelo padrão será W-P-SS-of-NN.

Como executar o modelo Avro do Pub/Sub com o Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Avro Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

Tópico do Pub/Sub para arquivos de texto no Cloud Storage

O modelo Cloud Pub/Sub para Cloud Storage Text é um pipeline de streaming que lê registros do Cloud Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos para este pipeline:

  • O tópico do Pub/Sub precisa existir antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são gerados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate é 00-of-01.

Como executar o modelo do Pub/Sub para arquivos de texto no Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

Tópico do Pub/Sub ou assinatura de arquivos de texto no Cloud Storage

O modelo do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos para este pipeline:

  • O tópico ou a assinatura do Pub/Sub precisa ter sido criado antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>. Se esse parâmetro for fornecido, inputSubscription não poderá ser fornecido.
inputSubscription O tópico do Pub/Sub em que a entrada será lida. O nome da assinatura precisa estar no formato projects/<project-id>/subscription/<subscription-name>. Se esse parâmetro for fornecido, inputTopic não deverá ser fornecido.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são gerados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate é 00-of-01.
windowDuration (Opcional) A duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5 min, com um mínimo de 1 s. Os formatos permitidos são: [int]s para segundos (5 s, por exemplo); [int]m para minutos (12 min, por exemplo); [int]h para horas (2h, por exemplo).

Como executar o tópico do Pub/Sub ou a assinatura de arquivos de texto no modelo do Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

Pub/Sub para MongoDB

O modelo do Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens JSON codificadas de uma assinatura do Pub/Sub e grava no MongoDB como documentos. Caso seja necessário, o pipeline é compatível com transformações extras que podem ser incluídas usando uma função definida pelo usuário (UDF) do JavaScript. Qualquer erro ocorrido devido à incompatibilidade de esquema, JSON malformado ou ao executar transformações são gravados em uma tabela do BigQuery para mensagens não processadas, juntamente com mensagens de entrada. Se não existir uma tabela para registros não processados antes da execução, o pipeline criará automaticamente essa tabela.

Requisitos para este pipeline:

  • A assinatura do Pub/Sub precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • O cluster do MongoDB precisa existir e poder ser acessado das máquinas de trabalho do Dataflow.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Nome da assinatura do Pub/Sub. Exemplo: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Lista separada por vírgulas dos servidores MongoDB. Exemplo: 192.285.234.12:27017,192.287.123.11:27017
database Banco de dados no MongoDB para armazenar a coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
deadletterTable Tabela do BigQuery que armazena mensagens geradas por falhas (esquema incorreto, JSON incorreto etc.). Por exemplo, project-id:dataset-name.table-name.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
batchSize (Opcional) Tamanho do lote usado para a inserção de documentos em lote no MongoDB. Padrão: 1000.
batchSizeBytes (Opcional) Tamanho do lote em bytes. Padrão: 5242880.
maxConnectionIdleTime (Opcional) Tempo máximo de inatividade permitido em segundos antes que o tempo limite da conexão ocorra. Padrão: 60000.
sslEnabled (Opcional) Valor booleano que indica se a conexão com o MongoDB está ativada para SSL. Padrão: true.
ignoreSSLCertificate (Opcional) Valor booleano que indica se o certificado SSL deve ser ignorado. Padrão: true.
withOrdered (Opcional) Valor booleano que permite inserções ordenadas em massa para o MongoDB. Padrão: true.
withSSLInvalidHostNameAllowed (Opcional) Valor booleano que indica se o nome do host inválido é permitido para conexão SSL. Padrão: true.

Como executar o modelo do Pub/Sub para MongoDB

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to MongoDB template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

Pub/Sub para Elasticsearch

O modelo do Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub, executa uma função definida pelo usuário (UDF, na sigla em inglês) e as grava no Elasticsearch como documentos. O modelo do Dataflow usa o recurso de fluxos de dados do Elasticsearch para armazenar dados de série temporal em vários índices, oferecendo um único recurso nomeado para solicitações. Esses fluxos são adequados para registros, métricas, rastros e outros dados gerados continuamente armazenados no Pub/Sub.

Requisitos para esse pipeline

  • A assinatura do Pub/Sub de origem precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • Um host Elasticsearch acessível publicamente em uma instância do GCP ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Consulte Integração do Google Cloud para Elastic para ver mais detalhes.
  • Um tópico do Pub/Sub para saída de erros.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A assinatura do Pub/Sub a ser consumida. O nome precisa estar no formato projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl URL do Elasticsearch no formato https://hostname:[port] ou especifique o CloudID se estiver usando o Elastic Cloud.
apiKey Chave da API codificada em Base64 usada para autenticação.
errorOutputTopic Tópico de saída do Pub/Sub para publicar registros com falha no formato de projects/<project-id>/topics/<topic-name>
dataset (Opcional) O tipo de registros enviados via Pub/Sub, para os quais temos um painel pronto para uso. Os valores de tipos de registro conhecidos são audit, vpcflow e firewall. Padrão: pubsub.
namespace (Opcional) Um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipe ou uma unidade de negócios estratégica. Padrão: default.
batchSize (Opcional) Tamanho do lote em número de documentos. Padrão: 1000.
batchSizeBytes (Opcional) Tamanho do lote em número de bytes. Padrão: 5242880 (5 mb).
maxRetryAttempts (Opcional) Máximo de tentativas de repetição. Precisa ser > 0. Padrão: no retries.
maxRetryDuration (Opcional) A duração máxima da nova tentativa em milissegundos precisa ser maior que 0. Padrão: no retries.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
propertyAsIndex (Opcional) Uma propriedade no documento que está sendo indexado com o valor que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa (tem precedência sobre uma UDF _index). Padrão: none.
propertyAsId (Opcional) Uma propriedade no documento que está sendo indexado com o valor que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa (tem precedência sobre uma UDF _id). Padrão: none.
javaScriptIndexFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIndexFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIdFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIdFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptTypeFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _type a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptTypeFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _type a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIsDeleteFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que determina se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor da string "true" ou "false". Padrão: none.
javaScriptIsDeleteFnName (Opcional) Nome da função UDF em JavaScript de uma função que vai determinar se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor da string "true" ou "false". Padrão: none.
usePartialUpdate (Opcional) Indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. Padrão: false.
bulkInsertMethod (Opcional) Indica se é necessário usar INDEX (índice, permite ajustes) ou CREATE (criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. Padrão: CREATE.

Como executar o Pub/Sub para o modelo do MongoDB

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

Datastream para o Cloud Spanner

O modelo do Datastream para Cloud Spanner é um pipeline de streaming que lê eventos do Datastream de um bucket do Cloud Storage e os grava em um banco de dados do Cloud Spanner. Ela é destinada à migração de dados de fontes do Datastream para o Cloud Spanner.

Todas as tabelas necessárias para migração precisam existir no banco de dados de destino do Cloud Spanner antes da execução do modelo. Portanto, a migração de esquema de um banco de dados de origem para o Cloud Spanner de destino precisa ser concluída antes da migração de dados. Os dados podem existir nas tabelas antes da migração. Esse modelo não propaga alterações de esquema do Datastream no banco de dados do Cloud Spanner.

A consistência de dados é garantida apenas no final da migração, quando todos os dados tiverem sido gravados no Cloud Spanner. Para armazenar informações de pedidos de cada registro gravado no Cloud Spanner, esse modelo cria uma tabela adicional (chamada de tabela de sombra) para cada tabela no banco de dados do Cloud Spanner. Isso é usado para garantir consistência no final da migração. As tabelas de sombra não são excluídas após a migração e podem ser usadas para fins de validação no final da migração.

Todos os erros que ocorrem durante a operação, como incompatibilidades de esquema, arquivos JSON malformados ou erros resultantes da execução de transformações, são registrados em uma fila de erros. A fila de erros é uma pasta do Cloud Storage que armazena todos os eventos do Datastream que encontraram erros, além do motivo do erro. em formato de texto. Os erros podem ser temporários ou permanentes e são armazenados em pastas apropriadas do Cloud Storage na fila de erros. Os erros temporários são repetidos automaticamente, ao contrário dos permanentes. No caso de erros permanentes, você tem a opção de fazer correções nos eventos de mudança e movê-los para o bucket recuperável enquanto o modelo estiver em execução.

Requisitos para este pipeline:

  • Um fluxo de Datastream no estado Em execução ou Não iniciado.
  • Um bucket do Cloud Storage em que os eventos do Datastream são replicados.
  • Um banco de dados do Cloud Spanner com tabelas existentes. Essas tabelas podem estar vazias ou conter dados.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse é o caminho raiz de um stream.
streamName O nome ou modelo do stream para pesquisar informações de esquema e tipo de origem.
instanceId A instância do Cloud Spanner em que as alterações são replicadas.
databaseId O banco de dados do Cloud Spanner em que as alterações são replicadas.
projectId O ID do projeto do Cloud Spanner.
deadLetterQueueDirectory (Opcional) Esse é o caminho do arquivo para armazenar a saída da fila de erros. O padrão é um diretório no local temporário do job do Dataflow.
inputFileFormat (Opcional) O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
shadowTablePrefix (Opcional) O prefixo usado para nomear tabelas de sombra. Padrão: shadow_.

Como executar o modelo do Datastream para o Cloud Spanner

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

Arquivos de texto no Cloud Storage para BigQuery (stream)

O pipeline de arquivos de texto no Cloud Storage para BigQuery permite ler arquivos de texto armazenados no Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e anexar o resultado no BigQuery.

O pipeline é executado de modo indefinido e precisa ser encerrado manualmente pelo comando cancelar e não drenar, porque ele usa a transformação Watch, que é um DoFn divisível sem suporte para drenagem.

Requisitos para este pipeline:

  • Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada fields e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemplo:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crie um arquivo JavaScript (.js) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parâmetros do modelo

Parâmetro Descrição
javascriptTextTransformGcsPath O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
JSONPath Local do Cloud Storage do arquivo de esquema do BigQuery, descrito como um JSON. Por exemplo, gs://path/to/my/schema.json.
javascriptTextTransformFunctionName o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
outputTable A tabela do BigQuery totalmente qualificada. Exemplo: my-project:dataset.table
inputFilePattern Local do Cloud Storage do texto que você quer processar. Por exemplo, gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Diretório temporário para o processo de carregamento do BigQuery. Exemplo: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabela de mensagens que não alcançaram a tabela de saída. Por exemplo, my-project:dataset.my-unprocessed-table. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo Cloud Storage Text no BigQuery (Stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

Arquivos de texto no Cloud Storage para o Pub/Sub (Stream)

Esse modelo cria um pipeline de streaming que pesquisa continuamente novos arquivos de texto carregados no Cloud Storage, lê cada arquivo linha por linha e publica strings em um tópico do Pub/Sub. O modelo publica registros em um arquivo delimitado por uma nova linha contendo registros JSON ou em um arquivo CSV em um tópico do Pub/Sub para processamento em tempo real. É possível usar esse modelo para reproduzir dados novamente no Pub/Sub.

O pipeline é executado indefinidamente e precisa ser encerrado manualmente com um "cancel", e não com um "drain", porque ele usa uma transformação "Watch" que é um "SplittableDoFn" não compatível com drenagem.

Atualmente, o intervalo de pesquisa é fixo e definido para 10 segundos. Esse modelo não configura carimbos de data/hora nos registros individuais, assim o horário do evento é igual ao da publicação durante a execução. O pipeline não deve ser usado caso dependa de um tempo exato do evento para processamento.

Requisitos para este pipeline:

  • Os arquivos de entrada precisam estar no formato JSON ou CSV delimitado por nova linha. Registros que abrangem várias linhas nos arquivos de origem podem causar problemas no downstream, porque cada linha nos arquivos é publicada como uma mensagem para o Pub/Sub.
  • O tópico do Pub/Sub precisa existir antes da execução.
  • O pipeline é executado indefinidamente e precisa ser finalizado manualmente.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv.
outputTopic O tópico de entrada do Pub/Sub a ser gravado. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.

Como executar os arquivos de texto no Cloud Storage para o modelo Pub/Sub (stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).

Tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

O modelo de mascaramento de dados/tokenização do Cloud Storage para o BigQuery (usando o Cloud DLP) é um pipeline de streaming que lê arquivos csv de um bucket do Cloud Storage, chama a API Cloud Data Loss Prevention (Cloud DLP) para remoção identificação e grava os dados desidentificados na tabela especificada do BigQuery. Este modelo é compatível com o uso de um modelo de inspeção e um modelo de desidentificação, ambos do Cloud DLP. Isso permite que os usuários inspecionem informações potencialmente confidenciais e façam a desidentificação. Além disso, também é possível desidentificar os dados estruturados em que as colunas são especificadas para serem desidentificadas sem nenhuma inspeção necessária. Vale lembrar que este modelo não é compatível com um caminho regional para o local do modelo de desidentificação. Somente um caminho global é compatível.

Requisitos para este pipeline:

  • Os dados de entrada para tokenizar precisam existir.
  • Os modelos do Cloud DLP precisam existir (por exemplo, DeidentifyTemplate e InspectTemplate). Veja os modelos do Cloud DLP para mais detalhes.
  • O conjunto de dados do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern Arquivo(s) csv para ler registros de dados de entrada. O uso de caracteres curingas também é aceito. Por exemplo, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID do projeto do Cloud DLP que tem o recurso da API Cloud DLP. Esse projeto do Cloud DLP pode ser o mesmo projeto que tem os modelos do Cloud DLP ou pode ser um projeto separado. Por exemplo, my_dlp_api_project.
deidentifyTemplateName Modelo de desidentificação do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por exemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de dados do BigQuery para enviar resultados tokenizados.
batchSize Tamanho do lote/divisão para enviar dados para inspecionar e/ou retirar a tokenização. Se for um arquivo csv, batchSize é o número de linhas em um lote. Os usuários precisam determinar o tamanho do lote com base no tamanho dos registros e no tamanho do arquivo. Observe que a API Cloud DLP tem um limite de tamanho de payload de 524 KB por chamada de API.
inspectTemplateName (Opcional) Modelo de inspeção do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/identifyTemplates/{idTemplateId}. Por exemplo, projects/my_project/identifyTemplates/100.

Como executar o modelo de tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Substitua:

  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • STAGING_LOCATION: o local para organizar arquivos locais (por exemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).

Alterar a captura de dados do MySQL para o BigQuery usando o Debezium e o Pub/Sub (Stream)

O modelo da captura de dados de alteração do MySQL para o BigQuery usando o Debezium e o Pub/Sub é um pipeline de streaming que lê mensagens Pub/Sub com dados de alteração de um banco de dados MySQL e grava os registros no BigQuery. Um conector do Debezium captura alterações no banco de dados MySQL e publica os dados alterados no Pub/Sub. O modelo lê as mensagens do Pub/Sub e as grava no BigQuery.

É possível usar esse modelo para sincronizar bancos de dados MySQL e tabelas do BigQuery. O pipeline grava os dados alterados em uma tabela de preparo do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica o banco de dados MySQL.

Requisitos para este pipeline:

  • O conector do Debezium precisa ser implantado.
  • As mensagens do Pub/Sub precisam ser serializadas em uma Linha enviada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscriptions A lista separada por vírgulas de assinaturas de entrada do Pub/Sub para leitura, no formato de <subscription>,<subscription>, ...
changeLogDataset O conjunto de dados do BigQuery para armazenar as tabelas de preparo, no formato de <my-dataset>
replicaDataset O local do conjunto de dados do BigQuery para armazenar as tabelas de réplica, no formato de <my-dataset>
updateFrequencySecs Opcional: o intervalo em que o pipeline atualiza a tabela do BigQuery replicando o banco de dados MySQL.

Como executar a captura de dados de alteração usando o Debezium e o MySQL do modelo do Pub/Sub para o BigQuery

Para executar esse modelo, siga estas etapas:

  1. Na máquina local, clone o repositório DataflowTemplates.
  2. Altere para o diretório v2/cdc-parent.
  3. Certifique-se de que o conector do Debezium esteja implantado.
  4. Usando o Maven, execute o modelo do Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
    • SUBSCRIPTIONS: sua lista separada por vírgulas de nomes de assinatura do Pub/Sub
    • CHANGELOG_DATASET: seu conjunto de dados do BigQuery para dados do registro de alterações
    • REPLICA_DATASET: seu conjunto de dados do BigQuery para tabelas replicadas

Apache Kafka para BigQuery

O modelo Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto do Apache Kafka, executa uma função definida pelo usuário (UDF) e gera os registros resultantes no BigQuery. Qualquer erro que ocorrer na transformação dos dados, na execução da UDF ou na inserção na tabela de respostas será inserido em uma tabela de erros separada no BigQuery. Se a tabela de erros não existir antes da execução, ela será criada.

Requisitos para esse pipeline

  • A tabela de respostas do BigQuery precisa existir.
  • O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka precisam existir, e as mensagens precisam estar codificadas em um formato JSON válido.

Parâmetros do modelo

Parâmetro Descrição
outputTableSpec O local da tabela de respostas do BigQuery para gravar as mensagens do Apache Kafka, no formato de my-project:dataset.table
inputTopics Os tópicos de entrada do Apache Kafka para leitura em uma lista separada por vírgulas. Exemplo: messages
bootstrapServers O endereço do host dos servidores do agente do Apache Kafka em execução em uma lista separada por vírgulas, cada endereço de host no formato 35.70.252.199:9092
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
outputDeadletterTable Opcional: a tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de my-project:dataset.my-deadletter-table. Se não existir, a tabela será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo do Apache Kafka para BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.

Para mais informações, consulte Gravar dados do Kafka no BigQuery com o Dataflow.

Datastream para o BigQuery (stream)

O modelo do Datastream para o BigQuery é um pipeline de streaming que lê dados do Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e os replica em uma tabela de preparo particionada do BigQuery. Após a replicação, o modelo executa um MERGE no BigQuery para mesclar todas as alterações de change data capture (CDC) em uma réplica da tabela de origem.

O modelo lida com a criação e a atualização das tabelas do BigQuery gerenciadas pela replicação. Quando a linguagem de definição de dados (DDL) é obrigatória, um callback para o Datastream extrai o esquema da tabela de origem e o converte em tipos de dados do BigQuery. As operações compatíveis incluem:

  • Novas tabelas são criadas à medida que os dados são inseridos.
  • Novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
  • As colunas descartadas são ignoradas no BigQuery, e os valores futuros são nulos.
  • As colunas renomeadas são adicionadas ao BigQuery como novas colunas.
  • As alterações de tipo não são propagadas para o BigQuery.

Requisitos para este pipeline:

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Os conjuntos de dados de destino do BigQuery são criados, e a conta de serviço do Compute Engine recebe acesso de administrador a eles.
  • Uma chave primária é necessária na tabela de origem para a criação da tabela de réplica de destino.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
gcsPubSubSubscription A assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
outputStagingDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de preparo. É possível incluir o modelo {_metadata_dataset} como um marcador que é substituído pelo nome do seu conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}_log).
outputDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de réplica. É possível incluir o modelo {_metadata_dataset} como um marcador que é substituído pelo nome do conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}).
deadLetterQueueDirectory O caminho do arquivo para armazenar todas as mensagens não processadas com o motivo de falha no processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
outputStagingTableNameTemplate (Opcional) O modelo para o nome das tabelas de preparo. O padrão é {_metadata_table}_log. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate (Opcional) O modelo para o nome das tabelas de réplica. Padrão: {_metadata_table}. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}.
outputProjectId Opcional: projeto para conjuntos de dados do BigQuery em que os dados serão gerados. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
streamName (Opcional) O nome ou modelo do stream para pesquisar informações de esquema. Padrão: {_metadata_stream}.
mergeFrequencyMinutes (Opcional) O número de minutos entre as mesclagens de uma determinada tabela. Padrão: 5.
dlqRetryMinutes (Opcional) O número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) Padrão: 10.
javascriptTextTransformGcsPath (Opcional) O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opcional) O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

Como executar o modelo do Datastream para o BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Datastream to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
  • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
  • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

Datastream para MySQL ou PostgreSQL (Stream)

O modelo Datastream para SQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer banco de dados MySQL ou PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica esses dados em tabelas de réplica do PostgreSQL.

O modelo não é compatível com a linguagem de definição de dados (DDL) e espera que todas as tabelas já existam no banco de dados. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver passado, uma versão que chega atrasada dessa linha será ignorada. A linguagem de manipulação de dados (DML) executada é uma melhor tentativa de replicar perfeitamente os dados de origem e de destino. As instruções DML executadas seguem as seguintes regras:

  • Se houver uma chave primária, as operações de inserção e atualização usarão sintaxe de mesclagem (isto é, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE.
  • Se houver chaves primárias, as exclusões serão replicadas como uma DML de exclusão.
  • Se não houver uma chave primária, as operações de inserção e atualização serão inseridas na tabela.
  • Se não houver chaves primárias, as exclusões serão ignoradas.

Se você estiver usando os utilitários Oracle para Postgres, adicione ROWID no PostgreSQL como a chave primária quando não houver nenhuma.

Os requisitos para esse pipeline são:

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Um banco de dados do PostgreSQL foi propagado com o esquema necessário.
  • O acesso à rede entre workers do Dataflow e o PostgreSQL está configurado.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
gcsPubSubSubscription A assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
databaseHost O host SQL para se conectar.
databaseUser O usuário do PostgreSQL com todas as permissões necessárias para gravar em todas as tabelas na replicação.
databasePassword A senha do usuário do PostgreSQL.
databasePort (Opcional) A porta do banco de dados do PostgreSQL para a conexão. Padrão: 5432.
databaseName (Opcional) O nome do banco de dados do PostgreSQL ao qual se conectar. Padrão: postgres.
streamName (Opcional) O nome ou modelo do stream para pesquisar informações de esquema. Padrão: {_metadata_stream}.

Como executar o modelo Datastream para SQL

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to SQL template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

Pub/Sub para Java Database Connectivity (JDBC)

O modelo do Pub/Sub para Java Database Connectivity (JDBC) é um pipeline de streaming que ingere dados de uma assinatura preexistente do Cloud Pub/Sub como strings JSON e grava os registros resultantes no JDBC.

Requisitos para este pipeline:

  • A inscrição do Cloud Pub/Sub precisa existir antes da execução do pipeline.
  • A origem JDBC precisa existir antes da execução do pipeline.
  • O tópico de mensagens inativas de saída do Cloud Pub/Sub precisa existir antes de o pipeline ser executado.

Parâmetros do modelo

Parâmetro Descrição
driverClassName O nome da classe do driver do JDBC. Por exemplo, com.mysql.jdbc.Driver.
connectionUrl A string do URL de conexão do JDBC. Por exemplo, jdbc:mysql://some-host:3306/sampledb Pode ser transmitida como uma string codificada em Base64 e depois criptografada com uma chave do Cloud KMS.
driverJars Caminhos do Cloud Storage separados por vírgulas para drivers JDBC. Por exemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username (Opcional) O nome do usuário a ser usado para a conexão JDBC. Pode ser transmitida como uma string criptografada em Base64 com uma chave do Cloud KMS.
password (Opcional) Senha a ser usada para a conexão JDBC. Pode ser transmitida como uma string criptografada em Base64 com uma chave do Cloud KMS.
connectionProperties (Opcional) String de propriedades a ser usada para a conexão JDBC. O formato da string precisa ser [propertyName=property;]*. Por exemplo, unicode=true;characterEncoding=UTF-8.
statement Instrução a ser executada no banco de dados. A instrução precisa especificar os nomes das colunas da tabela em qualquer ordem. Somente os valores dos nomes das colunas especificadas são lidos no JSON e adicionados à instrução. Por exemplo, INSERT INTO tableName (column1, column2) VALUES (?,?).
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey (Opcional) A chave de criptografia do Cloud KMS para descriptografar o nome de usuário, senha e string de conexão. Se a chave do Cloud KMS for transmitida, o nome de usuário, senha e string de conexão precisarão ser transmitidos criptografados.
extraFilesToStage Caminhos do Cloud Storage separados ou vírgulas do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos serão salvos no diretório /extra_files de cada worker. Por exemplo, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Como executar o modelo do Pub/Sub para Java Database Connectivity (JDBC)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to JDBC template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do driver
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SQL_STATEMENT: a instrução SQL a ser executada no banco de dados
  • INPUT_SUBSCRIPTION: a assinatura de entrada do Pub/Sub da qual ler
  • OUTPUT_DEADLETTER_TOPIC: o tópico do Pub/Sub para encaminhar mensagens não entregues
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do driver
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SQL_STATEMENT: a instrução SQL a ser executada no banco de dados
  • INPUT_SUBSCRIPTION: a assinatura de entrada do Pub/Sub da qual ler
  • OUTPUT_DEADLETTER_TOPIC: o tópico do Pub/Sub para encaminhar mensagens não entregues
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS

Fluxos de alterações do Cloud Spanner para o Cloud Storage

Os fluxos de alterações do Cloud Spanner para o modelo do Cloud Storage são um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em um bucket do Cloud Storage usando o Dataflow Runner V2.

O pipeline agrupa os registros de stream de alterações do Spanner em janelas com base no carimbo de data/hora, e cada janela representa uma duração de tempo cujo tamanho pode ser configurado com esse modelo. Todos os registros com carimbos de data/hora pertencentes à janela têm a garantia de estarem na janela. Não pode haver chegadas atrasadas. Também é possível definir vários fragmentos de saída. O pipeline cria um arquivo de saída do Cloud Storage por janela por fragmento. Em um arquivo de saída, os registros são desordenados. Os arquivos de saída podem ser gravados no formato JSON ou AVRO, dependendo da configuração do usuário.

Observe que é possível minimizar a latência da rede e os custos de transporte dela executando o job do Dataflow na mesma região da sua instância do Cloud Spanner ou do bucket do Cloud Storage. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Saiba mais sobre os endpoints regionais do Dataflow.

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos para este pipeline:

  • A instância do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Cloud Spanner precisa existir antes da execução do pipeline.
  • A instância de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Cloud Spanner precisa ser criado antes da execução do pipeline.
  • O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
spannerInstanceId O ID da instância do Cloud Spanner de onde os dados dos fluxos de alterações são lidos.
spannerDatabase O banco de dados do Cloud Spanner de onde os dados dos fluxos de alterações são lidos.
spannerMetadataInstanceId O ID da instância do Cloud Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerMetadataDatabase O banco de dados do Cloud Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerChangeStreamName O nome do fluxo de alterações a ser lido pelo Cloud Spanner.
gcsOutputDirectory O local do arquivo dos fluxos de alterações na saída do Cloud Storage no formato: 'gs://${BUCKET}/${ROOT_PATH}/'.
outputFilenamePrefix (Opcional) O prefixo do nome dos arquivos a serem gravados. O prefixo do arquivo padrão é definido como "saída".
spannerProjectId (Opcional) Projeto do qual os fluxos de alterações serão lidos. Este é também o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
startTimestamp (Opcional) O DateTime inicial, inclusive, para ler os fluxos de alterações. Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora quando o pipeline é iniciado, ou seja, o horário atual.
endTimestamp (Opcional) A terminação DateTime, inclusive, para usar em fluxos de alterações de leitura. Ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
outputFileFormat (Opcional) O formato do arquivo de saída do Cloud Storage. Os formatos permitidos são TEXT e AVRO. O padrão é AVRO.
windowDuration (Opcional) A duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5 min, com um mínimo de 1 s. Os formatos permitidos são: [int]s para segundos (5 s, por exemplo); [int]m para minutos (12 min, por exemplo); [int]h para horas (2h, por exemplo).
rpcPriority (Opcional) A prioridade da solicitação para chamadas do Cloud Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. (Padrão: HIGH)
numShards (Opcional) O número máximo de fragmentos de saída produzidos durante a gravação. O número padrão é 20. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage
spannerMetadataTableName (Opcional) O nome da tabela de metadados do conector dos Fluxos de alterações do Cloud Spanner a ser usado. Se não for fornecido, uma tabela de metadados dos fluxos de alterações do Cloud Spanner será criada automaticamente durante o fluxo de pipeline. É preciso informar esse parâmetro ao atualizar um pipeline. Caso contrário, não o forneça.

Executar os fluxos de alterações do Cloud Spanner para o modelo do Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

Fluxos de alterações do Cloud Spanner para o BigQuery

O modelo de fluxos de alterações do Cloud Spanner para o BigQuery é um pipeline de streaming que transmite os registros de alteração de dados do Cloud Spanner e os grava em tabelas do BigQuery usando o Dataflow Runner V2.

Se as tabelas do BigQuery necessárias não existirem, o pipeline as criará. Caso contrário, as tabelas atuais do BigQuery serão usadas. O esquema das tabelas atuais do BigQuery precisa conter as colunas rastreadas correspondentes das tabelas do Cloud Spanner e as colunas de metadados adicionais (consulte a descrição dos campos de metadados na lista a seguir) que não são ignoradas explicitamente pela opção "ignoreFields". Cada nova linha do BigQuery inclui todas as colunas monitoradas pelo fluxo de alterações da linha correspondente na tabela do Cloud Spanner no carimbo de data/hora do registro de alterações.

Todas as colunas monitoradas do fluxo de alterações são incluídas em cada linha da tabela do BigQuery, independentemente de serem modificadas por uma transação do Cloud Spanner. As colunas não monitoradas não são incluídas na linha do BigQuery. Qualquer alteração do Cloud Spanner inferior à marca d'água do Dataflow é aplicada com sucesso às tabelas do BigQuery ou armazenada na fila de mensagens inativas para nova tentativa. As linhas do BigQuery são inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Cloud Spanner.

Os seguintes campos de metadados são adicionados às tabelas do BigQuery:

  • _metadata_spanner_mod_type: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_table_name: o nome da tabela do Cloud Spanner. Observe que este não é o nome da tabela de metadados do conector.
  • _metadata_spanner_commit_timestamp: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_server_transaction_id: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_record_sequence: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_number_of_records_in_transaction: extraídos do registro de alterações dos dados do fluxo de alterações.
  • _metadata_spanner_number_of_partitions_in_transaction: extraído do registro de alterações dos dados do fluxo de alterações.
  • _metadata_big_query_commit_timestamp: o carimbo de data/hora de confirmação de quando a linha é inserida no BigQuery.

Observação:

  • Este modelo não propaga alterações de esquema do Cloud Spanner para o BigQuery. Como uma alteração de esquema no Cloud Spanner provavelmente vai interromper o pipeline, talvez ele precise ser recriado depois que o esquema for alterado.
  • Para os tipos de captura de valor OLD_AND_NEW_VALUES e NEW_VALUES, quando o registro de alteração de dados tiver uma alteração UPDATE, o modelo precisará fazer uma leitura desatualizada para o Cloud Spanner no carimbo de data/hora de confirmação do registro de alteração de dados. Isso serve para recuperar as colunas inalteradas, mas monitoradas. Configure o banco de dados "version_retention_period" corretamente para a leitura desatualizada. Para o tipo de captura de valor NEW_ROW, o modelo é mais eficiente, porque o registro de alteração de dados captura a nova linha completa, incluindo colunas que não são atualizadas em UPDATEs e o modelo não precisa fazer uma leitura desatualizada.
  • Você pode minimizar a latência da rede e os custos de transporte dela executando o job do Dataflow na mesma região de sua instância do Cloud Spanner ou das tabelas do BigQuery. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Saiba mais sobre os endpoints regionais do Dataflow.
  • Esse modelo é compatível com todos os tipos de dados válidos do Cloud Spanner. No entanto, se o tipo do BigQuery for mais preciso que o do Cloud Spanner, poderá ocorrer perda de precisão durante a transformação. Especificamente:
    • Para o tipo JSON do Cloud Spanner, a ordem dos membros de um objeto é ordenada lexicograficamente, mas não há essa garantia para o tipo JSON do BigQuery.
    • O Cloud Spanner é compatível com o tipo TIMESTAMP de nanossegundos, e o BigQuery é compatível apenas com o tipo TIMESTAMP de microssegundos.

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos para este pipeline:

  • A instância do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Cloud Spanner precisa existir antes da execução do pipeline.
  • A instância de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Cloud Spanner precisa ser criado antes da execução do pipeline.
  • O conjunto de dados do BigQuery precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
spannerInstanceId A instância do Cloud Spanner em que os fluxos de alterações serão lidos.
spannerDatabase O banco de dados do Cloud Spanner de onde os fluxos de alterações serão lidos.
spannerMetadataInstanceId A instância do Cloud Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
spannerMetadataDatabase O banco de dados do Cloud Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerChangeStreamName O nome do fluxo de alterações a ser lido pelo Cloud Spanner.
bigQueryDataSet O conjunto de dados do BigQuery para a saída de fluxos de alterações.
spannerProjectId (Opcional) Projeto do qual os fluxos de alterações serão lidos. Este é também o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
spannerMetadataTableName (Opcional) O nome da tabela de metadados do conector dos Fluxos de alterações do Cloud Spanner a ser usado. Se não for informado, uma tabela de metadados do conector de streams de alteração do Cloud Spanner será criada automaticamente durante o fluxo do pipeline. É preciso informar esse parâmetro ao atualizar um pipeline. Caso contrário, não o forneça.
rpcPriority (Opcional) A prioridade da solicitação para chamadas do Cloud Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. (Padrão: HIGH)
startTimestamp (Opcional) O DateTime inicial, inclusive, para ler os fluxos de alterações. Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
endTimestamp (Opcional) A terminação DateTime, inclusive, para usar em fluxos de alterações de leitura. Ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
bigQueryProjectId (Opcional) O projeto do BigQuery. O padrão é o projeto do job do Dataflow.
bigQueryChangelogTableNameTemplate (Opcional) O modelo para o nome das tabelas de registro de alterações do BigQuery. Vai para o padrão {_metadata_spanner_table_name}_changelog.
deadLetterQueueDirectory (Opcional) O caminho do arquivo para armazenar todos os registros não processados com o motivo da falha de processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
dlqRetryMinutes (Opcional) O número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10.
ignoreFields (Opcional) Lista separada por vírgulas de campos (diferencia maiúsculas de minúsculas) que devem ser ignorados. Podem ser campos de tabelas monitoradas ou campos de metadados adicionados pelo pipeline. Os campos ignorados não serão inseridos no BigQuery.

Executar os fluxos de alterações do Cloud Spanner para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

Fluxos de alterações do Cloud Spanner para o Pub/Sub

O modelo de fluxos de alterações do Cloud Spanner para o Pub/Sub é um pipeline de streaming que transmite os registros de alteração de dados do Cloud Spanner e os grava em tópicos do Pub/Sub usando o Dataflow Runner V2.

Você precisa criar o novo tópico do Pub/Sub antes de enviar seus dados. Depois de criar, o Pub/Sub gera e anexa automaticamente uma assinatura ao novo tópico. Se você tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow vai gerar uma exceção e o pipeline vai travar enquanto tenta fazer uma conexão.

Se o tópico do Pub/Sub necessário já existir, os dados podem ser gerados para ele.

Para mais informações, consulte Sobre fluxos de alteração, Criar conexões de fluxos de alteração com o Dataflow e Práticas recomendadas para fluxos de alteração.

Requisitos para este pipeline:

  • A instância do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Cloud Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Cloud Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Cloud Spanner precisa ser criado antes da execução do pipeline.
  • O tópico do Pub/Sub precisa ser criado antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
spannerInstanceId A instância do Cloud Spanner em que os fluxos de alterações serão lidos.
spannerDatabase O banco de dados do Cloud Spanner de onde os fluxos de alterações serão lidos.
spannerMetadataInstanceId A instância do Cloud Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
spannerMetadataDatabase O banco de dados do Cloud Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerChangeStreamName O nome do fluxo de alterações a ser lido pelo Cloud Spanner.
pubsubTopic O tópico do Pub/Sub para saída dos fluxos de alteração.
spannerProjectId (Opcional) Projeto do qual os fluxos de alterações serão lidos. Este é também o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
spannerMetadataTableName (Opcional) O nome da tabela de metadados do conector dos Fluxos de alterações do Cloud Spanner a ser usado. Se não for informado, o Cloud Spanner automaticamente criará a tabela de metadados do conector dos fluxos durante a mudança do fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Não use esse parâmetro para outros casos.
rpcPriority (Opcional) A prioridade da solicitação para chamadas do Cloud Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. (Padrão: HIGH)
startTimestamp (Opcional) O início DateTime, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
endTimestamp (Opcional) A terminação DateTime, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
outputFileFormat (Opcional) O formato da saída. A saída é encapsulada em muitas PubsubMessages e enviada para um tópico do Pub/Sub. Os formatos permitidos são JSON e AVRO. O padrão é JSON.
pubsubAPI (Opcional) API Pub/Sub usada para implementar o pipeline. As APIs permitidas são pubsubio e native_client. Para um pequeno número de consultas por segundo (QPS), native_client tem menos latência. Quando o QPS é alto, o pubsubio tem um desempenho melhor e mais estável. O padrão é pubsubio.

Executar os fluxos de alterações do Cloud Spanner para o modelo do Pub/Sub

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

MongoDB para BigQuery (CDC)

O modelo MongoDB para BigQuery CDC (captura de dados alterados) é um pipeline de streaming que funciona em conjunto com o fluxo de alterações do MongoDB. O pipeline lê os registros JSON enviados ao Pub/Sub por meio de um fluxo de alterações do MongoDB e os grava no BigQuery conforme especificado pelo parâmetro userOption.

Requisitos para esse pipeline

  • O conjunto de dados de destino do BigQuery precisa existir.
  • A instância de origem do MongoDB precisa ser acessível nas máquinas de trabalho do Dataflow.
  • O stream enviando as alterações do MongoDB para o Pub/Sub precisa estar em execução.

Parâmetros do modelo

Parâmetro Descrição
mongoDbUri URI de conexão do MongoDB no formato mongodb+srv://:@.
database Banco de dados no MongoDB para leitura da coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
outputTableSpec Tabela do BigQuery a ser gravada. Por exemplo, bigquery-project:dataset.output_table.
userOption FLATTEN ou NONE. FLATTEN nivela os documentos no primeiro nível. NONE armazena todo o documento como uma string JSON.
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.

Como executar o modelo do MongoDB para BigQuery (CDC)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the MongoDB to BigQuery (CDC) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
  • MONGO_DB_URI: o URI do MongoDB.
  • DATABASE: o banco de dados do MongoDB.
  • COLLECTION: sua coleção do MongoDB.
  • USER_OPTION: FLATTEN ou NENHUM.
  • INPUT_TOPIC: o tópico de entrada do Pub/Sub.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • OUTPUT_TABLE_SPEC: o nome da tabela de destino do BigQuery.
  • MONGO_DB_URI: o URI do MongoDB.
  • DATABASE: o banco de dados do MongoDB.
  • COLLECTION: sua coleção do MongoDB.
  • USER_OPTION: FLATTEN ou NENHUM.
  • INPUT_TOPIC: o tópico de entrada do Pub/Sub.