Google provides a set of open-source Dataflow templates. For general information about templates, see Dataflow templates. For a list of all Google-provided templates, see Get started with Google-provided templates.
This guide documents utility templates.
File Format Conversion (Avro, Parquet, CSV)
The File Format Conversion template is a batch pipeline that converts files stored on Cloud Storage from one supported format to another.
The following format conversions are supported:
- CSV to Avro
- CSV to Parquet
- Avro to Parquet
- Parquet to Avro
Requirements for this pipeline:
- The output Cloud Storage bucket must exist before running the pipeline.
Template parameters
| Parameter | Description |
|---|---|
| inputFileFormat | The input file format. Must be one of csv, avro, or parquet. |
| outputFileFormat | The output file format. Must be either avro or parquet. |
| inputFileSpec | The Cloud Storage path pattern for input files. For example, gs://bucket-name/path/*.csv. |
| outputBucket | The Cloud Storage folder to write output files to. This path must end with a slash. For example, gs://bucket-name/output/. |
| schema | The Cloud Storage path to the Avro schema file. For example, gs://bucket-name/schema/my-schema.avsc. An example schema is shown after this table. |
| containsHeaders | (Optional) Whether the input CSV files contain a header record (true/false). The default value is false. Only applies when reading CSV files. |
| csvFormat | (Optional) The CSV format specification to use for parsing records. The default value is Default. See Apache Commons CSV Format for more details. |
| delimiter | (Optional) The field delimiter used by the input CSV files. |
| outputFilePrefix | (Optional) The output file prefix. The default value is output. |
| numShards | (Optional) The number of output file shards. |
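The schema parameter points to a standard Avro schema (.avsc) file. As a minimal sketch, assuming hypothetical fields id, name, and price that match the columns of your input files:

{
  "type": "record",
  "name": "ExampleRecord",
  "namespace": "com.example",
  "doc": "Hypothetical example schema; replace the fields below with ones that match your data.",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": ["null", "double"], "default": null}
  ]
}

Upload the file to Cloud Storage and pass its path, for example gs://bucket-name/schema/my-schema.avsc, as the schema parameter.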
Running the File Format Conversion template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Convert file formats template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- INPUT_FORMAT: the file format of the input files; must be one of csv, avro, or parquet
- OUTPUT_FORMAT: the file format of the output files; must be either avro or parquet
- INPUT_FILES: the path pattern for input files
- OUTPUT_FOLDER: your Cloud Storage folder for output files
- SCHEMA: the path to the Avro schema file
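For instance, a hypothetical CSV-to-Avro run over a bucket named my-bucket (all names and paths below are placeholders), including the optional containsHeaders parameter, might look like the following:

gcloud beta dataflow flex-template run csv-to-avro-job \
    --project=my-project-id \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=csv,\
outputFileFormat=avro,\
inputFileSpec=gs://my-bucket/input/*.csv,\
schema=gs://my-bucket/schema/my-schema.avsc,\
containsHeaders=true,\
outputBucket=gs://my-bucket/output/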
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputFileFormat": "INPUT_FORMAT",
      "outputFileFormat": "OUTPUT_FORMAT",
      "inputFileSpec": "INPUT_FILES",
      "schema": "SCHEMA",
      "outputBucket": "OUTPUT_FOLDER"
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion"
  }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- INPUT_FORMAT: the file format of the input files; must be one of csv, avro, or parquet
- OUTPUT_FORMAT: the file format of the output files; must be either avro or parquet
- INPUT_FILES: the path pattern for input files
- OUTPUT_FOLDER: your Cloud Storage folder for output files
- SCHEMA: the path to the Avro schema file
Bulk Compress Cloud Storage Files
The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. This template can be useful when you need to compress large batches of files as part of a periodic archival process. The supported compression modes are BZIP2, DEFLATE, and GZIP.
Files output to the destination location follow a naming scheme of the original filename appended with the compression mode extension. The appended extension is one of .bzip2, .deflate, or .gz.
Requirements for this pipeline:
- The compression must be in one of the following formats: BZIP2, DEFLATE, GZIP.
- The output directory must exist prior to running the pipeline.
Template parameters
| Parameter | Description |
|---|---|
| inputFilePattern | The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt. |
| outputDirectory | The output location to write to. For example, gs://bucket-name/compressed/. |
| outputFailureFile | The error log output file to use for write failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. If there are no failures, the file is still created but is empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails compression. |
| compression | The compression algorithm used to compress the matched files. Must be one of: BZIP2, DEFLATE, GZIP. |
Running the Bulk Compress Cloud Storage Files template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bulk Compress Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- BUCKET_NAME: the name of your Cloud Storage bucket
- COMPRESSION: your choice of compression algorithm
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
    "outputDirectory": "gs://BUCKET_NAME/compressed",
    "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
    "compression": "COMPRESSION"
  },
  "environment": { "zone": "us-central1-f" }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- BUCKET_NAME: the name of your Cloud Storage bucket
- COMPRESSION: your choice of compression algorithm
Bulk Decompress Cloud Storage Files
The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on
Cloud Storage to a specified location. This functionality is useful when you want to use
compressed data to minimize network bandwidth costs during a migration, but would like to maximize
analytical processing speed by operating on uncompressed data after migration. The pipeline
automatically handles multiple compression modes during a single run and determines the
decompression mode to use based on the file extension
(.bzip2, .deflate, .gz, .zip).
Note: The Bulk Decompress Cloud Storage Files template is intended for single compressed files and not compressed folders.
Requirements for this pipeline:
- The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, Zip.
- The output directory must exist prior to running the pipeline.
Template parameters
| Parameter | Description |
|---|---|
| inputFilePattern | The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz. |
| outputDirectory | The output location to write to. For example, gs://bucket-name/decompressed. |
| outputFailureFile | The error log output file to use for write failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. If there are no failures, the file is still created but is empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails decompression. |
Running the Bulk Decompress Cloud Storage Files template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bulk Decompress Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- BUCKET_NAME: the name of your Cloud Storage bucket
- OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
    "outputDirectory": "gs://BUCKET_NAME/decompressed",
    "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
  },
  "environment": { "zone": "us-central1-f" }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- BUCKET_NAME: the name of your Cloud Storage bucket
- OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information
Datastore Bulk Delete [Deprecated]
This template is deprecated and will be removed in Q1 2022. Migrate to the Firestore Bulk Delete template.
The Datastore Bulk Delete template is a pipeline that reads entities from Datastore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Datastore entities to your JavaScript UDF, which you can use to filter out entities by returning null values.
Requirements for this pipeline:
- Datastore must be set up in the project prior to running the template.
- If reading and deleting from separate Datastore instances, the Dataflow Worker Service Account must have permission to read from one instance and delete from the other.
Template parameters
| Parameter | Description |
|---|---|
| datastoreReadGqlQuery | GQL query which specifies which entities to match for deletion. Using a keys-only query may improve performance. For example: "SELECT __key__ FROM MyKind". |
| datastoreReadProjectId | GCP Project ID of the Datastore instance from which you want to read entities (using your GQL query) that are used for matching. |
| datastoreDeleteProjectId | GCP Project ID of the Datastore instance from which to delete matching entities. This can be the same as datastoreReadProjectId if you want to read and delete within the same Datastore instance. |
| datastoreReadNamespace | (Optional) Namespace of requested entities. Set as "" for the default namespace. |
| datastoreHintNumWorkers | (Optional) Hint for the expected number of workers in the Datastore ramp-up throttling step. Default is 500. |
| javascriptTextTransformGcsPath | (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js. |
| javascriptTextTransformFunctionName | (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. If this function returns a value of undefined or null for a given Datastore entity, then that entity is not deleted. |
Running the Datastore Bulk Delete template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bulk Delete Entities in Datastore template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- GQL_QUERY: the query you'll use to match entities for deletion
- DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
  "jobName": "JOB_NAME",
  "parameters": {
    "datastoreReadGqlQuery": "GQL_QUERY",
    "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
    "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
  },
  "environment": { "zone": "us-central1-f" }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- GQL_QUERY: the query you'll use to match entities for deletion
- DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.
Firestore Bulk Delete
The Firestore Bulk Delete template is a pipeline that reads entities from Firestore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Firestore entities to your JavaScript UDF, which you can use to filter out entities by returning null values.
Requirements for this pipeline:
- Firestore must be set up in the project prior to running the template.
- If reading and deleting from separate Firestore instances, the Dataflow Worker Service Account must have permission to read from one instance and delete from the other.
Template parameters
| Parameter | Description |
|---|---|
| firestoreReadGqlQuery | GQL query which specifies which entities to match for deletion. Using a keys-only query may improve performance. For example: "SELECT __key__ FROM MyKind". |
| firestoreReadProjectId | GCP Project ID of the Firestore instance from which you want to read entities (using your GQL query) that are used for matching. |
| firestoreDeleteProjectId | GCP Project ID of the Firestore instance from which to delete matching entities. This can be the same as firestoreReadProjectId if you want to read and delete within the same Firestore instance. |
| firestoreReadNamespace | (Optional) Namespace of requested entities. Set as "" for the default namespace. |
| firestoreHintNumWorkers | (Optional) Hint for the expected number of workers in the Firestore ramp-up throttling step. Default is 500. |
| javascriptTextTransformGcsPath | (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js. |
| javascriptTextTransformFunctionName | (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. If this function returns a value of undefined or null for a given Firestore entity, then that entity is not deleted. A sketch of such a filtering UDF is shown after this table. |
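As an illustration, a filtering UDF could look like the following sketch. The property name status and the value archived are hypothetical, and the property lookup assumes the JSON-encoded entity exposes a properties map; adapt it to the shape of your own entities.

function myTransform(inJson) {
  // Parse the JSON-encoded Firestore entity handed in by the pipeline.
  var entity = JSON.parse(inJson);

  // Hypothetical filter: only entities whose "status" property is "archived"
  // should be deleted. Adjust the property lookup to match your entity encoding.
  var props = entity.properties || {};
  var status = props.status && props.status.stringValue;

  if (status === 'archived') {
    // Returning the entity keeps it in the pipeline, so it will be deleted.
    return JSON.stringify(entity);
  }
  // Returning null (or undefined) means the entity is not deleted.
  return null;
}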
Running the Firestore Bulk Delete template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bulk Delete Entities in Firestore template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="GQL_QUERY",\
firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\
firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- GQL_QUERY: the query you'll use to match entities for deletion
- FIRESTORE_READ_AND_DELETE_PROJECT_ID: your Firestore instance project ID. This example both reads and deletes from the same Firestore instance.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete
{
  "jobName": "JOB_NAME",
  "parameters": {
    "firestoreReadGqlQuery": "GQL_QUERY",
    "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID",
    "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID"
  },
  "environment": { "zone": "us-central1-f" }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- GQL_QUERY: the query you'll use to match entities for deletion
- FIRESTORE_READ_AND_DELETE_PROJECT_ID: your Firestore instance project ID. This example both reads and deletes from the same Firestore instance.
Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage
The Streaming Data Generator template generates either an unlimited or a fixed number of synthetic records or messages, based on a user-provided schema, at a specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.
The following are a few possible use cases:
- Simulate large-scale real-time event publishing to a Pub/Sub topic to measure and determine the number and size of consumers required to process published events.
- Generate synthetic data to a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.
Supported sinks and encoding formats
The following table describes which sinks and encoding formats are supported by this template:

| Sink | JSON | Avro | Parquet |
|---|---|---|---|
| Pub/Sub | Yes | Yes | No |
| BigQuery | Yes | No | No |
| Cloud Storage | Yes | Yes | Yes |
The JSON Data Generator library used by the pipeline allows various faker functions to be used for each schema field. For more information on the faker functions and schema format, see the json-data-generator documentation.
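As a rough sketch, a schema file could look like the following. The field names are made up, and the faker functions shown (uuid, integer, bool) are only a small subset; check the json-data-generator documentation for the full list and exact syntax.

{
  "id": "{{uuid()}}",
  "score": {{integer(0, 100)}},
  "isActive": {{bool()}},
  "user": "user-{{integer(1, 5000)}}"
}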
Requirements for this pipeline:
- Create a message schema file and store this file in a Cloud Storage location.
- The output target must exist prior to execution. The target must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket depending on sink type.
- If the output encoding is Avro or Parquet, then create an Avro schema file and store it in a Cloud Storage location.
Template parameters
| Parameter | Description |
|---|---|
| schemaLocation | Location of the schema file. For example: gs://mybucket/filename.json. |
| qps | Number of messages to be published per second. For example: 100. |
| sinkType | (Optional) Output sink type. Possible values are PUBSUB, BIGQUERY, GCS. Default is PUBSUB. |
| outputType | (Optional) Output encoding type. Possible values are JSON, AVRO, PARQUET. Default is JSON. |
| avroSchemaLocation | (Optional) Location of the Avro schema file. Mandatory when outputType is AVRO or PARQUET. For example: gs://mybucket/filename.avsc. |
| topic | (Optional) Name of the Pub/Sub topic to which the pipeline should publish data. Mandatory when sinkType is Pub/Sub. |
| outputTableSpec | (Optional) Name of the output BigQuery table. Mandatory when sinkType is BigQuery. |
| writeDisposition | (Optional) BigQuery write disposition. Possible values are WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default is WRITE_APPEND. |
| outputDeadletterTable | (Optional) Name of the output BigQuery table to hold failed records. If not provided, the pipeline creates a table during execution named {output_table_name}_error_records. |
| outputDirectory | (Optional) Path of the output Cloud Storage location. Mandatory when sinkType is Cloud Storage. For example: gs://mybucket/pathprefix/. |
| outputFilenamePrefix | (Optional) The filename prefix of the output files written to Cloud Storage. Default is output-. |
| windowDuration | (Optional) Window interval at which output is written to Cloud Storage. Default is 1m (in other words, 1 minute). |
| numShards | (Optional) Maximum number of output shards. Mandatory when sinkType is Cloud Storage and must be set to 1 or a higher number. |
| messagesLimit | (Optional) Maximum number of output messages. Default is 0, indicating unlimited. |
| autoscalingAlgorithm | (Optional) Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable it. |
| maxNumWorkers | (Optional) Maximum number of worker machines. For example: 10. |
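For example, to write Avro output to Cloud Storage instead of Pub/Sub, a hypothetical invocation could combine the sink parameters above as follows (bucket names and paths are placeholders):

gcloud beta dataflow flex-template run streaming-datagen-to-gcs \
    --project=my-project-id \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=gs://my-bucket/schema/message-schema.json,\
qps=100,\
sinkType=GCS,\
outputType=AVRO,\
avroSchemaLocation=gs://my-bucket/schema/message-schema.avsc,\
outputDirectory=gs://my-bucket/generated/,\
numShards=1,\
messagesLimit=10000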
Running the Streaming Data Generator template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Streaming Data Generator template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- SCHEMA_LOCATION: the path to the schema file in Cloud Storage, for example gs://mybucket/filename.json
- QPS: the number of messages to be published per second
- PUBSUB_TOPIC: the output Pub/Sub topic, for example projects/my-project-ID/topics/my-topic-ID
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "schemaLocation": "SCHEMA_LOCATION",
      "qps": "QPS",
      "topic": "PUBSUB_TOPIC"
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator"
  }
}
Replace the following:
- PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
  - the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
- SCHEMA_LOCATION: the path to the schema file in Cloud Storage, for example gs://mybucket/filename.json
- QPS: the number of messages to be published per second
- PUBSUB_TOPIC: the output Pub/Sub topic, for example projects/my-project-ID/topics/my-topic-ID