Google-provided utility templates

Google provides a set of open-source Dataflow templates. For general information about templates, see the Overview page. For a list of all Google-provided templates, see the Get started with Google-provided templates page.

This page documents utility templates:

Bulk Compress Cloud Storage Files

The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. This template can be useful when you need to compress large batches of files as part of a periodic archival process. The supported compression modes are BZIP2, DEFLATE, and GZIP. Files output to the destination location follow a naming schema of the original filename appended with the compression mode extension: .bzip2, .deflate, or .gz.

Any errors that occur during the compression process are output to the failure file in CSV format (filename, error message). If no failures occur while running the pipeline, the error file is still created but contains no error records.
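
For illustration, a single failure record might look like the following line; both the filename and the message are hypothetical, not actual template output:

gs://bucket-name/uncompressed/corrupt-file.txt,Unable to read file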

Requirements for this pipeline:

  • The compression must be in one of the following formats: BZIP2, DEFLATE, GZIP.
  • The output directory must exist prior to running the pipeline.

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt.
outputDirectory The output location to write to. For example, gs://bucket-name/compressed/.
outputFailureFile The error log output file to use for write failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails compression.
compression The compression algorithm used to compress the matched files. Must be one of: BZIP2, DEFLATE, GZIP.

Executing the Bulk Compress Cloud Storage Files template

Console

Execute from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Bulk Compress Cloud Storage Files template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

gcloud

Execute from the gcloud command-line tool

Note: To use the gcloud command-line tool to execute templates, you must have Cloud SDK version 138.0.0 or higher.
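
To check which version you have installed, you can run the following command; if it reports an older release, gcloud components update upgrades the installed components (depending on how you installed the SDK, you might need to update through your package manager instead):

gcloud version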

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Replace the following:

  • JOB_NAME: a job name of your choice
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • COMPRESSION: your choice of compression algorithm

API

Execute from the REST API

When executing this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

To execute this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • COMPRESSION: your choice of compression algorithm
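
One way to issue this request is with curl, using an access token from the gcloud tool. The sketch below assumes the JSON body above has been saved to a local file named request.json; that filename is an assumption for illustration, not part of the template:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files"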

Bulk Decompress Cloud Storage Files

The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on Cloud Storage to a specified location. This functionality is useful when you want to use compressed data to minimize network bandwidth costs during a migration, but would like to maximize analytical processing speed by operating on uncompressed data after migration. The pipeline automatically handles multiple compression modes during a single run and determines the decompression mode to use based on the file extension (.bzip2, .deflate, .gz, .zip).

Requirements for this pipeline:

  • The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, Zip.
  • The output directory must exist prior to running the pipeline.

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz.
outputDirectory The output location to write to. For example, gs://bucket-name/decompressed.
outputFailureFile The error log output file to use for write failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails decompression.

Executing the Bulk Decompress Cloud Storage Files template

Console

Execute from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Bulk Decompress Cloud Storage Files template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

gcloud

Execute from the gcloud command-line tool

Note: To use the gcloud command-line tool to execute templates, you must have Cloud SDK version 138.0.0 or higher.

When executing this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Replace the following:

  • JOB_NAME: a job name of your choice
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information

API

Execute from the REST API

When executing this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

To execute this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information

Datastore Bulk Delete

The Datastore Bulk Delete template is a pipeline that reads entities from Datastore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Datastore entities to your JavaScript UDF, which you can use to filter out entities by returning null values.

Requirements for this pipeline:

  • Datastore must be set up in the project prior to running the template.
  • If reading and deleting from separate Datastore instances, the Dataflow Controller Service Account must have permission to read from one instance and delete from the other.

Template parameters

Parameter Description
datastoreReadGqlQuery GQL query that specifies which entities to match for deletion. For example: "SELECT * FROM MyKind".
datastoreReadProjectId GCP Project ID of the Datastore instance from which you want to read entities (using your GQL query) that are used for matching.
datastoreDeleteProjectId GCP Project ID of the Datastore instance from which to delete matching entities. This can be the same as datastoreReadProjectId if you want to read and delete within the same Datastore instance.
datastoreReadNamespace (Optional) Namespace of the requested entities. Set as "" for the default namespace.
javascriptTextTransformGcsPath (Optional) A Cloud Storage path that contains all your JavaScript code. For example: "gs://mybucket/mytransforms/*.js". If you don't want to use a UDF, leave this field blank.
javascriptTextTransformFunctionName (Optional) Name of the function to be called. If this function returns a value of undefined or null for a given Datastore entity, then that entity is not deleted. If your JavaScript code is "function myTransform(inJson) { ...dostuff...}", then your function name is "myTransform". If you don't want to use a UDF, leave this field blank.
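
For example, a minimal UDF sketch that deletes only entities whose (hypothetical) status property equals "obsolete", and keeps everything else by returning null, might look like the following. The property path assumes the JSON-encoded Datastore entity layout of properties.<name>.stringValue; verify it against your own entities before use.

/**
 * Returns the entity JSON unchanged if the entity should be deleted,
 * or null so that the entity is filtered out and not deleted.
 */
function myTransform(inJson) {
  var entity = JSON.parse(inJson);
  var props = entity.properties || {};
  // "status" and "obsolete" are hypothetical; adjust them to your schema.
  if (props.status && props.status.stringValue === 'obsolete') {
    return inJson;  // matched: this entity is deleted
  }
  return null;      // not matched: this entity is not deleted
}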

Executing the Datastore Bulk Delete template

Console

Execute from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Datastore Bulk Delete template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

gcloud

Execute from the gcloud command-line tool

Note: To use the gcloud command-line tool to execute templates, you must have Cloud SDK version 138.0.0 or higher.

When executing this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Replace the following:

  • JOB_NAME: a job name of your choice
  • GQL_QUERY: the query you'll use to match entities for deletion
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.
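
If you read from one Datastore instance and delete from another, as described in the requirements above, the same command takes two different project IDs. The placeholder names below are illustrative:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_DELETE_PROJECT_ID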

API

Execute from the REST API

When executing this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

To execute this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice
  • GQL_QUERY: the query you'll use to match entities for deletion
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.

Streaming Data Generator to Pub/Sub/BigQuery/Cloud Storage

The Streaming Data Generator template generates either an unlimited or a fixed number of synthetic records or messages, based on a user-provided schema, at a specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.

The following are a few possible use cases:

  • Simulate large-scale real-time event publishing to a Pub/Sub topic to measure and determine the number and size of consumers required to process published events.
  • Generate synthetic data to a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.

Supported sinks and encoding formats

The following table describes which sinks and encoding formats are supported by this template:
Sink            JSON    Avro    Parquet
Pub/Sub         Yes     Yes     No
BigQuery        Yes     No      No
Cloud Storage   Yes     Yes     Yes

The JSON Data Generator library used by the pipeline allows various faker functions to be used for each schema field. For more information on the faker functions and schema format, see the json-data-generator documentation.
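
As a sketch, a message schema file using a few faker functions might look like the following. The field names are arbitrary, and the functions shown (uuid, firstName, lastName, integer) should be verified against the json-data-generator documentation before use:

{
  "id": "{{uuid()}}",
  "fullName": "{{firstName()}} {{lastName()}}",
  "age": {{integer(18,65)}}
}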

Requirements for this pipeline:

  • Create a message schema file and store this file in a Cloud Storage location.
  • The output target must exist prior to execution. The target must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket, depending on the sink type.
  • If the output encoding is Avro or Parquet, then create an Avro schema file and store it in a Cloud Storage location.
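
For Avro or Parquet output, the Avro schema file is a standard .avsc definition. A minimal sketch that matches the hypothetical message schema shown earlier might be:

{
  "type": "record",
  "name": "SyntheticRecord",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "fullName", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}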

Template parameters

Parameter Description
schemaLocation Location of the schema file. For example: gs://mybucket/filename.json.
qps Number of messages to be published per second. For example: 100.
sinkType (Optional) Output sink type. Possible values are PUBSUB, BIGQUERY, GCS. Default is PUBSUB.
outputType (Optional) Output encoding type. Possible values are JSON, AVRO, PARQUET. Default is JSON.
avroSchemaLocation (Optional) Location of the Avro schema file. Mandatory when outputType is AVRO or PARQUET. For example: gs://mybucket/filename.avsc.
topic (Optional) Name of the Pub/Sub topic to which the pipeline should publish data. Mandatory when sinkType is Pub/Sub. For example: projects/<project-id>/topics/<topic-name>.
outputTableSpec (Optional) Name of the output BigQuery table. Mandatory when sinkType is BigQuery. For example: your-project:your-dataset.your-table-name.
writeDisposition (Optional) BigQuery write disposition. Possible values are WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default is WRITE_APPEND.
outputDeadletterTable (Optional) Name of the output BigQuery table to hold failed records. If not provided, the pipeline creates a table named {output_table_name}_error_records during execution. For example: your-project:your-dataset.your-table-name.
outputDirectory (Optional) Path of the output Cloud Storage location. Mandatory when sinkType is Cloud Storage. For example: gs://mybucket/pathprefix/.
outputFilenamePrefix (Optional) The filename prefix of the output files written to Cloud Storage. Default is output-.
windowDuration (Optional) Window interval at which output is written to Cloud Storage. Default is 1m (in other words, 1 minute).
numShards (Optional) Maximum number of output shards. Mandatory when sinkType is Cloud Storage and must be set to 1 or higher.
messagesLimit (Optional) Maximum number of output messages. Default is 0, indicating unlimited.
autoscalingAlgorithm (Optional) Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable.
maxNumWorkers (Optional) Maximum number of worker machines. For example: 10.

Running the Streaming Data Generator template

Console

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Streaming Data Generator template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

gcloud

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Replace the following:

  • JOB_NAME: a job name of your choice
  • PROJECT_ID: your project ID
  • REGION_NAME: the Dataflow region name. For example: us-central1.
  • SCHEMA_LOCATION: the path to schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/<project-id>/topics/<topic-name>.
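
The command above publishes to Pub/Sub, the default sink. Based on the parameter table, a variant that writes to a BigQuery table instead might look like the following sketch; the dataset and table names are placeholders, not values defined by the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
sinkType=BIGQUERY,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.ERROR_TABLE_NAME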

API

Run from the REST API

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}
  

Replace the following:

  • PROJECT_ID: your project ID
  • LOCATION: the Dataflow region name. For example: us-central1.
  • JOB_NAME: a job name of your choice
  • SCHEMA_LOCATION: the path to schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/<project-id>/topics/<topic-name>.