Modelos de streaming fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow. Para informações gerais sobre modelos, consulte a página Visão geral. Para uma lista de todos os modelos fornecidos pelo Google, consulte esta página.

Nesta página, você verá a documentação dos modelos de streaming:

Assinatura do Pub/Sub para BigQuery

A inscrição Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • O diretório de saída precisa ser criado antes de executar o pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não chegaram à tabela de saída, no formato de <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar a inscrição Pub/Sub para o modelo do BigQuery

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub Subscription to BigQuery template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Tópico do Pub/Sub para o BigQuery

O tópico do Pub/Sub para o modelo do BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de um tópico do Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos para este pipeline:

  • As mensagens do Pub/Sub precisam estar no formato JSON, descritas aqui. Por exemplo, mensagens formatadas como {"k1":"v1", "k2":"v2"} podem ser inseridas em uma tabela do BigQuery com duas colunas, chamadas k1 e k2, com dados do tipo string.
  • A tabela de saída precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico de entrada do tópico do Pub/Sub que será lido, no formato de projects/<project>/topics/<topic>.
outputTableSpec O local da tabela de saída do BigQuery, no formato de <my-project>:<my-dataset>.<my-table>
outputDeadletterTable A tabela do BigQuery para mensagens que não chegaram à tabela de saída. É preciso usar o formato <my-project>:<my-dataset>.<my-table>. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o tópico do Pub/Sub para o modelo do BigQuery

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub Topic to BigQuery template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Avro do Pub/Sub para BigQuery

O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Requisitos para esse pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
schemaPath O local do Cloud Storage do arquivo de esquema do Avro. Por exemplo, gs://path/to/my/schema.avsc.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, <my-project>:<my-dataset>.<my-table>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.
writeDisposition (Opcional) O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND
createDisposition (Opcional) O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED

Como executar o Avro do Pub/Sub para o modelo do BigQuery

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub Avro to BigQuery template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Substitua:

  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada.
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub para Pub/Sub

O modelo do Pub/Sub para Pub/Sub é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava as mensagens em outro tópico do Pub/Sub. O pipeline também aceita uma chave de atributo de mensagem opcional e um valor que pode ser usado para filtrar as mensagens que precisam ser gravadas no tópico do Pub/Sub. Use esse modelo para copiar mensagens de uma assinatura do Pub/Sub para outro tópico do Pub/Sub com um filtro de mensagem opcional.

Requisitos para este pipeline:

  • É necessário que a inscrição do Pub/Sub de origem exista antes da execução.
  • O tópico do Pub/Sub de destino precisa existir antes da execução.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Assinatura do Pub/Sub em que será lida a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tópico do Cloud Pub/Sub onde será gravada a saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
filterKey [Opcional] Filtra eventos com base em uma chave de atributo. Nenhum filtro será aplicado se filterKey não for especificado.
filterValue [Opcional] Filtra o valor do atributo a ser usado no caso de um filterKey ser fornecido. Um filterValue nulo é usado por padrão.

Como executar o modelo do Pub/Sub para Pub/Sub

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub to Pub/Sub template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • FILTER_KEY: a chave de atributo que servirá para filtrar os eventos. Nenhum filtro será aplicado se nenhuma chave for especificada
  • FILTER_VALUE: valor do atributo de filtro a ser usado se uma chave de filtro de eventos for fornecida. Aceita uma string Java Regex válida como um valor de filtro de eventos. Se uma regex for fornecida, a expressão completa deverá corresponder para que a mensagem seja filtrada. As correspondências parciais (por exemplo, substring) não serão filtradas. Um valor de filtro de evento nulo é usado por padrão
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub para Splunk

O modelo do Pub/Sub para Splunk é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub e grava o payload da mensagem no Splunk pelo Coletor de eventos HTTP (HEC, na sigla em inglês) do Splunk. Antes de gravar no Splunk, também é possível aplicar uma função JavaScript definida pelo usuário ao payload da mensagem. Todas as mensagens que apresentam falhas de processamento são encaminhadas para um tópico de mensagens não processadas do Pub/Sub para posterior resolução de problemas e reprocessamento.

