Modelos de streaming fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow. Para informações gerais sobre modelos, consulte Modelos do Dataflow. Para uma lista de todos os modelos fornecidos pelo Google, consulte Primeiros passos com os modelos fornecidos pelo Google.

Neste guia, você verá os modelos de streaming.

Assinatura do Pub/Sub para BigQuery

A inscrição Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada OUTPUT_TABLE_SPEC_error_records.

Como executar a inscrição Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Subscription to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Tópico do Pub/Sub para o BigQuery

O tópico do Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de um tópico do Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não chegaram à tabela de saída. É preciso usar o formato <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o tópico do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Avro do Pub/Sub para BigQuery

O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Requisitos para esse pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
schemaPath O local do Cloud Storage do arquivo de esquema do Avro. Por exemplo, gs://path/to/my/schema.avsc.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, <my-project>:<my-dataset>.<my-table>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.
writeDisposition (Opcional) O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND
createDisposition (Opcional) O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED

Como executar o Avro do Pub/Sub para o modelo do BigQuery

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Avro to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

Pub/Sub para Pub/Sub

O modelo do Pub/Sub para Pub/Sub é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava as mensagens em outro tópico do Pub/Sub. O pipeline também aceita uma chave de atributo de mensagem opcional e um valor que pode ser usado para filtrar as mensagens que precisam ser gravadas no tópico do Pub/Sub. Use esse modelo para copiar mensagens de uma assinatura do Pub/Sub para outro tópico do Pub/Sub com um filtro de mensagem opcional.

Requisitos para este pipeline:

  • É necessário que a inscrição do Pub/Sub de origem exista antes da execução.
  • O tópico do Pub/Sub de destino precisa existir antes da execução.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Assinatura do Pub/Sub em que será lida a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tópico do Cloud Pub/Sub onde será gravada a saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
filterKey [Opcional] Filtra eventos com base em uma chave de atributo. Nenhum filtro será aplicado se filterKey não for especificado.
filterValue [Opcional] Filtra o valor do atributo a ser usado no caso de um filterKey ser fornecido. Um filterValue nulo é usado por padrão.

Como executar o modelo do Pub/Sub para Pub/Sub

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão

Pub/Sub para Splunk

O modelo do Pub/Sub para Splunk é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava o payload da mensagem no Splunk pelo Coletor de eventos HTTP (HEC, na sigla em inglês) do Splunk. O caso de uso mais comum desse modelo é exportar registros para o Splunk. Para ver um exemplo do fluxo de trabalho subjacente, consulte Como implantar exportações de registro prontas para produção no Splunk usando o Dataflow.

Antes de gravar no Splunk, também é possível aplicar uma função JavaScript definida pelo usuário ao payload da mensagem. Todas as mensagens que apresentam falhas de processamento são encaminhadas para um tópico não processado do Pub/Sub para posterior resolução de problemas e reprocessamento.

Como uma camada extra de proteção para o token HEC, é possível passar uma chave do Cloud KMS com o parâmetro de token HEC codificado em base64 criptografado com a chave do Cloud KMS. Consulte o endpoint de criptografia da API Cloud KMS para saber mais detalhes sobre como criptografar o parâmetro de token do HEC.

Requisitos para este pipeline:

  • A inscrição do Pub/Sub de origem precisa existir antes da execução do pipeline.
  • O tópico do Pub/Sub precisa existir antes de o pipeline ser executado.
  • O endpoint HEC de Splunk precisa ser acessível pela rede dos workers do Dataflow.
  • O token Splunk de HEC precisa ser gerado e estar disponível.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A inscrição do Pub/Sub a partir da qual ler a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
