Modelos de streaming fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow. Para informações gerais sobre modelos, consulte a página Visão geral. Para uma lista de todos os modelos fornecidos pelo Google, consulte esta página.

Nesta página, você verá a documentação dos modelos de streaming.

Assinatura do Pub/Sub para BigQuery

A inscrição Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar a inscrição Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Subscription to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Tópico do Pub/Sub para o BigQuery

O tópico do Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de um tópico do Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não chegaram à tabela de saída. É preciso usar o formato <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o tópico do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Avro do Pub/Sub para BigQuery

O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Requisitos para esse pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
schemaPath O local do Cloud Storage do arquivo de esquema do Avro. Por exemplo, gs://path/to/my/schema.avsc.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, <my-project>:<my-dataset>.<my-table>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.
writeDisposition (Opcional) O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND
createDisposition (Opcional) O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED

Como executar o Avro do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Avro to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub para Pub/Sub

O modelo do Pub/Sub para Pub/Sub é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava as mensagens em outro tópico do Pub/Sub. O pipeline também aceita uma chave de atributo de mensagem opcional e um valor que pode ser usado para filtrar as mensagens que precisam ser gravadas no tópico do Pub/Sub. Use esse modelo para copiar mensagens de uma assinatura do Pub/Sub para outro tópico do Pub/Sub com um filtro de mensagem opcional.

Requisitos para este pipeline:

  • É necessário que a inscrição do Pub/Sub de origem exista antes da execução.
  • O tópico do Pub/Sub de destino precisa existir antes da execução.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Assinatura do Pub/Sub em que será lida a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tópico do Cloud Pub/Sub onde será gravada a saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
filterKey [Opcional] Filtra eventos com base em uma chave de atributo. Nenhum filtro será aplicado se filterKey não for especificado.
filterValue [Opcional] Filtra o valor do atributo a ser usado no caso de um filterKey ser fornecido. Um filterValue nulo é usado por padrão.

Como executar o modelo do Pub/Sub para Pub/Sub

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub para Splunk

O modelo do Pub/Sub para Splunk é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava o payload da mensagem no Splunk pelo Coletor de eventos HTTP (HEC, na sigla em inglês) do Splunk. O caso de uso mais comum desse modelo é exportar registros para o Splunk. Para ver um exemplo do fluxo de trabalho subjacente, consulte Como implantar exportações de registro prontas para produção no Splunk usando o Dataflow.

Antes de gravar no Splunk, também é possível aplicar uma função JavaScript definida pelo usuário ao payload da mensagem. Todas as mensagens que apresentam falhas de processamento são encaminhadas para um tópico não processado do Pub/Sub para posterior resolução de problemas e reprocessamento.

Como uma camada extra de proteção para o token HEC, é possível passar uma chave do Cloud KMS com o parâmetro de token HEC codificado em base64 criptografado com a chave do Cloud KMS. Consulte o endpoint de criptografia da API Cloud KMS para saber mais detalhes sobre como criptografar o parâmetro de token do HEC.

Requisitos para este pipeline:

  • A inscrição do Pub/Sub de origem precisa existir antes da execução do pipeline.
  • O tópico do Pub/Sub precisa existir antes de o pipeline ser executado.
  • O endpoint HEC de Splunk precisa ser acessível pela rede dos workers do Dataflow.
  • O token Splunk de HEC precisa ser gerado e estar disponível.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A inscrição do Pub/Sub a partir da qual ler a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
token O token de autenticação de HEC de Splunk. Essa string codificada em base64 pode ser criptografada com uma chave do Cloud KMS para maior segurança.
url O URL de HEC do Splunk. Precisa ser roteável a partir da VPC na qual o pipeline é executado. Por exemplo, https://splunk-hec-host:8088.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Opcional] O caminho do Cloud Storage que contém todo o código do JavaScript. Por exemplo, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Opcional] O nome da função JavaScript a ser chamada. Por exemplo, se a função JavaScript for function myTransform(inJson) { ...dostuff...}, o nome da função será myTransform.
batchCount [Opcional] O tamanho do lote para enviar vários eventos para o Splunk. Padrão 1 (sem loteamento).
parallelism [Opcional] O número máximo de solicitações paralelas. Padrão 1 (sem paralelismo).
disableCertificateValidation [Opcional] Desativar a validação do certificado SSL. Falso padrão (validação ativada).
includePubsubMessage [Opcional] Inclua a mensagem completa do Pub/Sub no payload. Falso padrão (somente o elemento de dados está incluído no payload).
tokenKMSEncryptionKey [Opcional] A chave do Cloud KMS para descriptografar a string do token HEC. Se a chave do Cloud KMS for fornecida, a string do token HEC precisará ser transmitida de forma criptografada.