Como uma camada extra de proteção para o token HEC, é possível passar uma chave do Cloud KMS com o parâmetro de token HEC codificado em base64 criptografado com a chave do Cloud KMS. Consulte o endpoint de criptografia da API Cloud KMS para saber mais detalhes sobre como criptografar o parâmetro de token do HEC.

Requisitos para este pipeline:

  • A inscrição do Pub/Sub de origem precisa existir antes da execução do pipeline.
  • O tópico do Pub/Sub precisa existir antes de o pipeline ser executado.
  • O endpoint HEC de Splunk precisa ser acessível pela rede dos workers do Dataflow.
  • O token Splunk de HEC precisa ser gerado e estar disponível.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription A inscrição do Pub/Sub a partir da qual ler a entrada. Por exemplo, projects/<project-id>/subscriptions/<subscription-name>.
token O token de autenticação de HEC de Splunk. Essa string codificada em base64 pode ser criptografada com uma chave do Cloud KMS para maior segurança.
url O URL de HEC do Splunk. Precisa ser roteável a partir da VPC na qual o pipeline é executado. Por exemplo, https://splunk-hec-host:8088.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Opcional] O caminho do Cloud Storage que contém todo o código do JavaScript. Por exemplo, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Opcional] O nome da função JavaScript a ser chamada. Por exemplo, se a função JavaScript for function myTransform(inJson) { ...dostuff...}, o nome da função será myTransform.
batchCount [Opcional] O tamanho do lote para enviar vários eventos para o Splunk. Padrão 1 (sem loteamento).
parallelism [Opcional] O número máximo de solicitações paralelas. Padrão 1 (sem paralelismo).
disableCertificateValidation [Opcional] Desativar a validação do certificado SSL. Falso padrão (validação ativada).
includePubsubMessage [Opcional] Inclua a mensagem completa do Pub/Sub no payload. Falso padrão (somente o elemento de dados está incluído no payload).
tokenKMSEncryptionKey [Opcional] A chave do Cloud KMS para descriptografar a string do token HEC. Se a chave do Cloud KMS for fornecida, a string do token HEC precisará ser transmitida de forma criptografada.

