Google-provided streaming templates

Google provides a set of open-source Dataflow templates. For general information about templates, see the Overview page. For a list of all Google-provided templates, see the Get started with Google-provided templates page.

This page documents the following streaming templates:

Pub/Sub Subscription to BigQuery

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Pub/Sub messages must be in JSON format, described here. For example, messages formatted as {"k1":"v1", "k2":"v2"} may be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to running the pipeline (see the table-creation example after this list).
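
For example, the output table can be created ahead of time with the bq command-line tool. The project, dataset, table, and column names below are illustrative assumptions chosen to match the sample message above, not values required by the template:

# Create a BigQuery table whose columns match the keys of the JSON messages.
bq mk --table my-project:my_dataset.my_table k1:STRING,k2:STRING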

Template parameters

Parameter Description
inputSubscription The Pub/Sub input subscription to read from, in the format of projects/<project>/subscriptions/<subscription>.
outputTableSpec The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>

Running the Pub/Sub Subscription to BigQuery template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub Subscription to BigQuery template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   }
}

Pub/Sub Topic to BigQuery

The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Pub/Sub messages must be in JSON format, described here. For example, messages formatted as {"k1":"v1", "k2":"v2"} may be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to pipeline execution (see the example after this list).
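
For example, you can publish a test message that matches the two-column table described above. The topic name is an illustrative assumption:

# Publish a JSON-formatted test message to the input topic.
gcloud pubsub topics publish my-topic --message='{"k1":"v1", "k2":"v2"}'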

Template parameters

Parameter Description
inputTopic The Pub/Sub input topic to read from, in the format of projects/<project>/topics/<topic>.
outputTableSpec The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>

Running the Pub/Sub Topic to BigQuery template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub Topic to BigQuery template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub dead-letter topic.

Requirements for this pipeline

  • The input Pub/Sub subscription must exist.
  • The schema file for the Avro records must exist on Cloud Storage (see the example after this list).
  • The dead-letter Pub/Sub topic must exist.
  • The output BigQuery dataset must exist.
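
As a sketch of the schema-file requirement, the following writes a minimal Avro record schema and copies it to Cloud Storage so that it can be referenced through the schemaPath parameter. The record fields, file name, and bucket are illustrative assumptions:

# Write a minimal Avro record schema (field names are illustrative).
cat > schema.avsc <<'EOF'
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "value", "type": "double"}
  ]
}
EOF

# Upload the schema file and pass its path as schemaPath.
gsutil cp schema.avsc gs://my-bucket/schemas/schema.avsc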

Template parameters

Parameter Description
schemaPath The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc.
inputSubscription The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
outputTopic The Pub/Sub topic to use as a dead-letter for failed records. For example, projects/<project-id>/topics/<topic-name>.
outputTableSpec The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table may be created automatically using the user provided Avro schema.
writeDisposition (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Default: WRITE_APPEND
createDisposition (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED

Running the Pub/Sub Avro to BigQuery template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub Avro to BigQuery template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Replace the following:

  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION_NAME: the Dataflow region name (for example, us-central1)
  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the dead-letter queue
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Run from the REST API

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • LOCATION: the Dataflow region name (for example, us-central1)
  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the dead-letter queue
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.

Requirements for this pipeline:

  • The source Pub/Sub subscription must exist prior to execution.
  • The destination Pub/Sub topic must exist prior to execution.

Template parameters

Parameter Description
inputSubscription Pub/Sub subscription to read the input from. For example, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Cloud Pub/Sub topic to write the output to. For example, projects/<project-id>/topics/<topic-name>.
filterKey [Optional] Filter events based on an attribute key. No filters are applied if filterKey is not specified.
filterValue [Optional] Filter attribute value to use in case a filterKey is provided. A null filterValue is used by default.
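
To illustrate how the attribute filter behaves, the following publishes two messages to the topic behind the source subscription (the topic name is an assumption). With filterKey=region and filterValue=us-west1, only the first message is forwarded to the output topic:

# This message carries the attribute value the filter matches on.
gcloud pubsub topics publish source-topic \
    --message='payload one' \
    --attribute=region=us-west1

# This message has a different attribute value, so the filter drops it.
gcloud pubsub topics publish source-topic \
    --message='payload two' \
    --attribute=region=europe-west1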

Running the Pub/Sub to Pub/Sub template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub to Pub/Sub template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOPIC_NAME: the Pub/Sub topic name
  • FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
  • FILTER_VALUE: the filter attribute value to use when an event filter key is provided. Accepts a valid Java regex string as an event filter value. When a regex is provided, the complete expression must match for the message to be filtered; partial matches (for example, substrings) are not filtered. A null event filter value is used by default.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOPIC_NAME: the Pub/Sub topic name
  • FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
  • FILTER_VALUE: the filter attribute value to use when an event filter key is provided. Accepts a valid Java regex string as an event filter value. When a regex is provided, the complete expression must match for the message to be filtered; partial matches (for example, substrings) are not filtered. A null event filter value is used by default.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

The Pub/Sub to Splunk template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Splunk via Splunk's HTTP Event Collector (HEC). Before writing to Splunk, you can also apply a JavaScript user-defined function to the message payload. Any messages that experience processing failures are forwarded to a Pub/Sub dead-letter topic for further troubleshooting and reprocessing.

As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key along with an HEC token parameter that has been encrypted with that key and base64-encoded, as sketched below. See the Cloud KMS API encryption endpoint for additional details on encrypting your HEC token parameter.
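
For example, one way to produce such a value is to encrypt the plaintext token with gcloud kms encrypt and base64-encode the ciphertext. The key ring and key names below are assumptions; the Cloud KMS API's encrypt endpoint, which returns base64-encoded ciphertext directly, can be used instead:

# Encrypt the HEC token with a Cloud KMS key and base64-encode the ciphertext.
echo -n "my-splunk-hec-token" | gcloud kms encrypt \
    --location=global \
    --keyring=my-keyring \
    --key=my-key \
    --plaintext-file=- \
    --ciphertext-file=- | base64

# Pass the base64 output as the token parameter and the same key as tokenKMSEncryptionKey.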

Requirements for this pipeline:

  • The source Pub/Sub subscription must exist prior to running the pipeline.
  • The Pub/Sub dead-letter topic must exist prior to running the pipeline.
  • The Splunk HEC endpoint must be accessible from the Dataflow workers' network.
  • The Splunk HEC token must be generated and available.

Template parameters

Parameter Description
inputSubscription The Pub/Sub subscription from which to read the input. For example, projects/<project-id>/subscriptions/<subscription-name>.
token The Splunk HEC authentication token. This base64-encoded string can be encrypted with a Cloud KMS key for additional security.
url The Splunk HEC url. This must be routable from the VPC in which the pipeline runs. For example, https://splunk-hec-host:8088.
outputDeadletterTopic The Pub/Sub topic to forward undeliverable messages. For example, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Optional] The Cloud Storage path that contains all your JavaScript code. For example, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Optional] The name of the JavaScript function to be called. For example, if your JavaScript function is function myTransform(inJson) { ...dostuff...} then the function name is myTransform.
batchCount [Optional] The batch size for sending multiple events to Splunk. Default 1 (no batching).
parallelism [Optional] The maximum number of parallel requests. Default 1 (no parallelism).
disableCertificateValidation [Optional] Disable SSL certificate validation. Default false (validation enabled).
includePubsubMessage [Optional] Include the full Pub/Sub message in the payload. Default false (only the data element is included in the payload).
tokenKMSEncryptionKey [Optional] The Cloud KMS key to decrypt the HEC token string. If the Cloud KMS key is provided, the HEC token string must be passed in encrypted.
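
As a sketch of the two optional UDF parameters above, the following writes a small JavaScript function that adds a field to each event payload and uploads it to Cloud Storage. The function body, file name, and bucket are assumptions; the function must accept the message payload and return a JSON string:

# Write a minimal UDF (assumes the payload is a JSON string).
cat > udf.js <<'EOF'
function enrich(inJson) {
  var obj = JSON.parse(inJson);
  // Add an illustrative field before the event is sent to Splunk.
  obj.source_system = 'dataflow';
  return JSON.stringify(obj);
}
EOF

# Upload the UDF, then set javascriptTextTransformGcsPath=gs://my-bucket/udfs/udf.js
# and javascriptTextTransformFunctionName=enrich.
gsutil cp udf.js gs://my-bucket/udfs/udf.js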

Running the Pub/Sub to Splunk template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub to Splunk template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOKEN: Splunk's HTTP Event Collector (HEC) token
  • URL: the URL path for Splunk's HTTP Event Collector (for example, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
  • JAVASCRIPT_FUNCTION: your JavaScript function name
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code (for example, gs://your-bucket/your-function.js)
  • BATCH_COUNT: the batch size to use for sending multiple events to Splunk
  • PARALLELISM: the number of parallel requests to use for sending events to Splunk
  • DISABLE_VALIDATION: true if you want to disable SSL certificate validation
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOKEN: Splunk's HTTP Event Collector (HEC) token
  • URL: the URL path for Splunk's HTTP Event Collector (for example, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
  • JAVASCRIPT_FUNCTION: your JavaScript function name
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code (for example, gs://your-bucket/your-function.js)
  • BATCH_COUNT: the batch size to use for sending multiple events to Splunk
  • PARALLELISM: the number of parallel requests to use for sending events to Splunk
  • DISABLE_VALIDATION: true if you want to disable SSL certificate validation
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket.

Requirements for this pipeline:

  • The input Pub/Sub topic must exist prior to pipeline execution.

Template parameters

Parameter Description
inputTopic Cloud Pub/Sub Topic to subscribe for message consumption. The topic name should be in the format of projects/<project-id>/topics/<topic-name>.
outputDirectory The output directory where the Avro files are archived. Must end with /. For example: gs://example-bucket/example-directory/.
avroTempDirectory The directory for temporary Avro files. Must end with /. For example: gs://example-bucket/example-directory/.
outputFilenamePrefix [Optional] The output filename prefix for the Avro files.
outputFilenameSuffix [Optional] The output filename suffix for the Avro files.
outputShardTemplate [Optional] The shard template of the output file. Specified as repeating sequences of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the shard number and the number of shards, respectively. When this parameter is not specified, the default template format is 'W-P-SS-of-NN'.
numShards [Optional] The maximum number of output shards produced when writing. The default maximum number of shards is 1.

Running the Pub/Sub to Cloud Storage Avro template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub to Cloud Storage Avro template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template
  • NUM_SHARDS: the number of output shards
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template
  • NUM_SHARDS: the number of output shards
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub to Text Files on Cloud Storage

The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.

Requirements for this pipeline:

  • The Pub/Sub topic must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.

Template parameters

Parameter Description
inputTopic The Pub/Sub topic to read the input from. The topic name should be in the format projects/<project-id>/topics/<topic-name>.
outputDirectory The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix The prefix to place on each windowed file. For example, output-
outputFilenameSuffix The suffix to place on each windowed file, typically a file extension such as .txt or .csv.
outputShardTemplate The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window. This means that all data will land into a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In case of a single file, the SS-of-NN portion of the outputShardTemplate will be 00-of-01.

Running the Pub/Sub to Text Files on Cloud Storage template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub to Text Files on Cloud Storage template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. If required, the pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). Any errors that occur due to schema mismatch or malformed JSON, or while executing transforms, are recorded in a BigQuery deadletter table along with the input message. The pipeline automatically creates the deadletter table if it does not exist prior to execution.

Requirements for this pipeline:

  • The Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.
  • The MongoDB cluster must exist and must be accessible from the Dataflow worker machines.

Template parameters

Parameter Description
inputSubscription Name of the Pub/Sub subscription. For example: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri Comma separated list of MongoDB servers. For example: 192.285.234.12:27017,192.287.123.11:27017
database Database in MongoDB to store the collection. For example: my-db.
collection Name of the collection inside MongoDB database. For example: my-collection.
deadletterTable The BigQuery table that stores messages that failed processing (mismatched schema, malformed JSON, and so on). For example: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Optional] Cloud Storage location of the JavaScript file containing the UDF transform. For example: gs://mybucket/filename.js.
javascriptTextTransformFunctionName [Optional] Name of JavaScript UDF. For example: transform.
batchSize [Optional] Batch size used for batch insertion of documents into MongoDB. Default: 1000.
batchSizeBytes [Optional] Batch size in bytes. Default: 5242880.
maxConnectionIdleTime [Optional] Maximum idle time allowed in seconds before connection time out occurs. Default: 60000.
sslEnabled [Optional] Boolean value indicating whether connection to MongoDB is SSL enabled. Default: true.
ignoreSSLCertificate [Optional] Boolean value indicating if the SSL certificate should be ignored. Default: true.
withOrdered [Optional] Boolean value enabling ordered bulk insertions into MongoDB. Default: true.
withSSLInvalidHostNameAllowed [Optional] Boolean value indicating if invalid host name is allowed for SSL connection. Default: true.

Running the Pub/Sub to MongoDB template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Pub/Sub to MongoDB template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Replace the following:

  • PROJECT_ID: your project ID
  • REGION_NAME: Dataflow region name (for example, us-central1)
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=UNPROCESSED_TABLE
  

API

Run from the REST API

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • LOCATION: Dataflow region name (for example, us-central1)
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Text Files on Cloud Storage to BigQuery (Stream)

The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and output the result to BigQuery.

Requirements for this pipeline:

  • Create a JSON-formatted BigQuery schema file that describes your output table.
    {
        "fields": [{
            "name": "location",
            "type": "STRING"
        }, {
            "name": "name",
            "type": "STRING"
        }, {
            "name": "age",
            "type": "STRING"
        }, {
            "name": "color",
            "type": "STRING"
        }, {
            "name": "coffee",
            "type": "STRING",
            "mode": "REQUIRED"
        }, {
            "name": "cost",
            "type": "NUMERIC",
            "mode": "REQUIRED"
        }]
    }
    
  • Create a JavaScript (.js) file with your UDF function that supplies the logic to transform the lines of text. Note that your function must return a JSON string.

    For example, this function splits each line of a CSV file and returns a JSON string after transforming the values.

    function transform(line) {
      var values = line.split(',');

      var obj = new Object();
      obj.location = values[0];
      obj.name = values[1];
      obj.age = values[2];
      obj.color = values[3];
      obj.coffee = values[4];
      var jsonString = JSON.stringify(obj);

      return jsonString;
    }
    

Template parameters

Parameter Description
javascriptTextTransformGcsPath Cloud Storage location of your JavaScript UDF. For example: gs://my_bucket/my_function.js.
JSONPath Cloud Storage location of your BigQuery schema file, described as a JSON. For example: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName The name of the JavaScript function you wish to call as your UDF. For example: transform.
outputTable The fully qualified BigQuery table. For example: my-project:dataset.table
inputFilePattern Cloud Storage location of the text you'd like to process. For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Temporary directory for BigQuery loading process. For example: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table for messages that failed to reach the output table. For example: my-project:dataset.my-unprocessed-table. If it doesn't exist, it will be created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.

Running the Cloud Storage Text to BigQuery (Stream) template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Cloud Storage Text to BigQuery template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: the name of your UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: the name of your UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage path to the .js file containing your JavaScript code
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub (Stream)

This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. The template publishes each line of a newline-delimited JSON or CSV file as a message to a Pub/Sub topic for real-time processing. You can use this template to replay data to Pub/Sub.

Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time will be equal to the publishing time during execution. If your pipeline relies on an accurate event time for processing, you should not use this pipeline.

Requirements for this pipeline:

  • Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a separate message to Pub/Sub (see the example after this list).
  • The Pub/Sub topic must exist prior to execution.
  • The pipeline runs indefinitely and needs to be terminated manually.
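
For example, dropping a newline-delimited JSON file into the watched location looks like the following; the pipeline picks the file up on its next poll and publishes one message per line. The file name and bucket are assumptions:

# Create a newline-delimited JSON file: one record per line, no multi-line records.
cat > events.json <<'EOF'
{"user": "alice", "action": "login"}
{"user": "bob", "action": "logout"}
EOF

# Copy the file to a location matched by inputFilePattern.
gsutil cp events.json gs://my-bucket/path/events.json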

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/files/*.json or gs://bucket-name/path/*.csv.
outputTopic The Pub/Sub output topic to write to. The name should be in the format of projects/<project-id>/topics/<topic-name>.

Running the Text Files on Cloud Storage to Pub/Sub (Stream) template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Text Files on Cloud Storage to Pub/Sub (Stream) template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

Replace the following:

  • PROJECT_ID: your project ID
  • JOB_NAME: a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • REGION: the regional endpoint (for example, us-west1)
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

The Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery template is a streaming pipeline that reads CSV files from a Cloud Storage bucket, calls the Cloud Data Loss Prevention (Cloud DLP) API for de-identification, and writes the de-identified data into the specified BigQuery table. The template supports using both a Cloud DLP inspection template and a Cloud DLP de-identification template. This lets you inspect for potentially sensitive information and de-identify it, as well as de-identify structured data where specific columns are marked for de-identification and no inspection is needed.

Requirements for this pipeline:

  • The input data to tokenize must exist.
  • The Cloud DLP templates must exist (for example, DeidentifyTemplate and InspectTemplate). See Cloud DLP templates for more details, and the example after this list.
  • The BigQuery dataset must exist.
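
As a rough sketch of the template requirement, a de-identification template can be created through the Cloud DLP REST API. The configuration below (replace every finding with its infoType name), the project ID, and the display name are assumptions; see the Cloud DLP documentation for the full set of options:

# Create a minimal Cloud DLP de-identification template (configuration is illustrative).
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    "https://dlp.googleapis.com/v2/projects/my-project/deidentifyTemplates" \
    -d '{
      "deidentifyTemplate": {
        "displayName": "mask-all-findings",
        "deidentifyConfig": {
          "infoTypeTransformations": {
            "transformations": [
              {"primitiveTransformation": {"replaceWithInfoTypeConfig": {}}}
            ]
          }
        }
      }
    }'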

Template parameters

Parameter Description
inputFilePattern The CSV file(s) to read input data records from. Wildcards are also accepted. For example, gs://mybucket/my_csv_filename.csv or gs://mybucket/file-*.csv.
dlpProjectId Cloud DLP project ID that owns the Cloud DLP API resource. This Cloud DLP project can be the same project that owns the Cloud DLP templates, or it can be a separate project. For example, my_dlp_api_project.
deidentifyTemplateName Cloud DLP deidentification template to use for API requests, specified with the pattern projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. For example, projects/my_project/deidentifyTemplates/100.
datasetName BigQuery dataset for sending tokenized results.
batchSize The chunking/batch size for sending data to inspect and/or de-identify. For a CSV file, batchSize is the number of rows in a batch. Determine the batch size based on the size of the records and the sizing of the file; the Cloud DLP API has a payload size limit of 524 KB per API call, so, for example, rows that average about 500 bytes keep a batch of 1,000 rows comfortably under that limit.
inspectTemplateName [Optional] Cloud DLP inspection template to use for API requests, specified with the pattern projects/{template_project_id}/inspectTemplates/{idTemplateId}. For example, projects/my_project/inspectTemplates/100.

Running the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_TEMPLATE_PROJECT_ID with your template project ID.
  • Replace YOUR_DLP_API_PROJECT_ID with your Cloud DLP API project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_INPUT_DATA with your input file path.
  • Replace YOUR_DEIDENTIFY_TEMPLATE with the Cloud DLP de-identify template number.
  • Replace YOUR_DATASET with the BigQuery dataset name.
  • Replace YOUR_INSPECT_TEMPLATE with the Cloud DLP inspect template number.
  • Replace BATCH_SIZE_VALUE with the batch size (number of rows per API call for CSV files).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/inspectTemplates/YOUR_INSPECT_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_TEMPLATE_PROJECT_ID with your template project ID.
  • Replace YOUR_DLP_API_PROJECT_ID with your Cloud DLP API project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_INPUT_DATA with your input file path.
  • Replace YOUR_DEIDENTIFY_TEMPLATE with the Cloud DLP deidentify template number.
  • Replace YOUR_DATASET_NAME with the BigQuery dataset name.
  • Replace YOUR_INSPECT_TEMPLATE with the Cloud DLP inspect template number.
  • Replace BATCH_SIZE_VALUE with the batch size (the number of rows per API call for CSV files).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}
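
One way to send this request is with curl, using an access token from the gcloud tool for authorization. This sketch assumes you have saved the request body above as request.json.

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery"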

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template is a streaming pipeline that reads Pub/Sub messages with change data from a MySQL database and writes the records to BigQuery. A Debezium connector captures changes to the MySQL database and publishes the changed data to Pub/Sub. The template then reads the Pub/Sub messages and writes them to BigQuery.

You can use this template to sync MySQL databases and BigQuery tables. The pipeline writes the changed data to a BigQuery staging table and intermittently updates a BigQuery table replicating the MySQL database.

Requirements for this pipeline:

  • The Debezium connector must be deployed.
  • The Pub/Sub messages must be serialized in a Beam Row.

Template parameters

Parameter Description
inputSubscriptions The comma-separated list of Pub/Sub input subscriptions to read from, in the format of <subscription>,<subscription>, ...
changeLogDataset The BigQuery dataset to store the staging tables, in the format of <my-dataset>
replicaDataset The location of the BigQuery dataset to store the replica tables, in the format of <my-dataset>
updateFrequencySecs (Optional) The interval at which the pipeline updates the BigQuery table replicating the MySQL database.
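
If you still need to create the staging and replica datasets, a minimal sketch using the bq command-line tool follows; the dataset names are hypothetical.

bq mk --dataset PROJECT_ID:changelog_dataset    # staging tables (changeLogDataset)
bq mk --dataset PROJECT_ID:replica_dataset      # replica tables (replicaDataset)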

Running the Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template

To run this template, perform the following steps:

  1. On your local machine, clone the DataflowTemplates repository (a sketch of steps 1 and 2 appears after the Maven command below).
  2. Change to the v2/cdc-parent directory.
  3. Ensure that the Debezium connector is deployed.
  4. Using Maven, run the Dataflow template.

    You must replace the following values in this example:

    • Replace PROJECT_ID with your project ID.
    • Replace YOUR_SUBSCRIPTIONS with your comma-separated list of Pub/Sub subscription names.
    • Replace YOUR_CHANGELOG_DATASET with your BigQuery dataset for changelog data, and replace YOUR_REPLICA_DATASET with your BigQuery dataset for replica tables.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"

Apache Kafka to BigQuery

The Apache Kafka to BigQuery template is a streaming pipeline that ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. Any errors that occur during transformation of the data, execution of the UDF, or insertion into the output table are written to a separate errors table in BigQuery. If the errors table does not exist prior to execution, it is created.

Requirements for this pipeline

  • The output BigQuery table must exist.
  • The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
  • The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format.

Template parameters

Parameter Description
outputTableSpec The BigQuery output table location to write the Apache Kafka messages to, in the format of my-project:dataset.table
inputTopics The Apache Kafka input topics to read from in a comma-separated list. For example: messages
bootstrapServers The host address of the running Apache Kafka broker servers in a comma-separated list, each host address in the format of 35.70.252.199:9092
javascriptTextTransformGcsPath (Optional) Cloud Storage location path to the JavaScript UDF. For example: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (Optional) The name of the JavaScript function to call as your UDF. For example: transform
outputDeadletterTable (Optional) The BigQuery output table location to write deadletter records to, in the format of my-project:dataset.my-deadletter-table. If it doesn't exist, the table is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
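
Because the output BigQuery table must exist before the pipeline runs, you can create it ahead of time with the bq command-line tool. This is only a sketch; the project, dataset, table, and schema below are hypothetical and should match whatever your UDF emits.

bq mk --table my-project:my_dataset.kafka_messages message:STRING,processing_time:TIMESTAMP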

Running the Apache Kafka to BigQuery template

CONSOLE

Run from the Google Cloud Console
  1. Go to the Dataflow page in the Cloud Console.
  2. Click Create job from template.
  3. Select the Apache Kafka to BigQuery template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 284.0.0 or higher.

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace REGION_NAME with the Dataflow region name. For example: us-central1.
  • Replace BIGQUERY_TABLE with your BigQuery table name.
  • Replace KAFKA_TOPICS with the Apache Kafka topic list. If you provide multiple topics, follow the instructions for escaping commas.
  • Replace PATH_TO_JAVASCRIPT_UDF_FILE with the Cloud Storage path to the .js file containing your JavaScript code.
  • Replace YOUR_JAVASCRIPT_FUNCTION with the name of your UDF.
  • Replace KAFKA_SERVER_ADDRESSES with the Apache Kafka broker server IP address list. Include with each IP address the port number on which the server is accessible, for example 35.70.252.199:9092. If you provide multiple addresses, follow the instructions for escaping commas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES

API

Run from the REST API

When running this template, you need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace LOCATION with the Dataflow region name. For example: us-central1.
  • Replace BIGQUERY_TABLE with your BigQuery table name.
  • Replace KAFKA_TOPICS with the Apache Kafka topic list. If you provide multiple topics, follow the instructions for escaping commas.
  • Replace PATH_TO_JAVASCRIPT_UDF_FILE with the Cloud Storage path to the .js file containing your JavaScript code.
  • Replace YOUR_JAVASCRIPT_FUNCTION with the name of your UDF.
  • Replace KAFKA_SERVER_ADDRESSES with the Apache Kafka broker server IP address list. Include with each IP address the port number on which the server is accessible, for example 35.70.252.199:9092. If you provide multiple addresses, follow the instructions for escaping commas.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}
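
As with the classic template request above, one way to send this flex template request is with curl and an access token from the gcloud tool, assuming the request body is saved as request.json.

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch"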