Como executar o modelo do Pub/Sub para o Splunk

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Splunk template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub para arquivos Avro no Cloud Storage

O modelo do Pub/Sub para arquivos do Avro no Cloud Storage é um pipeline de streaming que lê dados de um tópico do Pub/Sub e grava arquivos Avro no bucket especificado do Cloud Storage.

Requisitos para este pipeline:

  • O tópico de entrada do Pub/Sub precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputTopic Tópico do Pub/Sub que será assinado para consumo de mensagens. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory Diretório em que os arquivos Avro de saída são arquivados. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
avroTempDirectory Diretório dos arquivos Avro temporários. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída para os arquivos Avro.
outputFilenameSuffix [Opcional] Sufixo do nome do arquivo de saída para os arquivos Avro.
outputShardTemplate [Opcional] O modelo de fragmento do arquivo de saída. Ela é especificada como sequências repetidas das letras S ou N. Por exemplo, SSS-NNN. Eles são substituídos pelo número do fragmento ou pelo número total de fragmentos, respectivamente. Quando esse parâmetro não for especificado, o formato de modelo padrão será W-P-SS-of-NN.

Como executar o modelo Avro do Pub/Sub com o Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Avro Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

Pub/Sub para arquivos de texto no Cloud Storage

O modelo do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos para este pipeline:

  • O tópico do Pub/Sub precisa existir antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são colocados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate será 00-of-01.

Como executar o modelo do Pub/Sub para arquivos de texto no Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub para MongoDB

O modelo do Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens JSON codificadas de uma assinatura do Pub/Sub e grava no MongoDB como documentos. Caso seja necessário, o pipeline é compatível com transformações extras que podem ser incluídas usando uma função definida pelo usuário (UDF) do JavaScript. Qualquer erro ocorrido devido à incompatibilidade de esquema, JSON malformado ou ao executar transformações são gravados em uma tabela do BigQuery para mensagens não processadas, juntamente com mensagens de entrada. Se não existir uma tabela para registros não processados antes da execução, o pipeline criará automaticamente essa tabela.

Requisitos para este pipeline:

  • A assinatura do Pub/Sub precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • O cluster do MongoDB precisa existir e poder ser acessado das máquinas de trabalho do Dataflow.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Nome da assinatura do Pub/Sub. Exemplo: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri Lista separada por vírgulas dos servidores MongoDB. Exemplo: 192.285.234.12:27017,192.287.123.11:27017
database Banco de dados no MongoDB para armazenar a coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
deadletterTable Tabela do BigQuery que armazena mensagens geradas por falhas (esquema incorreto, JSON incorreto etc.). Por exemplo, project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Opcional] Local do Cloud Storage do arquivo JavaScript na transformação da UDF. Por exemplo, gs://mybucket/filename.json.
javascriptTextTransformFunctionName [Opcional] Nome da UDF em JavaScript. Por exemplo, transform.
batchSize [Opcional] Tamanho do lote usado para a inserção de documentos em lote no MongoDB. Padrão: 1000.
batchSizeBytes [Opcional] Tamanho do lote em bytes. Padrão: 5242880.
maxConnectionIdleTime [Opcional] Tempo máximo de inatividade permitido em segundos antes que o tempo limite da conexão ocorra. Padrão: 60000.
sslEnabled [Opcional] Valor booleano que indica se a conexão com o MongoDB está ativada para SSL. Padrão: true.
ignoreSSLCertificate [Opcional] Valor booleano que indica se o certificado SSL deve ser ignorado. Padrão: true.
withOrdered [Opcional] Valor booleano que permite inserções ordenadas em massa para o MongoDB. Padrão: true.
withSSLInvalidHostNameAllowed [Opcional] Valor booleano que indica se o nome do host inválido é permitido para conexão SSL. Padrão: true.