Como executar o modelo do Pub/Sub para o Splunk

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub to Splunk template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • TOKEN: token do coletor de eventos HTTP do Splunk
  • URL: o caminho do URL para o Coletor de eventos HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função do JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript (por exemplo, gs://your-bucket/your-function.js)
  • BATCH_COUNT: o tamanho do lote a ser usado para enviar vários eventos para o Splunk
  • PARALLELISM: o número de solicitações paralelas a serem usadas para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se você quiser desativar a validação do certificado SSL
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub para arquivos Avro no Cloud Storage

O modelo do Pub/Sub para arquivos do Avro no Cloud Storage é um pipeline de streaming que lê dados de um tópico do Pub/Sub e grava arquivos Avro no bucket especificado do Cloud Storage.

Requisitos para este pipeline:

  • O tópico de entrada do Pub/Sub precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputTopic Tópico do Cloud Pub/Sub que será assinado para consumo de mensagens. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory Diretório em que os arquivos Avro de saída serão arquivados. Adicione uma barra (/) ao final. Por exemplo, gs://example-bucket/example-directory/.
avroTempDirectory Diretório dos arquivos Avro temporários. Adicione / no final. Por exemplo, gs://example-bucket/example-directory/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída para os arquivos Avro.
outputFilenameSuffix [Opcional] Sufixo do nome do arquivo de saída para os arquivos Avro.
outputShardTemplate [Opcional] O modelo de fragmento do arquivo de saída. Especificado como sequências das letras "S" ou "N" que se repetem (exemplo: SSS-NNN). São substituídos pelo número do fragmento ou pela quantidade de fragmentos, respectivamente. O formato padrão do modelo é "W-P-SS-of-NN" quando o parâmetro não é especificado.
numShards [Opcional] O número máximo de fragmentos de saída produzidos durante a gravação. O número máximo padrão de fragmentos é 1.

Como executar o modelo Avro do Pub/Sub com o Cloud Storage

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub to Cloud Storage Avro template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: nome do job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência
  • NUM_SHARDS: o número de fragmentos de saída
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: nome do job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILENAME_PREFIX: o prefixo de nome de arquivo de saída de sua preferência
  • FILENAME_SUFFIX: o sufixo de nome de arquivo de saída de sua preferência
  • SHARD_TEMPLATE: o modelo de fragmento de saída de sua preferência
  • NUM_SHARDS: o número de fragmentos de saída
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub para arquivos de texto no Cloud Storage

O modelo do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos para este pipeline:

  • O tópico do Pub/Sub precisa existir antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são colocados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate será 00-of-01.

Como executar o modelo do Pub/Sub para arquivos de texto no Cloud Storage

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Pub/Sub to Text Files on Cloud Storage template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub para MongoDB

O modelo do Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens JSON codificadas de uma assinatura do Pub/Sub e grava no MongoDB como documentos. Caso seja necessário, o pipeline é compatível com transformações extras que podem ser incluídas usando uma função definida pelo usuário (UDF) do JavaScript. Qualquer erro ocorrido devido à incompatibilidade de esquema, JSON malformado ou ao executar transformações são gravados em uma tabela do BigQuery para mensagens não processadas, juntamente com mensagens de entrada. Se não existir uma tabela para registros não processados antes da execução, o pipeline criará automaticamente essa tabela.

Requisitos para este pipeline:

  • A assinatura do Pub/Sub precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • O cluster do MongoDB precisa existir e poder ser acessado das máquinas de trabalho do Dataflow.

Parâmetros do modelo

Parâmetro Descrição
inputSubscription Nome da assinatura do Pub/Sub. Exemplo: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri Lista separada por vírgulas dos servidores MongoDB. Exemplo: 192.285.234.12:27017,192.287.123.11:27017
database Banco de dados no MongoDB para armazenar a coleção. Por exemplo, my-db.
collection Nome da coleção dentro do banco de dados MongoDB. Por exemplo, my-collection.
deadletterTable Tabela do BigQuery que armazena mensagens geradas por falhas (esquema incorreto, JSON incorreto etc.). Por exemplo, project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Opcional] Local do Cloud Storage do arquivo JavaScript na transformação da UDF. Por exemplo, gs://mybucket/filename.json.
javascriptTextTransformFunctionName [Opcional] Nome da UDF em JavaScript. Por exemplo, transform.
batchSize [Opcional] Tamanho do lote usado para a inserção de documentos em lote no MongoDB. Padrão: 1000.
batchSizeBytes [Opcional] Tamanho do lote em bytes. Padrão: 5242880.
maxConnectionIdleTime [Opcional] Tempo máximo de inatividade permitido em segundos antes que o tempo limite da conexão ocorra. Padrão: 60000.
sslEnabled [Opcional] Valor booleano que indica se a conexão com o MongoDB está ativada para SSL. Padrão: true.
ignoreSSLCertificate [Opcional] Valor booleano que indica se o certificado SSL deve ser ignorado. Padrão: true.
withOrdered [Opcional] Valor booleano que permite inserções ordenadas em massa para o MongoDB. Padrão: true.
withSSLInvalidHostNameAllowed [Opcional] Valor booleano que indica se o nome do host inválido é permitido para conexão SSL. Padrão: true.

