Modelo Gerador de dados de streaming para Pub/Sub, BigQuery e Cloud Storage

O modelo Gerador de dados de streaming é usado para gerar um número ilimitado ou fixo de registros sintéticos ou mensagens com base no esquema fornecido pelo usuário na taxa especificada. Os destinos compatíveis incluem tópicos do Pub/Sub, tabelas do BigQuery e buckets do Cloud Storage.

Veja a seguir um conjunto de alguns casos de uso possíveis:

  • Simule a publicação de eventos em tempo real em grande escala em um tópico do Pub/Sub para medir e determinar o número e o tamanho dos consumidores necessários para processar eventos publicados.
  • Gere dados sintéticos para uma tabela do BigQuery ou um bucket do Cloud Storage para avaliar a performance dos comparativos de mercado ou servir como prova de conceito.

Coletores e formatos de codificação compatíveis

A tabela a seguir descreve quais coletores e formatos de codificação são compatíveis com este modelo:
JSON Avro Parquet
Pub/Sub Sim Sim Não
BigQuery Sim Não Não
Cloud Storage Sim Sim Sim

Requisitos de pipeline

  • A conta de serviço do worker precisa do papel atribuído do Dataflow Worker (roles/dataflow.worker). Para mais informações, consulte Introdução ao IAM.
  • Crie um arquivo de esquema que contenha um modelo JSON para os dados gerados. Esse modelo usa a biblioteca Gerador de dados JSON para que você possa fornecer várias funções fictícias para cada campo no esquema. Para mais informações, consulte a documentação do json-data-generator.

    Exemplo:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Faça upload do arquivo de esquema para um bucket do Cloud Storage.
  • O destino da saída precisa existir antes da execução. O destino precisa ser um tópico do Pub/Sub, uma tabela do BigQuery ou um bucket do Cloud Storage, dependendo do tipo de coletor.
  • Se a codificação de saída for Avro ou Parquet, crie um arquivo de esquema Avro e armazene-o em um local do Cloud Storage.
  • Atribua à conta de serviço do worker um papel adicional do IAM, dependendo do destino desejado.
    Destino Além disso, papel do IAM necessário Aplicar a qual recurso
    Pub/Sub Publicador do Pub/Sub (roles/pubsub.publisher)
    (para mais informações, consulte Controle de acesso do Pub/Sub com IAM)
    Tópico do Pub/Sub
    BigQuery Editor de dados do BigQuery (roles/bigquery.dataEditor)
    (para mais informações, consulte Controle de acesso do BigQuery com o IAM)
    Conjunto de dados do BigQuery
    Cloud Storage Administrador de objetos do Cloud Storage (roles/storage.objectAdmin)
    (para mais informações, consulte Controle de acesso do Cloud Storage com IAM)
    Bucket do Cloud Storage

Parâmetros do modelo

Parâmetro Descrição
schemaLocation Local do arquivo de esquema. Por exemplo, gs://mybucket/filename.json.
qps Número de mensagens a serem publicadas por segundo. Por exemplo, 100.
sinkType [Opcional] Tipo de coletor de saída. Os valores possíveis são PUBSUB, BIGQUERY, GCS. O padrão é PUBSUB.
outputType [Opcional] Tipo de codificação de saída. Os valores possíveis são JSON, AVRO, PARQUET. O padrão é JSON.
avroSchemaLocation [Opcional] Local do arquivo de esquema AVRO. Obrigatório quando outputType for AVRO ou PARQUET. Por exemplo, gs://mybucket/filename.avsc.
topic [Opcional] Nome do tópico do Pub/Sub em que o pipeline precisa publicar dados. Obrigatório quando sinkType for Pub/Sub. Por exemplo: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opcional) Nome da tabela de saída do BigQuery. Obrigatório quando sinkType for o BigQuery. Por exemplo, my-project-ID:my_dataset_name.my-table-name.
writeDisposition [Opcional] Disposição de gravação do BigQuery. Os valores possíveis são WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. O padrão é WRITE_APPEND.
outputDeadletterTable [Opcional] Nome da tabela de saída do BigQuery para guardar os registros com falha. Se não for fornecido, o pipeline criará a tabela durante a execução com o nome {nome_da_tabela_de_saida}_registros_de_erros. Por exemplo, my-project-ID:my_dataset_name.my-table-name.
outputDirectory [Opcional] Caminho do local de saída do Cloud Storage. Obrigatório quando sinkType for Cloud Storage. Por exemplo, gs://mybucket/pathprefix/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída gravado no Cloud Storage. O padrão é output-.
windowDuration [Opcional] Intervalo de janela em que a saída é gravada no Cloud Storage. O padrão é 1 min (ou seja, 1 minuto).
numShards [Opcional] Número máximo de fragmentos de saída. Obrigatório quando sinkType é Cloud Storage e precisa ser definido como um número maior ou igual a 1.
messagesLimit [Opcional] Número máximo de mensagens de saída. O padrão é 0, que indica ilimitado.
autoscalingAlgorithm [Opcional] Algoritmo usado para escalonamento automático dos workers. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo.
maxNumWorkers [Opcional] Número máximo de máquinas de worker. Por exemplo, 10.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Streaming Data Generator template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/my-project-id/topics/my-topic-id.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/my-project-id/topics/my-topic-id.

A seguir