token O token de autenticação de HEC de Splunk. Essa string codificada em base64 pode ser criptografada com uma chave do Cloud KMS para maior segurança.
url O URL de HEC do Splunk. Precisa ser roteável a partir da VPC na qual o pipeline é executado. Por exemplo, https://splunk-hec-host:8088.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Opcional] O caminho do Cloud Storage que contém todo o código do JavaScript. Por exemplo, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Opcional] O nome da função JavaScript a ser chamada. Por exemplo, se a função JavaScript for function myTransform(inJson) { ...dostuff...}, o nome da função será myTransform.
batchCount [Opcional] O tamanho do lote para enviar vários eventos para o Splunk. Padrão 1 (sem loteamento).
parallelism [Opcional] O número máximo de solicitações paralelas. Padrão 1 (sem paralelismo).
disableCertificateValidation [Opcional] Desativar a validação do certificado SSL. Falso padrão (validação ativada).
includePubsubMessage [Opcional] Inclua a mensagem completa do Pub/Sub no payload. Falso padrão (somente o elemento de dados está incluído no payload).
tokenKMSEncryptionKey [Opcional] A chave do Cloud KMS para descriptografar a string do token HEC. Se a chave do Cloud KMS for fornecida, a string do token HEC precisará ser transmitida de forma criptografada.

Como executar o modelo do Pub/Sub para o Splunk

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Splunk template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL

Pub/Sub para arquivos Avro no Cloud Storage

O modelo do Pub/Sub para arquivos do Avro no Cloud Storage é um pipeline de streaming que lê dados de um tópico do Pub/Sub e grava arquivos Avro no bucket especificado do Cloud Storage.

Requisitos para este pipeline:

  • O tópico de entrada do Pub/Sub precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputTopic Tópico do Pub/Sub que será assinado para consumo de mensagens. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory Diretório em que os arquivos Avro de saída são arquivados. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
avroTempDirectory Diretório dos arquivos Avro temporários. Precisa conter / no final. Por exemplo, gs://example-bucket/example-directory/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída para os arquivos Avro.
outputFilenameSuffix [Opcional] Sufixo do nome do arquivo de saída para os arquivos Avro.
outputShardTemplate [Opcional] O modelo de fragmento do arquivo de saída. Ela é especificada como sequências repetidas das letras S ou N. Por exemplo, SSS-NNN. Eles são substituídos pelo número do fragmento ou pelo número total de fragmentos, respectivamente. Quando esse parâmetro não for especificado, o formato de modelo padrão será W-P-SS-of-NN.

Como executar o modelo Avro do Pub/Sub com o Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Avro Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência

Pub/Sub para arquivos de texto no Cloud Storage

O modelo do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos para este pipeline:

  • O tópico do Pub/Sub precisa existir antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são colocados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate será 00-of-01.

Como executar o modelo do Pub/Sub para arquivos de texto no Cloud Storage

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

Pub/Sub para MongoDB

O modelo do Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens JSON codificadas de uma assinatura do Pub/Sub e grava no MongoDB como documentos. Caso seja necessário, o pipeline é compatível com transformações extras que podem ser incluídas usando uma função definida pelo usuário (UDF) do JavaScript. Qualquer erro ocorrido devido à incompatibilidade de esquema, JSON malformado ou ao executar transformações são gravados em uma tabela do BigQuery para mensagens não processadas, juntamente com mensagens de entrada. Se não existir uma tabela para registros não processados antes da execução, o pipeline criará automaticamente essa tabela.

Requisitos para este pipeline:

  • A assinatura do Pub/Sub precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • O cluster do MongoDB precisa existir e poder ser acessado das máquinas de trabalho do Dataflow.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Nome da assinatura do Pub/Sub. Exemplo: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Lista separada por vírgulas dos servidores MongoDB. Exemplo: 192.285.234.12:27017,192.287.123.11:27017
