Modelo de gerador de dados de streaming para Pub/Sub, BigQuery e Cloud Storage

O modelo do gerador de dados de streaming é usado para gerar um número ilimitado ou fixo de registos ou mensagens sintéticos com base no esquema fornecido pelo utilizador à taxa especificada. Os destinos compatíveis incluem tópicos do Pub/Sub, tabelas do BigQuery e contentores do Cloud Storage.

Seguem-se alguns exemplos de utilização possíveis:

  • Simule a publicação de eventos em tempo real em grande escala num tópico do Pub/Sub para medir e determinar o número e o tamanho dos consumidores necessários para processar eventos publicados.
  • Gere dados sintéticos para uma tabela do BigQuery ou um contentor do Cloud Storage para avaliar referências de desempenho ou servir como validação de conceito.

Formatos de codificação e destinos suportados

A tabela seguinte descreve os destinos e os formatos de codificação suportados por este modelo:
JSON Avro Parquet
Pub/Sub Sim Sim Não
BigQuery Sim Não Não
Cloud Storage Sim Sim Sim

Requisitos do pipeline

  • A conta de serviço do trabalhador precisa da função atribuída Trabalhador do Dataflow (roles/dataflow.worker). Para mais informações, consulte o artigo Introdução à IAM.
  • Crie um ficheiro de esquema que contenha um modelo JSON para os dados gerados. Este modelo usa a biblioteca JSON Data Generator, para que possa fornecer várias funções de dados fictícios para cada campo no esquema. Para mais informações, consulte a documentação do gerador de dados JSON.

    Por exemplo:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carregue o ficheiro de esquema para um contentor do Cloud Storage.
  • O alvo de saída tem de existir antes da execução. O destino tem de ser um tópico do Pub/Sub, uma tabela do BigQuery ou um contentor do Cloud Storage, consoante o tipo de destino.
  • Se a codificação de saída for Avro ou Parquet, crie um ficheiro de esquema Avro e armazene-o numa localização do Cloud Storage.
  • Atribua à conta de serviço do trabalhador uma função do IAM adicional, consoante o destino pretendido.
    Destino Função de IAM adicional necessária A que recurso se aplica
    Pub/Sub Publicador do Pub/Sub (roles/pubsub.publisher)
    (para mais informações, consulte o artigo Controlo de acesso do Pub/Sub com o IAM)
    Tópico do Pub/Sub
    BigQuery Editor de dados do BigQuery (roles/bigquery.dataEditor)
    (para mais informações, consulte o artigo Controlo de acesso do BigQuery com o IAM)
    Conjunto de dados do BigQuery
    Cloud Storage Administrador de objetos do Cloud Storage (roles/storage.objectAdmin)
    (para mais informações, consulte o artigo Controlo de acesso do Cloud Storage com o IAM)
    Contentor do Cloud Storage

Parâmetros de modelos

Parâmetro Descrição
schemaLocation Localização do ficheiro de esquema. Por exemplo: gs://mybucket/filename.json.
qps Número de mensagens a publicar por segundo. Por exemplo: 100.
sinkType (Opcional) Tipo de destino de saída. Os valores possíveis são PUBSUB, BIGQUERY e GCS. A predefinição é PUBSUB.
outputType (Opcional) Tipo de codificação de saída. Os valores possíveis são JSON, AVRO e PARQUET. A predefinição é JSON.
avroSchemaLocation (Opcional) Localização do ficheiro de esquema AVRO. Obrigatório quando outputType é AVRO ou PARQUET. Por exemplo: gs://mybucket/filename.avsc.
topic (Opcional) Nome do tópico do Pub/Sub para o qual o pipeline deve publicar dados. Obrigatório quando sinkType é Pub/Sub. Por exemplo: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opcional) Nome da tabela de resultados do BigQuery. Obrigatório quando sinkType é o BigQuery. Por exemplo: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Opcional) BigQuery Write Disposition. Os valores possíveis são WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. A predefinição é WRITE_APPEND.
outputDeadletterTable (Opcional) Nome da tabela de resultados do BigQuery para guardar os registos com falhas. Se não for indicado, o pipeline cria uma tabela durante a execução com o nome {output_table_name}_error_records. Por exemplo: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Opcional) Caminho da localização do Cloud Storage de saída. Obrigatório quando sinkType é o Cloud Storage. Por exemplo: gs://mybucket/pathprefix/.
outputFilenamePrefix (Opcional) O prefixo do nome dos ficheiros de saída escritos no Cloud Storage. A predefinição é output-.
windowDuration (Opcional) Intervalo da janela no qual a saída é escrita no Cloud Storage. A predefinição é 1 m (ou seja, 1 minuto).
numShards (Opcional) Número máximo de fragmentos de saída. Obrigatório quando sinkType é o armazenamento na nuvem e deve ser definido como 1 ou um número superior.
messagesLimit (Opcional) Número máximo de mensagens de saída. A predefinição é 0, o que indica que não existe limite.
autoscalingAlgorithm (Opcional) Algoritmo usado para a escala automática dos trabalhadores. Os valores possíveis são THROUGHPUT_BASED para ativar o ajuste de escala automático ou NONE para desativar.
maxNumWorkers (Opcional) Número máximo de máquinas de trabalho. Por exemplo: 10.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Streaming Data Generator template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • SCHEMA_LOCATION: o caminho para o ficheiro de esquema no Cloud Storage. Por exemplo: gs://mybucket/filename.json.
  • QPS: o número de mensagens a publicar por segundo
  • PUBSUB_TOPIC: o tópico Pub/Sub de saída. Por exemplo: projects/my-project-id/topics/my-topic-id.

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • SCHEMA_LOCATION: o caminho para o ficheiro de esquema no Cloud Storage. Por exemplo: gs://mybucket/filename.json.
  • QPS: o número de mensagens a publicar por segundo
  • PUBSUB_TOPIC: o tópico Pub/Sub de saída. Por exemplo: projects/my-project-id/topics/my-topic-id.

O que se segue?