Google provides a set of open-source Dataflow templates. For general information about templates, see the Overview page. For a list of all Google-provided templates, see the Get started with Google-provided templates page.
This page documents streaming templates:
- Pub/Sub Subscription to BigQuery
- Pub/Sub Topic to BigQuery
- Pub/Sub Avro to BigQuery
- Pub/Sub to Pub/Sub
- Pub/Sub to Splunk
- Pub/Sub to Avro Files on Cloud Storage
- Pub/Sub to Text Files on Cloud Storage
- Pub/Sub to MongoDB
- Text Files on Cloud Storage to BigQuery (Stream)
- Text Files on Cloud Storage to Pub/Sub (Stream)
- Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)
- Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)
- Apache Kafka to BigQuery
Pub/Sub Subscription to BigQuery
The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
- The Pub/Sub messages must be in JSON format, described here. For example, messages formatted as {"k1":"v1", "k2":"v2"} may be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
- The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
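A minimal sketch of preparing these requirements, assuming hypothetical topic, subscription, dataset, and table names; adapt the names and schema to your own messages:
# Create a BigQuery table whose schema matches the example JSON messages (hypothetical names).
bq mk --table my-project:my_dataset.my_table k1:STRING,k2:STRING
# Create the input topic and subscription, then publish a test message.
gcloud pubsub topics create my-topic
gcloud pubsub subscriptions create my-subscription --topic=my-topic
gcloud pubsub topics publish my-topic --message='{"k1":"v1", "k2":"v2"}'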
Template parameters
Parameter | Description
---|---
inputSubscription | The Pub/Sub input subscription to read from, in the format of projects/<project>/subscriptions/<subscription>.
outputTableSpec | The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable | The BigQuery table for messages that failed to reach the output table, in the format of <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
Running the Pub/Sub Subscription to BigQuery template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub Subscription to BigQuery template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- SUBSCRIPTION_NAME: your Pub/Sub subscription name
- DATASET: your BigQuery dataset
- TABLE_NAME: your BigQuery table name
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- SUBSCRIPTION_NAME: your Pub/Sub subscription name
- DATASET: your BigQuery dataset
- TABLE_NAME: your BigQuery table name
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
  "jobName": "JOB_NAME",
  "parameters": {
    "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
    "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
  },
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  }
}
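A minimal sketch of sending this request with curl, assuming gcloud credentials are available locally; the bearer token comes from gcloud auth print-access-token and the body is the same as above (shown here with only the required parameters):
curl -X POST \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
      "jobName": "JOB_NAME",
      "parameters": {
        "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
        "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
      }
    }'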
Pub/Sub Topic to BigQuery
The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.
Requirements for this pipeline:
- The Pub/Sub messages must be in JSON format, described here. For example, messages formatted as {"k1":"v1", "k2":"v2"} may be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
- The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
Template parameters
Parameter | Description
---|---
inputTopic | The Pub/Sub input topic to read from, in the format of projects/<project>/topics/<topic>.
outputTableSpec | The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable | The BigQuery table for messages that failed to reach the output table, in the format of <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
Running the Pub/Sub Topic to BigQuery template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub Topic to BigQuery template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- DATASET: your BigQuery dataset
- TABLE_NAME: your BigQuery table name
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- DATASET: your BigQuery dataset
- TABLE_NAME: your BigQuery table name
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
  }
}
Pub/Sub Avro to BigQuery
The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.
Requirements for this pipeline
- The input Pub/Sub subscription must exist.
- The schema file for the Avro records must exist on Cloud Storage.
- The unprocessed Pub/Sub topic must exist.
- The output BigQuery dataset must exist.
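A minimal sketch of creating these prerequisites with the gcloud, gsutil, and bq tools, using hypothetical resource names; adjust them to match your environment:
# Upload the Avro schema file (hypothetical bucket and path).
gsutil cp schema.avsc gs://my-bucket/schemas/schema.avsc
# Create the input subscription, the unprocessed (dead-letter) topic, and the output dataset.
gcloud pubsub topics create my-input-topic
gcloud pubsub subscriptions create my-input-subscription --topic=my-input-topic
gcloud pubsub topics create my-unprocessed-topic
bq mk --dataset my-project:my_dataset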
Template parameters
Parameter | Description
---|---
schemaPath | The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc.
inputSubscription | The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic | The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec | The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table may be created automatically using the user-provided Avro schema.
writeDisposition | (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition | (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
Running the Pub/Sub Avro to BigQuery template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub Avro to BigQuery template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
Replace the following:
- JOB_NAME: a job name of your choice
- REGION_NAME: the Dataflow region name (for example, us-central1)
- SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
API
Run from the REST API
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- JOB_NAME: a job name of your choice
- LOCATION: the Dataflow region name (for example, us-central1)
- SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
    "parameters": {
      "schemaPath": "SCHEMA_PATH",
      "inputSubscription": "SUBSCRIPTION_NAME",
      "outputTableSpec": "BIGQUERY_TABLE",
      "outputTopic": "DEADLETTER_TOPIC"
    }
  }
}
Pub/Sub to Pub/Sub
The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.
Requirements for this pipeline:
- The source Pub/Sub subscription must exist prior to execution.
- The destination Pub/Sub topic must exist prior to execution.
Template parameters
Parameter | Description
---|---
inputSubscription | Pub/Sub subscription to read the input from. For example, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic | Cloud Pub/Sub topic to write the output to. For example, projects/<project-id>/topics/<topic-name>.
filterKey | [Optional] Filter events based on an attribute key. No filters are applied if filterKey is not specified.
filterValue | [Optional] Filter attribute value to use in case a filterKey is provided. A null filterValue is used by default.
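As an illustration of how the filter matches, a message can be published with an attribute for the pipeline to match on; the attribute key and value below are hypothetical:
# Publish a test message carrying an attribute; a pipeline started with
# filterKey=region and filterValue=us-west1 would pass this message through.
gcloud pubsub topics publish my-source-topic \
    --message='{"event":"example"}' \
    --attribute=region=us-west1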
Running the Pub/Sub to Pub/Sub template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub to Pub/Sub template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOPIC_NAME: the Pub/Sub topic name
- FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
- FILTER_VALUE: the filter attribute value to use if an event filter key is provided. Accepts a valid Java regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (for example, a substring) will not be filtered. A null event filter value is used by default.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOPIC_NAME: the Pub/Sub topic name
- FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
- FILTER_VALUE: the filter attribute value to use if an event filter key is provided. Accepts a valid Java regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (for example, a substring) will not be filtered. A null event filter value is used by default.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
    "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "filterKey": "FILTER_KEY",
    "filterValue": "FILTER_VALUE"
  }
}
Pub/Sub to Splunk
The Pub/Sub to Splunk template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Splunk via Splunk's HTTP Event Collector (HEC). Before writing to Splunk, you can also apply a JavaScript user-defined function to the message payload. Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key along with the base64-encoded HEC token parameter encrypted with the Cloud KMS key. See the Cloud KMS API encryption endpoint for additional details on encrypting your HEC token parameter.
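A minimal sketch of preparing an encrypted token with Cloud KMS, assuming a hypothetical key ring and key; the base64-encoded ciphertext is what you would pass as the token parameter, together with the key in tokenKMSEncryptionKey:
# Encrypt the plaintext HEC token with a Cloud KMS key (hypothetical key ring and key names).
gcloud kms encrypt \
    --location=global \
    --keyring=my-keyring \
    --key=my-hec-key \
    --plaintext-file=hec-token.txt \
    --ciphertext-file=hec-token.enc
# Base64-encode the ciphertext for use as the token parameter value.
base64 hec-token.enc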
Requirements for this pipeline:
- The source Pub/Sub subscription must exist prior to running the pipeline.
- The Pub/Sub unprocessed topic must exist prior to running the pipeline.
- The Splunk HEC endpoint must be accessible from the Dataflow workers' network.
- The Splunk HEC token must be generated and available.
Template parameters
Parameter | Description
---|---
inputSubscription | The Pub/Sub subscription from which to read the input. For example, projects/<project-id>/subscriptions/<subscription-name>.
token | The Splunk HEC authentication token. This base64-encoded string can be encrypted with a Cloud KMS key for additional security.
url | The Splunk HEC URL. This must be routable from the VPC in which the pipeline runs. For example, https://splunk-hec-host:8088.
outputDeadletterTopic | The Pub/Sub topic to forward undeliverable messages to. For example, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath | [Optional] The Cloud Storage path that contains all your JavaScript code. For example, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName | [Optional] The name of the JavaScript function to be called. For example, if your JavaScript function is function myTransform(inJson) { ...dostuff...}, then the function name is myTransform.
batchCount | [Optional] The batch size for sending multiple events to Splunk. Default 1 (no batching).
parallelism | [Optional] The maximum number of parallel requests. Default 1 (no parallelism).
disableCertificateValidation | [Optional] Disable SSL certificate validation. Default false (validation enabled).
includePubsubMessage | [Optional] Include the full Pub/Sub message in the payload. Default false (only the data element is included in the payload).
tokenKMSEncryptionKey | [Optional] The Cloud KMS key to decrypt the HEC token string. If the Cloud KMS key is provided, the HEC token string must be passed in encrypted.
Running the Pub/Sub to Splunk template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub to Splunk template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOKEN: Splunk's HTTP Event Collector token
- URL: the URL path for Splunk's HTTP Event Collector (for example, https://splunk-hec-host:8088)
- DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
- JAVASCRIPT_FUNCTION: your JavaScript function name
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code (for example, gs://your-bucket/your-function.js)
- BATCH_COUNT: the batch size to use for sending multiple events to Splunk
- PARALLELISM: the number of parallel requests to use for sending events to Splunk
- DISABLE_VALIDATION: true if you want to disable SSL certificate validation
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOKEN: Splunk's HTTP Event Collector token
- URL: the URL path for Splunk's HTTP Event Collector (for example, https://splunk-hec-host:8088)
- DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
- JAVASCRIPT_FUNCTION: your JavaScript function name
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code (for example, gs://your-bucket/your-function.js)
- BATCH_COUNT: the batch size to use for sending multiple events to Splunk
- PARALLELISM: the number of parallel requests to use for sending events to Splunk
- DISABLE_VALIDATION: true if you want to disable SSL certificate validation
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "gs://your-bucket/temp",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
    "token": "TOKEN",
    "url": "URL",
    "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
    "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
    "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
    "batchCount": "BATCH_COUNT",
    "parallelism": "PARALLELISM",
    "disableCertificateValidation": "DISABLE_VALIDATION"
  }
}
Pub/Sub to Avro Files on Cloud Storage
The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket.
Requirements for this pipeline:
- The input Pub/Sub topic must exist prior to pipeline execution.
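One way to create the prerequisite topic, and an output bucket for the Avro and temp directories, ahead of time (hypothetical names):
# Create the input topic and a bucket for the Avro output and temp files.
gcloud pubsub topics create my-topic
gsutil mb gs://example-bucket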
Template parameters
Parameter | Description
---|---
inputTopic | The Pub/Sub topic to subscribe to for message consumption. The topic name should be in the format of projects/<project-id>/topics/<topic-name>.
outputDirectory | The output directory where output Avro files are archived. Add / at the end. For example: gs://example-bucket/example-directory/.
avroTempDirectory | The directory for temporary Avro files. Add / at the end. For example: gs://example-bucket/example-directory/.
outputFilenamePrefix | [Optional] The output filename prefix for the Avro files.
outputFilenameSuffix | [Optional] The output filename suffix for the Avro files.
outputShardTemplate | [Optional] The shard template of the output file. Specified as repeating sequences of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the shard number or the number of shards, respectively. The default template format is 'W-P-SS-of-NN' when this parameter is not specified.
numShards | [Optional] The maximum number of output shards produced when writing. The default maximum number of shards is 1.
Running the Pub/Sub to Cloud Storage Avro template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub to Cloud Storage Avro template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: the Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
- FILENAME_PREFIX: the preferred output filename prefix
- FILENAME_SUFFIX: the preferred output filename suffix
- SHARD_TEMPLATE: the preferred output shard template
- NUM_SHARDS: the number of output shards
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: the Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
- FILENAME_PREFIX: the preferred output filename prefix
- FILENAME_SUFFIX: the preferred output filename suffix
- SHARD_TEMPLATE: the preferred output shard template
- NUM_SHARDS: the number of output shards
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputDirectory": "gs://BUCKET_NAME/output/",
    "avroTempDirectory": "gs://BUCKET_NAME/temp/",
    "outputFilenamePrefix": "FILENAME_PREFIX",
    "outputFilenameSuffix": "FILENAME_SUFFIX",
    "outputShardTemplate": "SHARD_TEMPLATE",
    "numShards": "NUM_SHARDS"
  }
}
Pub/Sub to Text Files on Cloud Storage
The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.
Requirements for this pipeline:
- The Pub/Sub topic must exist prior to execution.
- The messages published to the topic must be in text format.
- The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.
Template parameters
Parameter | Description
---|---
inputTopic | The Pub/Sub topic to read the input from. The topic name should be in the format projects/<project-id>/topics/<topic-name>.
outputDirectory | The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix | The prefix to place on each windowed file. For example, output-.
outputFilenameSuffix | The suffix to place on each windowed file, typically a file extension such as .txt or .csv.
outputShardTemplate | The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window. This means that all data lands in a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.
Running the Pub/Sub to Text Files on Cloud Storage template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub to Text Files on Cloud Storage template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
    "outputDirectory": "gs://BUCKET_NAME/output/",
    "outputFilenamePrefix": "output-",
    "outputFilenameSuffix": ".txt"
  }
}
Pub/Sub to MongoDB
The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). Any errors that occur because of a schema mismatch or malformed JSON, or while executing transforms, are recorded in a BigQuery table for unprocessed messages along with the input message. If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates this table.
Requirements for this pipeline:
- The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.
- The MongoDB cluster must exist and be accessible from the Dataflow worker machines.
Template parameters
Parameter | Description
---|---
inputSubscription | Name of the Pub/Sub subscription. For example: projects/<project-id>/subscriptions/<subscription-name>.
mongoDBUri | Comma-separated list of MongoDB servers. For example: 192.285.234.12:27017,192.287.123.11:27017.
database | Database in MongoDB to store the collection. For example: my-db.
collection | Name of the collection inside the MongoDB database. For example: my-collection.
deadletterTable | BigQuery table that stores messages that failed to be written (for example, because of a mismatched schema or malformed JSON). For example: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath | [Optional] Cloud Storage location of the JavaScript file containing the UDF transform. For example: gs://mybucket/filename.json.
javascriptTextTransformFunctionName | [Optional] Name of the JavaScript UDF. For example: transform.
batchSize | [Optional] Batch size used for batch insertion of documents into MongoDB. Default: 1000.
batchSizeBytes | [Optional] Batch size in bytes. Default: 5242880.
maxConnectionIdleTime | [Optional] Maximum idle time allowed in seconds before a connection timeout occurs. Default: 60000.
sslEnabled | [Optional] Boolean value indicating whether the connection to MongoDB is SSL enabled. Default: true.
ignoreSSLCertificate | [Optional] Boolean value indicating whether the SSL certificate should be ignored. Default: true.
withOrdered | [Optional] Boolean value enabling ordered bulk insertions into MongoDB. Default: true.
withSSLInvalidHostNameAllowed | [Optional] Boolean value indicating whether an invalid hostname is allowed for the SSL connection. Default: true.
Running the Pub/Sub to MongoDB template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Pub/Sub to MongoDB template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
Replace the following:
- PROJECT_ID: your project ID
- REGION_NAME: the Dataflow region name (for example, us-central1)
- JOB_NAME: a job name of your choice
- INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/<project-id>/subscriptions/<subscription-name>)
- MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
- DATABASE: the name of the MongoDB database (for example, users)
- COLLECTION: the name of the MongoDB collection (for example, profiles)
- UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=UNPROCESSED_TABLE
API
Run from the REST API
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- LOCATION: the Dataflow region name (for example, us-central1)
- JOB_NAME: a job name of your choice
- INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/<project-id>/subscriptions/<subscription-name>)
- MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
- DATABASE: the name of the MongoDB database (for example, users)
- COLLECTION: the name of the MongoDB collection (for example, profiles)
- UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputSubscription": "INPUT_SUBSCRIPTION",
      "mongoDBUri": "MONGODB_URI",
      "database": "DATABASE",
      "collection": "COLLECTION",
      "deadletterTable": "UNPROCESSED_TABLE"
    },
    "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB"
  }
}
Text Files on Cloud Storage to BigQuery (Stream)
The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and output the result to BigQuery.
Requirements for this pipeline:
- Create a JSON file that describes the schema of your output table in BigQuery. Ensure that there is a top-level JSON array titled BigQuery Schema and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. For example:
  {
    "BigQuery Schema": [
      { "name": "location", "type": "STRING" },
      { "name": "name", "type": "STRING" },
      { "name": "age", "type": "STRING" },
      { "name": "color", "type": "STRING", "mode": "REQUIRED" },
      { "name": "coffee", "type": "STRING", "mode": "REQUIRED" }
    ]
  }
- Create a JavaScript (.js) file with your UDF function that supplies the logic to transform the lines of text. Note that your function must return a JSON string. For example, this function splits each line of a CSV file and returns a JSON string after transforming the values.
  function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    return jsonString;
  }
Template parameters
Parameter | Description
---|---
javascriptTextTransformGcsPath | Cloud Storage location of your JavaScript UDF. For example: gs://my_bucket/my_function.js.
JSONPath | Cloud Storage location of your BigQuery schema file, described as a JSON. For example: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName | The name of the JavaScript function you wish to call as your UDF. For example: transform.
outputTable | The fully qualified BigQuery table. For example: my-project:dataset.table.
inputFilePattern | Cloud Storage location of the text you'd like to process. For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory | Temporary directory for the BigQuery loading process. For example: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable | Table for messages that failed to reach the output table. For example: my-project:dataset.my-unprocessed-table. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
Running the Cloud Storage Text to BigQuery (Stream) template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Cloud Storage Text to BigQuery template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- JAVASCRIPT_FUNCTION: the name of your UDF
- PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
- PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
- BIGQUERY_TABLE: your BigQuery table name
- BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
- PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- JAVASCRIPT_FUNCTION: the name of your UDF
- PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
- PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
- BIGQUERY_TABLE: your BigQuery table name
- BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
- PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
    "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
    "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
    "inputFilePattern": "PATH_TO_TEXT_DATA",
    "outputTable": "BIGQUERY_TABLE",
    "outputDeadletterTable": "BIGQUERY_UNPROCESSED_TABLE",
    "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
  }
}
Text Files on Cloud Storage to Pub/Sub (Stream)
This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. The template publishes records from newline-delimited files containing JSON records, or from CSV files, to a Pub/Sub topic for real-time processing. You can use this template to replay data to Pub/Sub.
Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time will be equal to the publishing time during execution. If your pipeline relies on an accurate event time for processing, you should not use this pipeline.
Requirements for this pipeline:
- Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, as each line within the files will be published as a message to Pub/Sub.
- The Pub/Sub topic must exist prior to execution.
- The pipeline runs indefinitely and needs to be terminated manually.
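As an illustration, while the pipeline is running, dropping a newline-delimited file into the watched location (hypothetical bucket and path) is enough for its lines to be picked up on the next poll and published to the topic:
# Copy a newline-delimited JSON file into the location matched by inputFilePattern.
gsutil cp events.json gs://example-bucket/files/events.json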
Template parameters
Parameter | Description
---|---
inputFilePattern | The input file pattern to read from. For example, gs://bucket-name/files/*.json or gs://bucket-name/path/*.csv.
outputTopic | The Pub/Sub topic to write the output to. The name should be in the format of projects/<project-id>/topics/<topic-name>.
Running the Text Files on Cloud Storage to Pub/Sub (Stream) template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Text Files on Cloud Storage to Pub/Sub (Stream) template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
- FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- PROJECT_ID: your project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- TOPIC_NAME: your Pub/Sub topic name
- BUCKET_NAME: the name of your Cloud Storage bucket
- FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "gs://your-bucket/temp",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
    "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
  }
}
Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)
The Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template is a streaming pipeline that reads CSV files from a Cloud Storage bucket, calls the Cloud Data Loss Prevention (Cloud DLP) API for de-identification, and writes the de-identified data into the specified BigQuery table. This template supports using both a Cloud DLP inspection template and a Cloud DLP de-identification template. This allows users to inspect for potentially sensitive information and de-identify it, as well as de-identify structured data where columns are specified to be de-identified and no inspection is needed.
Requirements for this pipeline:
- The input data to tokenize must exist
- The Cloud DLP Templates must exist (for example, DeidentifyTemplate and InspectTemplate). See Cloud DLP templates for more details.
- The BigQuery dataset must exist
Template parameters
Parameter | Description
---|---
inputFilePattern | The CSV file(s) to read input data records from. Wildcarding is also accepted. For example, gs://mybucket/my_csv_filename.csv or gs://mybucket/file-*.csv.
dlpProjectId | Cloud DLP project ID that owns the Cloud DLP API resource. This Cloud DLP project can be the same project that owns the Cloud DLP templates, or it can be a separate project. For example, my_dlp_api_project.
deidentifyTemplateName | Cloud DLP de-identification template to use for API requests, specified with the pattern projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. For example, projects/my_project/deidentifyTemplates/100.
datasetName | BigQuery dataset for sending tokenized results.
batchSize | Chunking/batch size for sending data to inspect and/or detokenize. In the case of a CSV file, batchSize is the number of rows in a batch. Users must determine the batch size based on the size of the records and the sizing of the file. Note that the Cloud DLP API has a payload size limit of 524 KB per API call.
inspectTemplateName | [Optional] Cloud DLP inspection template to use for API requests, specified with the pattern projects/{template_project_id}/identifyTemplates/{idTemplateId}. For example, projects/my_project/identifyTemplates/100.
Running the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
Replace the following:
- TEMPLATE_PROJECT_ID: your template project ID
- DLP_API_PROJECT_ID: your Cloud DLP API project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- INPUT_DATA: your input file path
- DEIDENTIFY_TEMPLATE: the Cloud DLP de-identify template number
- DATASET_NAME: the BigQuery dataset name
- INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
- BATCH_SIZE_VALUE: the batch size (number of rows per API call for CSV files)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
API
Run from the REST API
When running this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- TEMPLATE_PROJECT_ID: your template project ID
- DLP_API_PROJECT_ID: your Cloud DLP API project ID
- JOB_NAME: a job name of your choice
- REGION: the regional endpoint (for example, us-west1)
- TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
- INPUT_DATA: your input file path
- DEIDENTIFY_TEMPLATE: the Cloud DLP de-identify template number
- DATASET_NAME: the BigQuery dataset name
- INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
- BATCH_SIZE_VALUE: the batch size (number of rows per API call for CSV files)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
  "jobName": "JOB_NAME",
  "environment": {
    "bypassTempDirValidation": false,
    "tempLocation": "TEMP_LOCATION",
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputFilePattern": "INPUT_DATA",
    "datasetName": "DATASET_NAME",
    "batchSize": "BATCH_SIZE_VALUE",
    "dlpProjectId": "DLP_API_PROJECT_ID",
    "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
    "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
  }
}
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)
The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template is a streaming pipeline that reads Pub/Sub messages with change data from a MySQL database and writes the records to BigQuery. A Debezium connector captures changes to the MySQL database and publishes the changed data to Pub/Sub. The template then reads the Pub/Sub messages and writes them to BigQuery.
You can use this template to sync MySQL databases and BigQuery tables. The pipeline writes the changed data to a BigQuery staging table and intermittently updates a BigQuery table replicating the MySQL database.
Requirements for this pipeline:
- The Debezium connector must be deployed and publishing change data from the MySQL database to Pub/Sub.
Template parameters
Parameter | Description
---|---
inputSubscriptions | The comma-separated list of Pub/Sub input subscriptions to read from, in the format of <subscription>,<subscription>, ...
changeLogDataset | The BigQuery dataset to store the staging tables, in the format of <my-dataset>.
replicaDataset | The location of the BigQuery dataset to store the replica tables, in the format of <my-dataset>.
updateFrequencySecs | (Optional) The interval at which the pipeline updates the BigQuery table replicating the MySQL database.
Running the Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery template
To run this template, perform the following steps:
- On your local machine, clone the DataflowTemplates repository.
- Change to the v2/cdc-parent directory.
- Ensure that the Debezium connector is deployed.
- Using Maven, run the Dataflow template.
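A sketch of the first two steps, assuming the repository is the public GoogleCloudPlatform/DataflowTemplates repository on GitHub:
# Clone the templates repository and switch to the CDC module directory.
git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
cd DataflowTemplates/v2/cdc-parent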
Replace the following values:
- PROJECT_ID: your project ID
- YOUR_SUBSCRIPTIONS: your comma-separated list of Pub/Sub subscription names
- YOUR_CHANGELOG_DATASET: your BigQuery dataset for changelog data
- YOUR_REPLICA_DATASET: your BigQuery dataset for replica tables
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
    --inputSubscriptions=YOUR_SUBSCRIPTIONS \
    --updateFrequencySecs=300 \
    --changeLogDataset=YOUR_CHANGELOG_DATASET \
    --replicaDataset=YOUR_REPLICA_DATASET \
    --project=PROJECT_ID"
Apache Kafka to BigQuery
The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. If the errors table does not exist prior to execution, then it is created.
Requirements for this pipeline
- The output BigQuery table must exist.
- The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
- The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format.
Template parameters
Parameter | Description
---|---
outputTableSpec | The BigQuery output table location to write the Apache Kafka messages to, in the format of my-project:dataset.table.
inputTopics | The Apache Kafka input topics to read from in a comma-separated list. For example: messages.
bootstrapServers | The host address of the running Apache Kafka broker servers in a comma-separated list, each host address in the format of 35.70.252.199:9092.
javascriptTextTransformGcsPath | (Optional) Cloud Storage location path to the JavaScript UDF. For example: gs://my_bucket/my_function.js.
javascriptTextTransformFunctionName | (Optional) The name of the JavaScript function to call as your UDF. For example: transform.
outputDeadletterTable | (Optional) The BigQuery table for messages that failed to reach the output table, in the format of my-project:dataset.my-deadletter-table. If it doesn't exist, the table is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
Running the Apache Kafka to BigQuery template
Console
Run from the Google Cloud Console
- Go to the Dataflow page in the Cloud Console.
- Click Create job from template.
- Select the Apache Kafka to BigQuery template from the Dataflow template drop-down menu.
- Enter a job name in the Job Name field.
- Enter your parameter values in the provided parameter fields.
- Click Run Job.

gcloud
Run from the gcloud command-line tool
Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
Replace the following:
- YOUR_PROJECT_ID: your template project ID
- JOB_NAME: a job name of your choice
- REGION_NAME: the Dataflow region name (for example, us-central1)
- BIGQUERY_TABLE: your BigQuery table name
- KAFKA_TOPICS: the Apache Kafka topic list. If multiple topics are provided, follow the instructions on how to escape commas.
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
- YOUR_JAVASCRIPT_FUNCTION: the name of your UDF
- KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each address should include the port number from which the server is accessible, for example, 35.70.252.199:9092. If multiple addresses are provided, follow the instructions on how to escape commas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
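When KAFKA_TOPICS or KAFKA_SERVER_ADDRESSES contain commas, one way to keep gcloud from splitting them is its alternate-delimiter escaping (see gcloud topic escaping); this sketch assumes a hypothetical two-topic, two-broker setup and uses ~ as the delimiter:
# ^~^ tells gcloud to split --parameters on ~ instead of a comma, so embedded commas survive.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters ^~^outputTableSpec=BIGQUERY_TABLE~inputTopics=topic1,topic2~bootstrapServers=35.70.252.199:9092,35.70.252.200:9092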
API
Run from the REST API
When running this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
Replace the following:
- YOUR_PROJECT_ID: your template project ID
- JOB_NAME: a job name of your choice
- LOCATION: the Dataflow region name (for example, us-central1)
- BIGQUERY_TABLE: your BigQuery table name
- KAFKA_TOPICS: the Apache Kafka topic list. If multiple topics are provided, follow the instructions on how to escape commas.
- PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
- YOUR_JAVASCRIPT_FUNCTION: the name of your UDF
- KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each address should include the port number from which the server is accessible, for example, 35.70.252.199:9092. If multiple addresses are provided, follow the instructions on how to escape commas.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "outputTableSpec": "BIGQUERY_TABLE",
      "inputTopics": "KAFKA_TOPICS",
      "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
      "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
      "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
    },
    "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery"
  }
}