database Banco de dados no MongoDB para armazenar a coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
deadletterTable Tabela do BigQuery que armazena mensagens geradas por falhas (esquema incorreto, JSON incorreto etc.). Por exemplo, project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Opcional] Local do Cloud Storage do arquivo JavaScript na transformação da UDF. Por exemplo, gs://mybucket/filename.json.
javascriptTextTransformFunctionName [Opcional] Nome da UDF em JavaScript. Por exemplo, transform.
batchSize [Opcional] Tamanho do lote usado para a inserção de documentos em lote no MongoDB. Padrão: 1000.
batchSizeBytes [Opcional] Tamanho do lote em bytes. Padrão: 5242880.
maxConnectionIdleTime [Opcional] Tempo máximo de inatividade permitido em segundos antes que o tempo limite da conexão ocorra. Padrão: 60000.
sslEnabled [Opcional] Valor booleano que indica se a conexão com o MongoDB está ativada para SSL. Padrão: true.
ignoreSSLCertificate [Opcional] Valor booleano que indica se o certificado SSL deve ser ignorado. Padrão: true.
withOrdered [Opcional] Valor booleano que permite inserções ordenadas em massa para o MongoDB. Padrão: true.
withSSLInvalidHostNameAllowed [Opcional] Valor booleano que indica se o nome do host inválido é permitido para conexão SSL. Padrão: true.

Como executar o modelo do Pub/Sub para MongoDB

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to MongoDB template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

Pub/Sub para Elasticsearch

O modelo do Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub, executa uma função definida pelo usuário (UDF, na sigla em inglês) e as grava no Elasticsearch como documentos. O modelo do Dataflow usa o recurso de fluxos de dados do Elasticsearch para armazenar dados de série temporal em vários índices, oferecendo um único recurso nomeado para solicitações. Esses fluxos são adequados para registros, métricas, rastros e outros dados gerados continuamente armazenados no Pub/Sub.

Requisitos para esse pipeline

  • A assinatura do Pub/Sub de origem precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • Um host Elasticsearch acessível publicamente em uma instância do GCP ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Consulte Integração do Google Cloud para Elastic para ver mais detalhes.
  • Um tópico do Pub/Sub para saída de erros.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A assinatura do Cloud Pub/Sub a ser consumida. O nome precisa estar no formato projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl URL do Elasticsearch no formato https://hostname:[port] ou especifique o CloudID se estiver usando o Elastic Cloud.
apiKey Chave da API codificada em Base64 usada para autenticação.
errorOutputTopic Tópico de saída do Pub/Sub para publicar registros com falha no formato de projects/<project-id>/topics/<topic-name>
dataset [Opcional] O tipo de registros enviados via Pub/Sub, para os quais temos um painel pronto para uso. Os valores de tipos de registro conhecidos são audit, vpcflow e firewall. Padrão: pubsub.
namespace [Opcional] Um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipe ou uma unidade de negócios estratégica. Padrão: default.
batchSize [Opcional] Tamanho do lote em número de documentos. Padrão: 1000.
batchSizeBytes [Opcional] Tamanho do lote em número de bytes. Padrão: 5242880 (5 mb).
maxRetryAttempts [Opcional] Máximo de tentativas de repetição. Precisa ser > 0. Padrão: no retries.
maxRetryDuration [Opcional] A duração máxima da nova tentativa em milissegundos precisa ser maior que 0. Padrão: no retries.
javascriptTextTransformGcsPath [Opcional] O URL completo do seu arquivo .js. Por exemplo, gs://your-bucket/your-function.js
javascriptTextTransformFunctionName [Opcional] O nome da função JavaScript a ser chamada. Por exemplo, se a função JavaScript for myTransform(inJson) { ...dostuff...}, o nome da função será myTransform.

Como executar o Pub/Sub para o modelo do MongoDB

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

Datastream para o Cloud Spanner

O modelo do Datastream para Cloud Spanner é um pipeline de streaming que lê eventos do Datastream de um bucket do Cloud Storage e os grava em um banco de dados do Cloud Spanner. Ela é destinada à migração de dados de fontes do Datastream para o Cloud Spanner.