Como executar o modelo do Pub/Sub para MongoDB

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione Pub/Sub to MongoDB template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para usar modelos com a ferramenta de linha de comando gcloud, você precisa ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Substitua:

  • PROJECT_ID: ID do projeto
  • REGION_NAME: nome da região do Dataflow (por exemplo, us-central1)
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • LOCATION: nome da região do Dataflow (por exemplo, us-central1)
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • INPUT_SUBSCRIPTION: a assinatura do Pub/Sub (por exemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: os endereços de servidor do MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome do banco de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Arquivos de texto no Cloud Storage para BigQuery (stream)

O pipeline de arquivos de texto no Cloud Storage para BigQuery permite ler arquivos de texto armazenados no Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e gerar o resultado no BigQuery.

Requisitos para este pipeline:

  • Crie um arquivo de esquema do BigQuery formatado em JSON que descreva sua tabela de saída.
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • Crie um arquivo JavaScript (.js) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parâmetros do modelo

Parâmetro Descrição
javascriptTextTransformGcsPath Local do Cloud Storage da UDF do JavaScript. Por exemplo, gs://my_bucket/my_function.js.
JSONPath Local do Cloud Storage do arquivo de esquema do BigQuery, descrito como um JSON. Por exemplo, gs://path/to/my/schema.json.
javascriptTextTransformFunctionName O nome da função JavaScript que você quer chamar como UDF. Por exemplo, transform.
outputTable A tabela do BigQuery totalmente qualificada. Exemplo: my-project:dataset.table
inputFilePattern Local do Cloud Storage do texto que você quer processar. Por exemplo, gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Diretório temporário para o processo de carregamento do BigQuery. Exemplo: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabela de mensagens que não alcançaram a tabela de saída. Por exemplo, my-project:dataset.my-unprocessed-table. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo Cloud Storage Text no BigQuery (Stream)

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Cloud Storage Text to BigQuery template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: o nome da UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Arquivos de texto no Cloud Storage para o Pub/Sub (stream)

Esse modelo cria um pipeline de streaming que pesquisa continuamente novos arquivos de texto carregados no Cloud Storage, lê cada arquivo linha por linha e publica strings em um tópico do Pub/Sub. O modelo publica registros em um arquivo delimitado por uma nova linha contendo registros JSON ou em um arquivo CSV em um tópico do Pub/Sub para processamento em tempo real. É possível usar esse modelo para reproduzir dados novamente no Pub/Sub.

Atualmente, o intervalo de pesquisa é fixo e definido para 10 segundos. Esse modelo não configura carimbos de data/hora nos registros individuais, assim o horário do evento será igual ao da publicação durante a execução. Se o pipeline depender de um tempo exato do evento para processamento, não o utilize.

Requisitos para este pipeline:

  • Os arquivos de entrada precisam estar no formato JSON ou CSV delimitado por nova linha. Os registros que ocupam várias linhas nos arquivos de origem podem causar problemas no downstream, já que cada linha nos arquivos será publicada como uma mensagem para o Pub/Sub.
  • O tópico do Pub/Sub precisa existir antes da execução.
  • O pipeline é executado indefinidamente e precisa ser finalizado manualmente.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv.
outputTopic O tópico de entrada do Pub/Sub a ser gravado. O nome precisa estar no formato projects/<project-id>/topics/<topic-name>.

Como executar os arquivos de texto no Cloud Storage para o modelo Pub/Sub (Stream)

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • FILE_PATTERN: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo, path/*.csv).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Tokenização/mascaramento de dados do Cloud Storage para o BigQuery (usando o Cloud DLP)

O modelo de mascaramento de dados/tokenização do Cloud Storage para o BigQuery (usando o Cloud DLP) é um pipeline de streaming que lê arquivos csv de um bucket do Cloud Storage, chama a API Cloud Data Loss Prevention (Cloud DLP) para remoção identificação e grava os dados desidentificados na tabela especificada do BigQuery. Este modelo é compatível com o uso de um modelo de inspeção e um modelo de desidentificação, ambos do Cloud DLP. Isso permite que os usuários inspecionem informações potencialmente confidenciais e façam a desidentificação. Além disso, também é possível desidentificar os dados estruturados em que as colunas são especificadas para serem desidentificadas sem nenhuma inspeção necessária.

Requisitos para este pipeline:

  • Os dados de entrada para tokenizar precisam existir.
  • Os modelos do Cloud DLP precisam existir (por exemplo, DeidentifyTemplate e InspectTemplate). Veja os modelos do Cloud DLP para mais detalhes.
  • O conjunto de dados do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern Arquivo(s) csv para ler registros de dados de entrada. O uso de caracteres curingas também é aceito. Por exemplo, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID do projeto do Cloud DLP que tem o recurso da API Cloud DLP. Esse projeto do Cloud DLP pode ser o mesmo projeto que tem os modelos do Cloud DLP ou pode ser um projeto separado. Por exemplo, my_dlp_api_project.
deidentifyTemplateName Modelo de desidentificação do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por exemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de dados do BigQuery para enviar resultados tokenizados.
batchSize Tamanho do lote/divisão para enviar dados para inspecionar e/ou retirar a tokenização. Se for um arquivo csv, batchSize é o número de linhas em um lote. Os usuários precisam determinar o tamanho do lote com base no tamanho dos registros e no tamanho do arquivo. Observe que a API Cloud DLP tem um limite de tamanho de payload de 524 KB por chamada de API.
inspectTemplateName [Opcional] Modelo de inspeção do Cloud DLP a ser usado para solicitações de API, especificado com o padrão projects/{template_project_id}/identifyTemplates/{idTemplateId}. Por exemplo, projects/my_project/identifyTemplates/100.

Como executar o modelo de mascaramento de dados/tokenização do Cloud Storage para o BigQuery (usando o Cloud DLP)

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Substitua:

  • TEMPLATE_PROJECT_ID: o ID do projeto de modelo
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua:

  • TEMPLATE_PROJECT_ID: o ID do projeto de modelo
  • DLP_API_PROJECT_ID: o ID do projeto da API Cloud DLP
  • JOB_NAME: um nome de job de sua escolha O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • REGION: o endpoint regional (por exemplo, us-west1)
  • TEMP_LOCATION: o local para gravar arquivos temporários (por exemplo, gs://your-bucket/temp)
  • INPUT_DATA: o caminho do arquivo de entrada
  • DEIDENTIFY_TEMPLATE: o número do modelo do Cloud DLPDeidentify
  • DATASET_NAME: o nome do conjunto de dados do BigQuery
  • INSPECT_TEMPLATE_NUMBER: o número do modelo do Cloud DLPInspect
  • BATCH_SIZE_VALUE: o tamanho do lote (número de linhas por API para csv).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Alterar a captura de dados do MySQL para o BigQuery usando o Debezium e o Pub/Sub (Stream)

O modelo da captura de dados de alteração do MySQL para o BigQuery usando o Debezium e o Pub/Sub é um pipeline de streaming que lê mensagens Pub/Sub com dados de alteração de um banco de dados MySQL e grava os registros no BigQuery. Um conector do Debezium captura alterações no banco de dados MySQL e publica os dados alterados no Pub/Sub. O modelo lê as mensagens do Pub/Sub e as grava no BigQuery.

É possível usar esse modelo para sincronizar bancos de dados MySQL e tabelas do BigQuery. O pipeline grava os dados alterados em uma tabela de preparo do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica o banco de dados MySQL.

Requisitos para este pipeline:

  • O conector do Debezium precisa ser implantado.
  • As mensagens do Pub/Sub precisam ser serializadas em uma Linha enviada.

Parâmetros do modelo

Parâmetro Descrição
inputSubscriptions A lista separada por vírgulas de assinaturas de entrada do Pub/Sub para leitura, no formato de <subscription>,<subscription>, ...
changeLogDataset O conjunto de dados do BigQuery para armazenar as tabelas de preparo, no formato de <my-dataset>
replicaDataset O local do conjunto de dados do BigQuery para armazenar as tabelas de réplica, no formato de <my-dataset>
Opcional: updateFrequencySecs O intervalo em que o pipeline atualiza a tabela do BigQuery replicando o banco de dados MySQL.

Como executar a captura de dados de alteração usando o Debezium e o MySQL do modelo do Pub/Sub para o BigQuery

Para executar esse modelo, siga estas etapas:

  1. Na máquina local, clone o repositório DataflowTemplates.
  2. Altere para o diretório v2/cdc-parent.
  3. Certifique-se de que o conector do Debezium esteja implantado.
  4. Usando o Maven, execute o modelo do Dataflow.

    Substitua os valores a seguir neste exemplo:

    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua YOUR_SUBSCRIPTIONS pela lista separada por vírgulas de nomes de assinaturas do Pub/Sub.
    • Substitua YOUR_CHANGELOG_DATASET pelo conjunto de dados do BigQuery para dados do registro de alterações e substitua YOUR_REPLICA_DATASET pelo conjunto de dados do BigQuery para tabelas de réplica.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka para BigQuery

O modelo Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto do Apache Kafka, executa uma função definida pelo usuário (UDF) e gera os registros resultantes no BigQuery. Qualquer erro que ocorrer na transformação dos dados, na execução da UDF ou na inserção na tabela de respostas será inserido em uma tabela de erros separada no BigQuery. Se a tabela de erros não existir antes da execução, ela será criada.

Requisitos para esse pipeline

  • A tabela de respostas do BigQuery precisa existir.
  • O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka precisam existir, e as mensagens precisam estar codificadas em um formato JSON válido.

Parâmetros do modelo

Parâmetro Descrição
outputTableSpec O local da tabela de respostas do BigQuery para gravar as mensagens do Apache Kafka, no formato de my-project:dataset.table
inputTopics Os tópicos de entrada do Apache Kafka para leitura em uma lista separada por vírgulas. Exemplo: messages
bootstrapServers O endereço do host dos servidores do agente do Apache Kafka em execução em uma lista separada por vírgulas, cada endereço de host no formato 35.70.252.199:9092
javascriptTextTransformGcsPath Opcional: caminho do local do Cloud Storage para a UDF do JavaScript. Exemplo: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName Opcional: o nome do JavaScript a ser chamado como UDF. Exemplo: transform
outputDeadletterTable Opcional: a tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de my-project:dataset.my-deadletter-table. Se não existir, a tabela será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Como executar o modelo do Apache Kafka para BigQuery

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Apache Kafka to BigQuery template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Execute a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos Flex usando a ferramenta de linha de comando gcloud, é necessário ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • Substitua YOUR_JAVASCRIPT_FUNCTION pelo nome da UDF.
  • Substitua REGION_NAME pelo nome da região do Dataflow. Exemplo: us-central1.
  • Substitua BIGQUERY_TABLE pelo nome da tabela do BigQuery.
  • Substitua KAFKA_TOPICS pela lista de tópicos do Apache Kafka. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • Substitua PATH_TO_JAVASCRIPT_UDF_FILE pelo caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript.
  • Substitua YOUR_JAVASCRIPT_FUNCTION pelo nome da UDF.
  • Substitua KAFKA_SERVER_ADDRESSES pela lista de endereços IP do servidor do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido
  • Substitua YOUR_JAVASCRIPT_FUNCTION pelo nome da UDF.
  • Substitua LOCATION pelo nome da região do Dataflow. Exemplo: us-central1.
  • Substitua BIGQUERY_TABLE pelo nome da tabela do BigQuery.
  • Substitua KAFKA_TOPICS pela lista de tópicos do Apache Kafka. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • Substitua PATH_TO_JAVASCRIPT_UDF_FILE pelo caminho do Cloud Storage para o arquivo .js que contém o código do JavaScript.
  • Substitua YOUR_JAVASCRIPT_FUNCTION pelo nome da UDF.
  • Substitua KAFKA_SERVER_ADDRESSES pela lista de endereços IP do servidor do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}