Plantilla de Streaming Data Generator a Pub/Sub, BigQuery y Cloud Storage

La plantilla Generador de datos de streaming se usa para generar un número ilimitado o fijo de registros o mensajes sintéticos según el esquema proporcionado por el usuario y a la velocidad especificada. Entre los destinos compatibles se incluyen temas de Pub/Sub, tablas de BigQuery y cubos de Cloud Storage.

A continuación se indican algunos casos prácticos posibles:

  • Simula la publicación de eventos en tiempo real a gran escala en un tema de Pub/Sub para medir y determinar el número y el tamaño de los consumidores necesarios para procesar los eventos publicados.
  • Genera datos sintéticos en una tabla de BigQuery o en un segmento de Cloud Storage para evaluar las métricas de rendimiento o como prueba de concepto.

Sinks y formatos de codificación admitidos

En la siguiente tabla se describen los receptores y los formatos de codificación compatibles con esta plantilla:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisitos del flujo de procesamiento

  • La cuenta de servicio de trabajador necesita el rol de trabajador de Dataflow (roles/dataflow.worker). Para obtener más información, consulta Introducción a la gestión de identidades y accesos.
  • Crea un archivo de esquema que contenga una plantilla JSON para los datos generados. Esta plantilla usa la biblioteca Generador de datos JSON, por lo que puedes proporcionar varias funciones falsas para cada campo del esquema. Para obtener más información, consulta la documentación de json-data-generator.

    Por ejemplo:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Sube el archivo de esquema a un segmento de Cloud Storage.
  • El destino de salida debe existir antes de la ejecución. El destino debe ser un tema de Pub/Sub, una tabla de BigQuery o un segmento de Cloud Storage, según el tipo de receptor.
  • Si la codificación de salida es Avro o Parquet, cree un archivo de esquema Avro y almacénelo en una ubicación de Cloud Storage.
  • Asigna un rol de IAM adicional a la cuenta de servicio de trabajador en función del destino que quieras.
    Destino Rol de gestión de identidades y accesos adicional necesario Aplicar a qué recurso
    Pub/Sub Editor de Pub/Sub (roles/pubsub.publisher)
    (para obtener más información, consulta Control de acceso de Pub/Sub con IAM).
    Tema Pub/Sub
    BigQuery Editor de datos de BigQuery (roles/bigquery.dataEditor)
    (para obtener más información, consulta Control de acceso de BigQuery con IAM).
    Conjunto de datos de BigQuery
    Cloud Storage Administrador de objetos de Cloud Storage (roles/storage.objectAdmin)
    (para obtener más información, consulta Control de acceso de Cloud Storage con IAM).
    Segmento de Cloud Storage

Parámetros de plantilla

Parámetro Descripción
schemaLocation Ubicación del archivo de esquema. Por ejemplo: gs://mybucket/filename.json.
qps Número de mensajes que se publicarán por segundo. Por ejemplo: 100.
sinkType (Opcional) Tipo de receptor de salida. Los valores posibles son PUBSUB, BIGQUERY y GCS. El valor predeterminado es PUBSUB.
outputType (Opcional) Tipo de codificación de salida. Los valores posibles son JSON, AVRO y PARQUET. El valor predeterminado es JSON.
avroSchemaLocation (Opcional) Ubicación del archivo de esquema AVRO. Obligatorio cuando outputType es AVRO o PARQUET. Por ejemplo: gs://mybucket/filename.avsc.
topic (Opcional) Nombre del tema de Pub/Sub en el que debe publicar datos la canalización. Obligatorio cuando sinkType es Pub/Sub. Por ejemplo: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opcional) Nombre de la tabla de salida de BigQuery. Obligatorio cuando sinkType es BigQuery. Por ejemplo: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Opcional) Write Disposition de BigQuery. Los valores posibles son WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. El valor predeterminado es WRITE_APPEND.
outputDeadletterTable (Opcional) Nombre de la tabla de BigQuery de salida que contendrá los registros fallidos. Si no se proporciona, la canalización crea una tabla durante la ejecución con el nombre {output_table_name}_error_records. Por ejemplo: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Opcional) Ruta de la ubicación de Cloud Storage de salida. Obligatorio cuando sinkType es Cloud Storage. Por ejemplo: gs://mybucket/pathprefix/.
outputFilenamePrefix (Opcional) Prefijo de nombre de archivo de los archivos de salida escritos en Cloud Storage. El valor predeterminado es output-.
windowDuration (Opcional) Intervalo de la ventana en el que se escribe la salida en Cloud Storage. El valor predeterminado es 1m (es decir, 1 minuto).
numShards (Opcional) Número máximo de fragmentos de salida. Obligatorio cuando sinkType es Cloud Storage y debe tener un valor igual o superior a 1.
messagesLimit (Opcional) Número máximo de mensajes de salida. El valor predeterminado es 0, lo que indica que no hay límite.
autoscalingAlgorithm (Opcional) Algoritmo usado para autoescalar los trabajadores. Los valores posibles son THROUGHPUT_BASED para habilitar el escalado automático o NONE para inhabilitarlo.
maxNumWorkers (Opcional) Número máximo de máquinas de trabajador. Por ejemplo: 10.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Streaming Data Generator template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: la ruta al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • QPS: número de mensajes que se publicarán por segundo
  • PUBSUB_TOPIC: el tema de Pub/Sub de salida. Por ejemplo: projects/my-project-id/topics/my-topic-id.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: la ruta al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • QPS: número de mensajes que se publicarán por segundo
  • PUBSUB_TOPIC: el tema de Pub/Sub de salida. Por ejemplo: projects/my-project-id/topics/my-topic-id.

Siguientes pasos