Todas as tabelas necessárias para migração precisam existir no banco de dados de destino do Cloud Spanner antes da execução do modelo. Portanto, a migração de esquema de um banco de dados de origem para o Cloud Spanner de destino precisa ser concluída antes da migração de dados. Os dados podem existir nas tabelas antes da migração. Esse modelo não propaga alterações de esquema do Datastream no banco de dados do Cloud Spanner.

A consistência de dados é garantida apenas no final da migração, quando todos os dados tiverem sido gravados no Cloud Spanner. Para armazenar informações de pedidos de cada registro gravado no Cloud Spanner, esse modelo cria uma tabela adicional (chamada de tabela de sombra) para cada tabela no banco de dados do Cloud Spanner. Isso é usado para garantir consistência no final da migração. As tabelas de sombra não são excluídas após a migração e podem ser usadas para fins de validação no final da migração.

Todos os erros que ocorrem durante a operação, como incompatibilidades de esquema, arquivos JSON malformados ou erros resultantes da execução de transformações, são registrados em uma fila de erros. A fila de erros é uma pasta do Cloud Storage que armazena todos os eventos do Datastream que encontraram erros, além do motivo do erro. em formato de texto. Os erros podem ser temporários ou permanentes e são armazenados em pastas apropriadas do Cloud Storage na fila de erros. Os erros temporários são repetidos automaticamente, ao contrário dos permanentes. No caso de erros permanentes, você tem a opção de fazer correções nos eventos de mudança e movê-los para o bucket recuperável enquanto o modelo estiver em execução.

Requisitos para este pipeline:

  • Um fluxo de Datastream no estado Em execução ou Não iniciado.
  • Um bucket do Cloud Storage em que os eventos do Datastream são replicados.
  • Um banco de dados do Cloud Spanner com tabelas existentes. Essas tabelas podem estar vazias ou conter dados.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse é o caminho raiz de um stream.
streamName O nome ou modelo do stream para pesquisar informações de esquema e tipo de origem.
instanceId A instância do Cloud Spanner em que as alterações são replicadas.
databaseId O banco de dados do Cloud Spanner em que as alterações são replicadas.
projectId O ID do projeto do Cloud Spanner.
deadLetterQueueDirectory (Opcional) Esse é o caminho do arquivo para armazenar a saída da fila de erros. O padrão é um diretório no local temporário do job do Dataflow.
inputFileFormat (Opcional) O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
shadowTablePrefix (Opcional) O prefixo usado para nomear tabelas de sombra. Padrão: shadow_.

Como executar o modelo do Datastream para o Cloud Spanner

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Cloud Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Cloud Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

Arquivos de texto no Cloud Storage para BigQuery (stream)

O pipeline de arquivos de texto no Cloud Storage para BigQuery permite ler arquivos de texto armazenados no Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e anexar o resultado no BigQuery.

O pipeline é executado indefinidamente e precisa ser encerrado manualmente por meio de cancelar e não drenar, devido ao uso da transformação Watch, que é um DoFn divisível que não é compatível com a drenagem.

Requisitos para este pipeline:

  • Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada BigQuery Schema e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemplo:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crie um arquivo JavaScript (.js) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parâmetros do modelo