Como executar o modelo do Pub/Sub para MongoDB

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to MongoDB template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para usar modelos com a ferramenta de linha de comando gcloud, você precisa ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Substitua:

  • PROJECT_ID: ID do projeto
  • REGION_NAME: nome da região do Dataflow (por exemplo, us-central1)
  • JOB_NAME: um nome de job de sua escolha
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • LOCATION: nome da região do Dataflow (por exemplo, us-central1)
  • JOB_NAME: um nome de job de sua escolha
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Datastream para o Cloud Spanner

O modelo do Datastream para Cloud Spanner é um pipeline de streaming que lê eventos do Datastream de um bucket do Cloud Storage e os grava em um banco de dados do Cloud Spanner. Ela é destinada à migração de dados de fontes do Datastream para o Cloud Spanner.

Todas as tabelas necessárias para migração precisam existir no banco de dados de destino do Cloud Spanner antes da execução do modelo. Portanto, a migração de esquema de um banco de dados de origem para o Cloud Spanner de destino precisa ser concluída antes da migração de dados. Os dados podem existir nas tabelas antes da migração. Esse modelo não propaga alterações de esquema do Datastream no banco de dados do Cloud Spanner.

A consistência de dados é garantida apenas no final da migração, quando todos os dados tiverem sido gravados no Cloud Spanner. Para armazenar informações de pedidos de cada registro gravado no Cloud Spanner, esse modelo cria uma tabela adicional (chamada de tabela de sombra) para cada tabela no banco de dados do Cloud Spanner. Isso é usado para garantir consistência no final da migração. As tabelas de sombra não são excluídas após a migração e podem ser usadas para fins de validação no final da migração.

Todos os erros que ocorrem durante a operação, como incompatibilidades de esquema, arquivos JSON malformados ou erros resultantes da execução de transformações, são registrados em uma fila de erros. A fila de erros é uma pasta do Cloud Storage que armazena todos os eventos do Datastream que encontraram erros, além do motivo do erro. em formato de texto. Os erros podem ser temporários ou permanentes e são armazenados em pastas apropriadas do Cloud Storage na fila de erros. Os erros temporários são repetidos automaticamente, ao contrário dos permanentes. No caso de erros permanentes, você tem a opção de fazer correções nos eventos de mudança e movê-los para o bucket recuperável enquanto o modelo estiver em execução.

Requisitos para este pipeline:

  • Um fluxo do Datastream no estado Em execução ou Não iniciado.
  • Um bucket do Cloud Storage em que os eventos do Datastream são replicados.
  • Um banco de dados do Cloud Spanner com tabelas existentes. Essas tabelas podem estar vazias ou conter dados.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse é o caminho raiz de um stream.
streamName O nome ou modelo do stream para pesquisar informações de esquema e tipo de origem.
instanceId A instância do Cloud Spanner em que as alterações são replicadas.
databaseId O banco de dados do Cloud Spanner em que as alterações são replicadas.
projectId O ID do projeto do Cloud Spanner.
deadLetterQueueDirectory (Opcional) Esse é o caminho do arquivo para armazenar a saída da fila de erros. O padrão é um diretório no local temporário do job do Dataflow.
inputFileFormat (Opcional) O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
shadowTablePrefix (Opcional) O prefixo usado para nomear tabelas de sombra. Padrão: shadow_.

Como executar o modelo do Datastream para o Cloud Spanner

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner",
   }
}
  

Arquivos de texto no Cloud Storage para BigQuery (stream)

O pipeline de arquivos de texto no Cloud Storage para BigQuery permite ler arquivos de texto armazenados no Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e anexar o resultado no BigQuery.

O pipeline é executado indefinidamente e precisa ser encerrado manualmente por meio de cancelar e não drenar, devido ao uso da transformação Watch, que é um DoFn divisível que não é compatível com a drenagem.

Requisitos para este pipeline:

  • Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada BigQuery Schema e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemplo:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crie um arquivo JavaScript (.js) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parâmetros do modelo

