Plantilla del generador de datos de transmisión a Pub/Sub, BigQuery y Cloud Storage

La plantilla de Generador de datos de transmisión se usa para generar una cantidad ilimitada o fija de registros sintéticos o mensajes basados en el esquema proporcionado por el usuario a la velocidad especificada. Los destinos compatibles incluyen temas de Pub/Sub, tablas de BigQuery y buckets de Cloud Storage.

A continuación, se incluyen algunos casos de uso posibles:

  • Simula la publicación de eventos en tiempo real a gran escala en un tema de Pub/Sub para medir y determinar la cantidad y el tamaño de los consumidores necesarios para procesar los eventos publicados.
  • Genera datos sintéticos en una tabla de BigQuery o un bucket de Cloud Storage para evaluar comparativas de rendimiento o servir como prueba de concepto.

Receptores y formatos de codificación admitidos

En la siguiente tabla, se describe qué receptores y formatos de codificación son compatibles con esta plantilla:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisitos de la canalización

  • La cuenta de servicio de trabajador necesita la función asignada de trabajador de Dataflow (roles/dataflow.worker). Para obtener más información, consulta Introducción a IAM.
  • Crea un archivo de esquema que contenga una plantilla JSON para los datos generados. Esta plantilla usa la biblioteca del Generador de datos JSON, de modo que puedes proporcionar varias funciones de este tipo para cada campo del esquema. Para obtener más información, consulta la documentación de json-data-generator.

    Por ejemplo:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Sube el archivo de esquema a un bucket de Cloud Storage.
  • El objetivo de salida debe existir antes de la ejecución. El destino debe ser un tema de Pub/Sub, una tabla de BigQuery o un bucket de Cloud Storage según el tipo de receptor.
  • Si la codificación de salida es Avro o Parquet, crea un archivo de esquema Avro y almacénalo en una ubicación de Cloud Storage.
  • Asigna a la cuenta de servicio de trabajador un rol de IAM adicional según el destino deseado.
    Destino Además, se necesita un rol de IAM Aplicar a qué recurso
    Pub/Sub Publicador de Pub/Sub (roles/pubsub.publisher)
    (Para obtener más información, consulta Control de acceso de Pub/Sub con IAM)
    Tema de Pub/Sub
    BigQuery Editor de datos de BigQuery (roles/bigquery.dataEditor)
    (Para obtener más información, consulta Control de acceso a BigQuery con IAM).
    Conjunto de datos de BigQuery
    Cloud Storage Administrador de objetos de Cloud Storage (roles/storage.objectAdmin)
    (Para obtener más información, consulta Control de acceso de Cloud Storage con IAM).
    Bucket de Cloud Storage

Parámetros de la plantilla

Parámetro Descripción
schemaLocation Ubicación del archivo de esquema. Por ejemplo: gs://mybucket/filename.json.
qps Cantidad de mensajes que se publicarán por segundo. Por ejemplo: 100.
sinkType Tipo de receptor de salida (opcional). Los valores posibles son PUBSUB, BIGQUERY, GCS. El valor predeterminado es PUBSUB.
outputType Tipo de codificación de salida (opcional). Los valores posibles son JSON, AVRO, PARQUET. El valor predeterminado es JSON.
avroSchemaLocation Ubicación del archivo de esquema de AVRO (opcional). Es obligatorio cuando outputType es AVRO o PARQUET. Por ejemplo: gs://mybucket/filename.avsc.
topic Nombre del tema de Pub/Sub al que la canalización debe publicar datos. Es obligatorio cuando sinkType es Pub/Sub. Por ejemplo: projects/my-project-id/topics/my-topic-id (opcional).
outputTableSpec Nombre de la tabla de BigQuery de salida (opcional). Obligatorio cuando sinkType es BigQuery. Por ejemplo: my-project-ID:my_dataset_name.my-table-name.
writeDisposition Disposición de escritura de BigQuery (opcional). Los valores posibles son WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. El valor predeterminado es WRITE_APPEND.
outputDeadletterTable Nombre de la tabla de BigQuery de salida para contener los registros con errores (opcional). Si no se proporciona, la canalización crea una tabla durante la ejecución con el nombre {output_table_name}_error_records. Por ejemplo: my-project-ID:my_dataset_name.my-table-name.
outputDirectory Ruta de la ubicación de salida de Cloud Storage (opcional). Obligatorio cuando sinkType es Cloud Storage. Por ejemplo: gs://mybucket/pathprefix/.
outputFilenamePrefix El prefijo del nombre de archivo de los archivos de salida escritos en Cloud Storage (opcional). La configuración predeterminada es output-.
windowDuration El intervalo de ventana en el que se escribe el resultado en Cloud Storage (opcional). El valor predeterminado es 1 m (en otras palabras, 1 minuto).
numShards La cantidad máxima de fragmentos de salida (opcional). Obligatorio cuando sinkType es Cloud Storage y debe establecerse en 1 o en un número mayor.
messagesLimit Cantidad máxima de mensajes de salida (opcional). El valor predeterminado es 0, lo que indica que es ilimitado.
autoscalingAlgorithm Algoritmo que se usa para el ajuste de escala automático de los trabajadores (opcional). Los valores posibles son THROUGHPUT_BASED, para habilitar el ajuste de escala automático, o NONE, si deseas inhabilitarlo.
maxNumWorkers Cantidad máxima de máquinas de trabajador (opcional). Por ejemplo: 10.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Streaming Data Generator template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: Es la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • QPS: Es la cantidad de mensajes que se publicarán por segundo.
  • PUBSUB_TOPIC: Es el tema de salida de Pub/Sub. Por ejemplo: projects/my-project-id/topics/my-topic-id.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: Es la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • QPS: Es la cantidad de mensajes que se publicarán por segundo.
  • PUBSUB_TOPIC: Es el tema de salida de Pub/Sub. Por ejemplo: projects/my-project-id/topics/my-topic-id.

¿Qué sigue?