Parâmetro Descrição
javascriptTextTransformGcsPath Local do Cloud Storage da UDF do JavaScript. Por exemplo, gs://my_bucket/my_function.js.
JSONPath Local do Cloud Storage do arquivo de esquema do BigQuery, descrito como um JSON. Por exemplo, gs://path/to/my/schema.json.
javascriptTextTransformFunctionName O nome da função JavaScript que você quer chamar como UDF. Por exemplo, transform.
outputTable A tabela do BigQuery totalmente qualificada. Exemplo: my-project:dataset.table
inputFilePattern Local do Cloud Storage do texto que você quer processar. Por exemplo, gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Diretório temporário para o processo de carregamento do BigQuery. Exemplo: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabela de mensagens que não alcançaram a tabela de saída. Por exemplo, my-project:dataset.my-unprocessed-table. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo Cloud Storage Text no BigQuery (Stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

Arquivos de texto no Cloud Storage para o Pub/Sub (stream)

Esse modelo cria um pipeline de streaming que pesquisa continuamente novos arquivos de texto carregados no Cloud Storage, lê cada arquivo linha por linha e publica strings em um tópico do Pub/Sub. O modelo publica registros em um arquivo delimitado por uma nova linha contendo registros JSON ou em um arquivo CSV em um tópico do Pub/Sub para processamento em tempo real. É possível usar esse modelo para reproduzir dados novamente no Pub/Sub.

O pipeline é executado indefinidamente e precisa ser encerrado manualmente com um "cancel", e não com um "drain", porque ele usa uma transformação "Watch" que é um "SplittableDoFn" não compatível com drenagem.

Atualmente, o intervalo de pesquisa é fixo e definido para 10 segundos. Esse modelo não configura carimbos de data/hora nos registros individuais, assim o horário do evento será igual ao da publicação durante a execução. Se o pipeline depender de um tempo exato do evento para processamento, não o utilize.

Requisitos para este pipeline:

  • Os arquivos de entrada precisam estar no formato JSON ou CSV delimitado por nova linha. Os registros que ocupam várias linhas nos arquivos de origem podem causar problemas no downstream, já que cada linha nos arquivos será publicada como uma mensagem para o Pub/Sub.
  • O tópico do Pub/Sub precisa existir antes da execução.
  • O pipeline é executado indefinidamente e precisa ser finalizado manualmente.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv.
outputTopic O tópico de entrada do Pub/Sub a ser gravado. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.

Como executar os arquivos de texto no Cloud Storage para o modelo Pub/Sub (stream)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).

Tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

O modelo de mascaramento de dados/tokenização do Cloud Storage para o BigQuery (usando o Cloud DLP) é um pipeline de streaming que lê arquivos csv de um bucket do Cloud Storage, chama a API Cloud Data Loss Prevention (Cloud DLP) para remoção identificação e grava os dados desidentificados na tabela especificada do BigQuery. Este modelo é compatível com o uso de um modelo de inspeção e um modelo de desidentificação, ambos do Cloud DLP. Isso permite que os usuários inspecionem informações potencialmente confidenciais e façam a desidentificação. Além disso, também é possível desidentificar os dados estruturados em que as colunas são especificadas para serem desidentificadas sem nenhuma inspeção necessária.

Requisitos para este pipeline:

  • Os dados de entrada para tokenizar precisam existir.
  • Os modelos do Cloud DLP precisam existir (por exemplo, DeidentifyTemplate e InspectTemplate). Veja os modelos do Cloud DLP para mais detalhes.
  • O conjunto de dados do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern Arquivo(s) csv para ler registros de dados de entrada. O uso de caracteres curingas também é aceito. Por exemplo, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID do projeto do Cloud DLP que tem o recurso da API Cloud DLP. Esse projeto do Cloud DLP pode ser o mesmo projeto que tem os modelos do Cloud DLP ou pode ser um projeto separado. Por exemplo, my_dlp_api_project.
deidentifyTemplateName Modelo de desidentificação do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por exemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de dados do BigQuery para enviar resultados tokenizados.
batchSize Tamanho do lote/divisão para enviar dados para inspecionar e/ou retirar a tokenização. Se for um arquivo csv, batchSize é o número de linhas em um lote. Os usuários precisam determinar o tamanho do lote com base no tamanho dos registros e no tamanho do arquivo. Observe que a API Cloud DLP tem um limite de tamanho de payload de 524 KB por chamada de API.
inspectTemplateName [Opcional] Modelo de inspeção do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/identifyTemplates/{idTemplateId}. Por exemplo, projects/my_project/identifyTemplates/100.

Como executar o modelo de tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. O endpoint regional padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Substitua:

  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Cloud em que você quer executar o job do Dataflow
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: o endpoint regional em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

    • latest para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates/latest/
    • o nome da versão, como 2021-09-20-00_RC00, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).