Parâmetro Descrição
javascriptTextTransformGcsPath Local do Cloud Storage da UDF do JavaScript. Por exemplo, gs://my_bucket/my_function.js.
JSONPath Local do Cloud Storage do arquivo de esquema do BigQuery, descrito como um JSON. Por exemplo, gs://path/to/my/schema.json.
javascriptTextTransformFunctionName O nome da função JavaScript que você quer chamar como UDF. Por exemplo, transform.
outputTable A tabela do BigQuery totalmente qualificada. Exemplo: my-project:dataset.table
inputFilePattern Local do Cloud Storage do texto que você quer processar. Por exemplo, gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Diretório temporário para o processo de carregamento do BigQuery. Exemplo: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabela de mensagens que não alcançaram a tabela de saída. Por exemplo, my-project:dataset.my-unprocessed-table. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo Cloud Storage Text no BigQuery (Stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Arquivos de texto no Cloud Storage para o Pub/Sub (stream)

Esse modelo cria um pipeline de streaming que pesquisa continuamente novos arquivos de texto carregados no Cloud Storage, lê cada arquivo linha por linha e publica strings em um tópico do Pub/Sub. O modelo publica registros em um arquivo delimitado por uma nova linha contendo registros JSON ou em um arquivo CSV em um tópico do Pub/Sub para processamento em tempo real. É possível usar esse modelo para reproduzir dados novamente no Pub/Sub.

O pipeline é executado indefinidamente e precisa ser encerrado manualmente com um "cancel", e não com um "drain", porque ele usa uma transformação "Watch" que é um "SplittableDoFn" não compatível com drenagem.

Atualmente, o intervalo de pesquisa é fixo e definido para 10 segundos. Esse modelo não configura carimbos de data/hora nos registros individuais, assim o horário do evento será igual ao da publicação durante a execução. Se o pipeline depender de um tempo exato do evento para processamento, não o utilize.

Requisitos para este pipeline:

  • Os arquivos de entrada precisam estar no formato JSON ou CSV delimitado por nova linha. Os registros que ocupam várias linhas nos arquivos de origem podem causar problemas no downstream, já que cada linha nos arquivos será publicada como uma mensagem para o Pub/Sub.
  • O tópico do Pub/Sub precisa existir antes da execução.
  • O pipeline é executado indefinidamente e precisa ser finalizado manualmente.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv.
outputTopic O tópico de entrada do Pub/Sub a ser gravado. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.

Como executar os arquivos de texto no Cloud Storage para o modelo Pub/Sub (stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

O modelo de mascaramento de dados/tokenização do Cloud Storage para o BigQuery (usando o Cloud DLP) é um pipeline de streaming que lê arquivos csv de um bucket do Cloud Storage, chama a API Cloud Data Loss Prevention (Cloud DLP) para remoção identificação e grava os dados desidentificados na tabela especificada do BigQuery. Este modelo é compatível com o uso de um modelo de inspeção e um modelo de desidentificação, ambos do Cloud DLP. Isso permite que os usuários inspecionem informações potencialmente confidenciais e façam a desidentificação. Além disso, também é possível desidentificar os dados estruturados em que as colunas são especificadas para serem desidentificadas sem nenhuma inspeção necessária.

Requisitos para este pipeline:

  • Os dados de entrada para tokenizar precisam existir.
  • Os modelos do Cloud DLP precisam existir (por exemplo, DeidentifyTemplate e InspectTemplate). Veja os modelos do Cloud DLP para mais detalhes.
  • O conjunto de dados do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern Arquivo(s) csv para ler registros de dados de entrada. O uso de caracteres curingas também é aceito. Por exemplo, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID do projeto do Cloud DLP que tem o recurso da API Cloud DLP. Esse projeto do Cloud DLP pode ser o mesmo projeto que tem os modelos do Cloud DLP ou pode ser um projeto separado. Por exemplo, my_dlp_api_project.
deidentifyTemplateName Modelo de desidentificação do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por exemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de dados do BigQuery para enviar resultados tokenizados.
batchSize Tamanho do lote/divisão para enviar dados para inspecionar e/ou retirar a tokenização. Se for um arquivo csv, batchSize é o número de linhas em um lote. Os usuários precisam determinar o tamanho do lote com base no tamanho dos registros e no tamanho do arquivo. Observe que a API Cloud DLP tem um limite de tamanho de payload de 524 KB por chamada de API.
inspectTemplateName [Opcional] Modelo de inspeção do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/identifyTemplates/{idTemplateId}. Por exemplo, projects/my_project/identifyTemplates/100.

Como executar o modelo de tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Substitua:

  • TEMPLATE_PROJECT_ID: o ID do modelo de projeto
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • TEMPLATE_PROJECT_ID: o ID do modelo de projeto
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Alterar a captura de dados do MySQL para o BigQuery usando o Debezium e o Pub/Sub (Stream)

O modelo da captura de dados de alteração do MySQL para o BigQuery usando o Debezium e o Pub/Sub é um pipeline de streaming que lê mensagens Pub/Sub com dados de alteração de um banco de dados MySQL e grava os registros no BigQuery. Um conector do Debezium captura alterações no banco de dados MySQL e publica os dados alterados no Pub/Sub. O modelo lê as mensagens do Pub/Sub e as grava no BigQuery.

É possível usar esse modelo para sincronizar bancos de dados MySQL e tabelas do BigQuery. O pipeline grava os dados alterados em uma tabela de preparo do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica o banco de dados MySQL.

Requisitos para este pipeline:

  • O conector do Debezium precisa ser implantado.
  • As mensagens do Pub/Sub precisam ser serializadas em uma Linha enviada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscriptions A lista separada por vírgulas de assinaturas de entrada do Pub/Sub para leitura, no formato de <subscription>,<subscription>, ...
changeLogDataset O conjunto de dados do BigQuery para armazenar as tabelas de preparo, no formato de <my-dataset>
replicaDataset O local do conjunto de dados do BigQuery para armazenar as tabelas de réplica, no formato de <my-dataset>
updateFrequencySecs Opcional: o intervalo em que o pipeline atualiza a tabela do BigQuery replicando o banco de dados MySQL.

Como executar a captura de dados de alteração usando o Debezium e o MySQL do modelo do Pub/Sub para o BigQuery

Para executar esse modelo, siga estas etapas:

  1. Na máquina local, clone o repositório DataflowTemplates.
  2. Altere para o diretório v2/cdc-parent.
  3. Certifique-se de que o conector do Debezium esteja implantado.
  4. Usando o Maven, execute o modelo do Dataflow.

    Substitua os seguintes valores:

    • PROJECT_ID: o ID do projeto.
    • YOUR_SUBSCRIPTIONS: sua lista separada por vírgulas de nomes de assinatura do Pub/Sub.
    • YOUR_CHANGELOG_DATASET: seu conjunto de dados do BigQuery para dados do registro de alterações.
    • YOUR_REPLICA_DATASET: seu conjunto de dados do BigQuery para tabelas replicadas.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka para BigQuery

O modelo Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto do Apache Kafka, executa uma função definida pelo usuário (UDF) e gera os registros resultantes no BigQuery. Qualquer erro que ocorrer na transformação dos dados, na execução da UDF ou na inserção na tabela de respostas será inserido em uma tabela de erros separada no BigQuery. Se a tabela de erros não existir antes da execução, ela será criada.

Requisitos para esse pipeline

  • A tabela de respostas do BigQuery precisa existir.
  • O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka precisam existir, e as mensagens precisam estar codificadas em um formato JSON válido.

Parâmetros do modelo

Parâmetro Descrição
outputTableSpec O local da tabela de respostas do BigQuery para gravar as mensagens do Apache Kafka, no formato de my-project:dataset.table
inputTopics Os tópicos de entrada do Apache Kafka para leitura em uma lista separada por vírgulas. Exemplo: messages
bootstrapServers O endereço do host dos servidores do agente do Apache Kafka em execução em uma lista separada por vírgulas, cada endereço de host no formato 35.70.252.199:9092
javascriptTextTransformGcsPath Opcional: caminho do local do Cloud Storage para a UDF do JavaScript. Exemplo: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName Opcional: o nome do JavaScript a ser chamado como UDF. Exemplo: transform
outputDeadletterTable Opcional: a tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de my-project:dataset.my-deadletter-table. Se não existir, a tabela será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo do Apache Kafka para BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • YOUR_JAVASCRIPT_FUNCTION: o nome da UDF
  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • YOUR_JAVASCRIPT_FUNCTION: o nome da UDF
  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}
  

Datastream para o BigQuery (stream)

O modelo do Datastream para o BigQuery é um pipeline de streaming que lê dados do Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e os replica em uma tabela de preparo particionada do BigQuery. Após a replicação, o modelo executa um MERGE no BigQuery para mesclar todas as alterações de change data capture (CDC) em uma réplica da tabela de origem.

O modelo lida com a criação e a atualização das tabelas do BigQuery gerenciadas pela replicação. Quando a linguagem de definição de dados (DDL) é obrigatória, um callback para o Datastream extrai o esquema da tabela de origem e o converte em tipos de dados do BigQuery. As operações compatíveis incluem:

  • Novas tabelas são criadas à medida que os dados são inseridos.
  • Novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
  • As colunas descartadas são ignoradas no BigQuery, e os valores futuros são nulos.
  • As colunas renomeadas são adicionadas ao BigQuery como novas colunas.
  • As alterações de tipo não são propagadas para o BigQuery.

Requisitos para este pipeline:

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Os conjuntos de dados de destino do BigQuery são criados, e a conta de serviço do Compute Engine recebe acesso de administrador a eles.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
gcsPubSubSubscription A assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME.
inputFileFormat O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
outputStagingDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de preparo. É possível incluir o modelo {_metadata_dataset} como um marcador que será substituído pelo nome do seu conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}_log).
outputDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de réplica. É possível incluir o modelo {_metadata_dataset} como um marcador que será substituído pelo nome do conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}).
outputStagingTableNameTemplate (Opcional) O modelo para o nome das tabelas de preparo. O padrão é {_metadata_table}_log. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate (Opcional) O modelo para o nome das tabelas de réplica. Padrão: {_metadata_table}. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}.
outputProjectId Opcional: projeto para conjuntos de dados do BigQuery em que os dados serão gerados. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
deadLetterQueueDirectory (Opcional) O caminho do arquivo para armazenar todas as mensagens não processadas com o motivo de falha no processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
streamName (Opcional) O nome ou modelo do stream para pesquisar informações de esquema. Padrão: {_metadata_stream}.
mergeFrequencyMinutes (Opcional) O número de minutos entre as mesclagens de uma determinada tabela. Padrão: 5.
dlqRetryMinutes (Opcional) O número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) Padrão: 10.

