Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage template

The Streaming Data Generator template generates either an unlimited or a fixed number of synthetic records or messages, based on a user-provided schema, at a specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.

The following are a few possible use cases:

  • Simulate large-scale real-time event publishing to a Pub/Sub topic to determine the number and size of consumers required to process the published events.
  • Generate synthetic data to a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.

Supported sinks and encoding formats

The following table describes which sinks and encoding formats are supported by this template:
Sink            JSON   Avro   Parquet
Pub/Sub         Yes    Yes    No
BigQuery        Yes    No     No
Cloud Storage   Yes    Yes    Yes

Pipeline requirements

  • The worker service account must have the Dataflow Worker role (roles/dataflow.worker); for an example of granting it, see the commands after this list. For more information, see Introduction to IAM.
  • Create a schema file that contains a JSON template for the generated data. This template uses the JSON Data Generator library, so you can provide various faker functions for each field in the schema. For more information, see the json-data-generator documentation.

    For example:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Upload the schema file to a Cloud Storage bucket.
  • The output target must exist before you run the pipeline. Depending on the sink type, the target must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket.
  • If the output encoding is Avro or Parquet, then create an Avro schema file and store it in a Cloud Storage location.
  • Assign the worker service account an additional IAM role, depending on the destination:

    Destination     Additional IAM role                                       Apply to this resource
    Pub/Sub         Pub/Sub Publisher (roles/pubsub.publisher)                Pub/Sub topic
    BigQuery        BigQuery Data Editor (roles/bigquery.dataEditor)          BigQuery dataset
    Cloud Storage   Cloud Storage Object Admin (roles/storage.objectAdmin)    Cloud Storage bucket

    For more information, see Pub/Sub access control with IAM, BigQuery access control with IAM, and Cloud Storage access control with IAM.
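
The following commands are a minimal sketch of this setup for a Pub/Sub sink with JSON output; the project ID, service account, topic, bucket, and file names are hypothetical placeholders.

# Grant the worker service account the Dataflow Worker role on the project.
gcloud projects add-iam-policy-binding my-project-id \
    --member="serviceAccount:my-worker-sa@my-project-id.iam.gserviceaccount.com" \
    --role="roles/dataflow.worker"

# Create the Pub/Sub topic that the pipeline publishes to, and grant the
# worker service account the Pub/Sub Publisher role on that topic.
gcloud pubsub topics create my-topic-id --project=my-project-id
gcloud pubsub topics add-iam-policy-binding my-topic-id \
    --project=my-project-id \
    --member="serviceAccount:my-worker-sa@my-project-id.iam.gserviceaccount.com" \
    --role="roles/pubsub.publisher"

# Upload the schema file to a Cloud Storage bucket.
gcloud storage cp schema.json gs://mybucket/schema.json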

Template parameters

Parameter Description
schemaLocation Location of the schema file. For example: gs://mybucket/filename.json.
qps Number of messages to be published per second. For example: 100.
sinkType (Optional) Output sink type. Possible values are PUBSUB, BIGQUERY, and GCS. Default is PUBSUB.
outputType (Optional) Output encoding type. Possible values are JSON, AVRO, and PARQUET. Default is JSON.
avroSchemaLocation (Optional) Location of the Avro schema file. Mandatory when outputType is AVRO or PARQUET. For example: gs://mybucket/filename.avsc.
topic (Optional) Name of the Pub/Sub topic to which the pipeline publishes data. Mandatory when sinkType is PUBSUB. For example: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Optional) Name of the output BigQuery table. Mandatory when sinkType is BIGQUERY. For example: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Optional) BigQuery write disposition. Possible values are WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default is WRITE_APPEND.
outputDeadletterTable (Optional) Name of the output BigQuery table that holds failed records. If not provided, the pipeline creates a table named {output_table_name}_error_records during execution. For example: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Optional) Path of the output Cloud Storage location. Mandatory when sinkType is GCS. For example: gs://mybucket/pathprefix/.
outputFilenamePrefix (Optional) The filename prefix of the output files written to Cloud Storage. Default is output-.
windowDuration (Optional) Window interval at which output is written to Cloud Storage. Default is 1m (in other words, 1 minute).
numShards (Optional) Maximum number of output shards. Mandatory when sinkType is GCS and must be set to 1 or a higher number.
messagesLimit (Optional) Maximum number of output messages. Default is 0, which indicates unlimited.
autoscalingAlgorithm (Optional) Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable.
maxNumWorkers (Optional) Maximum number of worker machines. For example: 10.
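
For example, a minimal sketch of a job that writes Parquet files to Cloud Storage might combine these parameters as follows; the job name, bucket paths, and schema locations are hypothetical placeholders, and the general command format is covered in the Run the template section.

gcloud dataflow flex-template run streaming-data-generator-parquet \
    --project=my-project-id \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=gs://mybucket/schema.json,\
qps=100,\
sinkType=GCS,\
outputType=PARQUET,\
avroSchemaLocation=gs://mybucket/schema.avsc,\
outputDirectory=gs://mybucket/output/,\
outputFilenamePrefix=output-,\
numShards=1,\
windowDuration=1m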

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Streaming Data Generator template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • a specific version name, to use a version of the template that is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • SCHEMA_LOCATION: the path to the schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/my-project-id/topics/my-topic-id.
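
The command above targets a Pub/Sub sink. As a sketch, a job that writes to a BigQuery table instead swaps the topic parameter for the BigQuery-specific parameters; the project, dataset, and table names below are hypothetical placeholders.

gcloud dataflow flex-template run streaming-data-generator-bq \
    --project=my-project-id \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=gs://mybucket/schema.json,\
qps=100,\
sinkType=BIGQUERY,\
outputTableSpec=my-project-id:my_dataset.my_table,\
writeDisposition=WRITE_APPEND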

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • a specific version name, to use a version of the template that is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • SCHEMA_LOCATION: the path to the schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/my-project-id/topics/my-topic-id.
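
As a sketch, you could send this request with curl, authenticating with an access token from the gcloud CLI; the project, region, topic, and bucket values below are hypothetical placeholders.

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "launch_parameter": {
            "jobName": "streaming-data-generator",
            "parameters": {
              "schemaLocation": "gs://mybucket/schema.json",
              "qps": "100",
              "topic": "projects/my-project-id/topics/my-topic-id"
            },
            "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator"
          }
        }' \
    "https://dataflow.googleapis.com/v1b3/projects/my-project-id/locations/us-central1/flexTemplates:launch"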

What's next