Como executar o modelo do Datastream para o BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Exemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
  • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Exemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
  • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Datastream para PostgreSQL (stream)

O modelo do Datastream para PostgreSQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer banco de dados do PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica esses dados em tabelas de réplica do PostgreSQL.

O modelo não é compatível com a linguagem de definição de dados (DDL) e espera que todas as tabelas já existam no PostgreSQL. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver passado, uma versão que chega atrasada dessa linha será ignorada. A linguagem de manipulação de dados (DML) executada é uma melhor tentativa de replicar perfeitamente os dados de origem e de destino. As instruções DML executadas seguem as seguintes regras:

  • Se houver uma chave primária, as operações de inserção e atualização usarão sintaxe de mesclagem (isto é, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE.
  • Se houver chaves primárias, as exclusões serão replicadas como uma DML de exclusão.
  • Se não houver uma chave primária, as operações de inserção e atualização serão inseridas na tabela.
  • Se não houver chaves primárias, as exclusões serão ignoradas.

Se você estiver usando os utilitários Oracle para Postgres, adicione ROWID no PostgreSQL como a chave primária quando não houver nenhuma.

Os requisitos para esse pipeline são:

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Um banco de dados do PostgreSQL foi propagado com o esquema necessário.
  • O acesso à rede entre workers do Dataflow e o PostgreSQL está configurado.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
gcsPubSubSubscription A assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
inputFileFormat O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
databaseHost O host do PostgreSQL para se conectar.
databaseUser O usuário do PostgreSQL com todas as permissões necessárias para gravar em todas as tabelas na replicação.
databasePassword A senha do usuário do PostgreSQL.
databasePort (Opcional) A porta do banco de dados do PostgreSQL para a conexão. Padrão: 5432.
databaseName (Opcional) O nome do banco de dados do PostgreSQL ao qual se conectar. Padrão: postgres.
streamName (Opcional) O nome ou modelo do stream para pesquisar informações de esquema. Padrão: {_metadata_stream}.

Como executar o modelo do Datastream para o PostgreSQL

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to PostgreSQL template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Exemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: o IP do host do PostgreSQL.
  • DATABASE_USER: o usuário do PostgreSQL.
  • DATABASE_PASSWORD: a senha do PostgreSQL.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • YOUR_PROJECT_ID: o ID do modelo de projeto
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Exemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: o IP do host do PostgreSQL.
  • DATABASE_USER: o usuário do PostgreSQL.
  • DATABASE_PASSWORD: a senha do PostgreSQL.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres",
   }
}