Google-provided streaming templates

Google provides a set of open-source Dataflow templates. For general information about templates, see Dataflow templates. For a list of all Google-provided templates, see Get started with Google-provided templates.

This guide documents streaming templates.

Pub/Sub Subscription to BigQuery

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Pub/Sub messages must be in JSON format. For example, messages formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to running the pipeline, and its schema must match the input JSON objects. A minimal setup sketch follows this list.
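
The following is a minimal sketch of satisfying these requirements before launching the template. The project, dataset, table, and topic names are placeholders, and the schema is assumed to match the sample message above.

# Create a BigQuery table whose schema matches messages such as {"k1":"v1", "k2":"v2"}.
bq mk --table my-project:my_dataset.my_table k1:STRING,k2:STRING

# Publish a test message that the pipeline can convert into a table row.
gcloud pubsub topics publish my-topic --message='{"k1":"v1", "k2":"v2"}'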

Template parameters

  • inputSubscription: The Pub/Sub input subscription to read from, in the format projects/<project>/subscriptions/<subscription>.
  • outputTableSpec: The BigQuery output table location, in the format <my-project>:<my-dataset>.<my-table>.
  • outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the format <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
  • javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. A sketch of staging a sample UDF follows this list.
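
As a hedged illustration of the two UDF parameters, the following sketch writes a trivial pass-through UDF to a file and stages it in Cloud Storage. The bucket path and the function body are placeholders; the only contract assumed is the one shown above, a function that takes a JSON string and returns a JSON string.

# Write a trivial UDF that adds one field to each JSON message.
cat > my_udf.js <<'EOF'
function myTransform(inJson) {
  var obj = JSON.parse(inJson);
  obj.source = 'pubsub';   // hypothetical enrichment
  return JSON.stringify(obj);
}
EOF

# Stage the file where the template can read it, then reference it with
# javascriptTextTransformGcsPath=gs://my-bucket/my-udfs/my_udf.js and
# javascriptTextTransformFunctionName=myTransform.
gsutil cp my_udf.js gs://my-bucket/my-udfs/my_udf.js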

Running the Pub/Sub Subscription to BigQuery template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Subscription to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name

Pub/Sub Topic to BigQuery

The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Pub/Sub messages must be in JSON format. For example, messages formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to running the pipeline, and its schema must match the input JSON objects.

Template parameters

  • inputTopic: The Pub/Sub input topic to read from, in the format projects/<project>/topics/<topic>.
  • outputTableSpec: The BigQuery output table location, in the format <my-project>:<my-dataset>.<my-table>.
  • outputDeadletterTable: The BigQuery table for messages that failed to reach the output table, in the format <my-project>:<my-dataset>.<my-table>. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.
  • javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

Running the Pub/Sub Topic to BigQuery template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Topic to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name

Pub/Sub Avro to BigQuery

The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.

Requirements for this pipeline

  • The input Pub/Sub subscription must exist.
  • The schema file for the Avro records must exist on Cloud Storage.
  • The unprocessed Pub/Sub topic must exist.
  • The output BigQuery dataset must exist.

Template parameters

  • schemaPath: The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc. A sketch of staging a schema file follows this list.
  • inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
  • outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
  • outputTableSpec: The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table may be created automatically using the user-provided Avro schema.
  • writeDisposition: (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND.
  • createDisposition: (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED.
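
The following is a minimal sketch of staging a schema file for the schemaPath parameter. The record fields and the bucket path are placeholders; use the schema that actually matches your Avro messages.

# Write a minimal Avro record schema and copy it to Cloud Storage.
cat > schema.avsc <<'EOF'
{
  "type": "record",
  "name": "Message",
  "fields": [
    {"name": "k1", "type": "string"},
    {"name": "k2", "type": "string"}
  ]
}
EOF
gsutil cp schema.avsc gs://path/to/my/schema.avsc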

Running the Pub/Sub Avro to BigQuery template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Avro to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue

Pub/Sub to Pub/Sub

The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.

Requirements for this pipeline:

  • The source Pub/Sub subscription must exist prior to execution.
  • The destination Pub/Sub topic must exist prior to execution.

Template parameters

  • inputSubscription: The Pub/Sub subscription to read the input from. For example, projects/<project-id>/subscriptions/<subscription-name>.
  • outputTopic: The Pub/Sub topic to write the output to. For example, projects/<project-id>/topics/<topic-name>.
  • filterKey: (Optional) Filter events based on an attribute key. No filters are applied if filterKey is not specified.
  • filterValue: (Optional) Filter attribute value to use when a filterKey is provided. A null filterValue is used by default. See the example after this list.
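
As an illustration of the filter parameters, suppose the template is launched with filterKey=severity and filterValue=ERROR, and the input subscription is attached to the topic below (all names are placeholders). Only the first message is copied to the output topic; the second is dropped because its severity attribute does not match.

gcloud pubsub topics publish projects/my-project/topics/source-topic \
    --message='disk full' --attribute=severity=ERROR

gcloud pubsub topics publish projects/my-project/topics/source-topic \
    --message='heartbeat ok' --attribute=severity=INFO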

Running the Pub/Sub to Pub/Sub template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Pub/Sub template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOPIC_NAME: the Pub/Sub topic name
  • FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
  • FILTER_VALUE: filter attribute value to use if an event filter key is provided. Accepts a valid Java Regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (e.g. substring) will not be filtered. A null event filter value is used by default.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOPIC_NAME: the Pub/Sub topic name
  • FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
  • FILTER_VALUE: filter attribute value to use if an event filter key is provided. Accepts a valid Java Regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (e.g. substring) will not be filtered. A null event filter value is used by default.

Pub/Sub to Splunk

The Pub/Sub to Splunk template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Splunk via Splunk's HTTP Event Collector (HEC). The most common use case of this template is to export logs to Splunk. To see an example of the underlying workflow, see Deploying production-ready log exports to Splunk using Dataflow.

Before writing to Splunk, you can also apply a JavaScript user-defined function to the message payload. Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.

As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key along with the HEC token parameter, base64-encoded and encrypted with that Cloud KMS key. See the Cloud KMS API encryption endpoint for additional details on encrypting your HEC token parameter.
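
For example, a hedged sketch of encrypting an HEC token with Cloud KMS before passing it as the token parameter might look like the following; the key ring, key, and location names are placeholders.

# Encrypt the plaintext HEC token with a Cloud KMS key.
echo -n "my-splunk-hec-token" > hec_token.txt
gcloud kms encrypt \
    --location=global \
    --keyring=my-keyring \
    --key=my-key \
    --plaintext-file=hec_token.txt \
    --ciphertext-file=hec_token.enc

# Base64-encode the ciphertext and pass the result as the token parameter,
# with tokenKMSEncryptionKey set to the same Cloud KMS key.
base64 hec_token.enc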

Requirements for this pipeline:

  • The source Pub/Sub subscription must exist prior to running the pipeline.
  • The Pub/Sub unprocessed topic must exist prior to running the pipeline.
  • The Splunk HEC endpoint must be accessible from the Dataflow workers' network.
  • The Splunk HEC token must be generated and available.

Template parameters

  • inputSubscription: The Pub/Sub subscription from which to read the input. For example, projects/<project-id>/subscriptions/<subscription-name>.
  • token: The Splunk HEC authentication token. This base64-encoded string can be encrypted with a Cloud KMS key for additional security.
  • url: The Splunk HEC URL. This must be routable from the VPC in which the pipeline runs. For example, https://splunk-hec-host:8088.
  • outputDeadletterTopic: The Pub/Sub topic to forward undeliverable messages to. For example, projects/<project-id>/topics/<topic-name>.
  • javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
  • batchCount: (Optional) The batch size for sending multiple events to Splunk. Default: 1 (no batching).
  • parallelism: (Optional) The maximum number of parallel requests. Default: 1 (no parallelism).
  • disableCertificateValidation: (Optional) Disable SSL certificate validation. Default: false (validation enabled).
  • includePubsubMessage: (Optional) Include the full Pub/Sub message in the payload. Default: false (only the data element is included in the payload).
  • tokenKMSEncryptionKey: (Optional) The Cloud KMS key to decrypt the HEC token string. If the Cloud KMS key is provided, the HEC token string must be passed in encrypted.

Running the Pub/Sub to Splunk template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Splunk template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOKEN: the Splunk HTTP Event Collector (HEC) token
  • URL: the URL path for the Splunk HEC (for example, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: the batch size to use for sending multiple events to Splunk
  • PARALLELISM: the number of parallel requests to use for sending events to Splunk
  • DISABLE_VALIDATION: true if you want to disable SSL certificate validation

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
  • TOKEN: the Splunk HTTP Event Collector (HEC) token
  • URL: the URL path for the Splunk HEC (for example, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: the batch size to use for sending multiple events to Splunk
  • PARALLELISM: the number of parallel requests to use for sending events to Splunk
  • DISABLE_VALIDATION: true if you want to disable SSL certificate validation

Pub/Sub to Avro Files on Cloud Storage

The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket.

Requirements for this pipeline:

  • The input Pub/Sub topic must exist prior to pipeline execution.

Template parameters

  • inputTopic: The Pub/Sub topic to subscribe to for message consumption. The topic name must be in the format projects/<project-id>/topics/<topic-name>.
  • outputDirectory: The output directory where output Avro files are archived. Must contain / at the end. For example: gs://example-bucket/example-directory/.
  • avroTempDirectory: The directory for temporary Avro files. Must contain / at the end. For example: gs://example-bucket/example-directory/.
  • outputFilenamePrefix: (Optional) The output filename prefix for the Avro files.
  • outputFilenameSuffix: (Optional) The output filename suffix for the Avro files.
  • outputShardTemplate: (Optional) The shard template of the output file. It is specified as repeating sequences of the letters S or N. For example, SSS-NNN. These are replaced with either the shard number or the total number of shards, respectively. When this parameter is not specified, the default template format is W-P-SS-of-NN.

Running the Pub/Sub to Cloud Storage Avro template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Avro Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files; for example, gs://your-bucket/temp
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files; for example, gs://your-bucket/temp
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template

Pub/Sub to Text Files on Cloud Storage

The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.
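
For example, after the job has been running for a few windows, you might verify that files are being written with a command like the following; the bucket name and prefix are placeholders.

# List the windowed text files written by the pipeline so far.
gsutil ls gs://my-bucket/output/output-*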

Requirements for this pipeline:

  • The Pub/Sub topic must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.

Template parameters

  • inputTopic: The Pub/Sub topic to read the input from. The topic name should be in the format projects/<project-id>/topics/<topic-name>.
  • outputDirectory: The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
  • outputFilenamePrefix: The prefix to place on each windowed file. For example, output-.
  • outputFilenameSuffix: The suffix to place on each windowed file, typically a file extension such as .txt or .csv.
  • outputShardTemplate: The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window, which means that all data lands in a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In the case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.

Running the Pub/Sub to Text Files on Cloud Storage template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Text Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket

Pub/Sub to MongoDB

The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). Any errors that occur due to schema mismatch, malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages, along with the input message. If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates it.
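
For example, you can inspect failed messages afterwards with a query like the following sketch; the table name is a placeholder that must match the deadletterTable parameter, and the column selection is left as * because the exact table schema is not described here.

# Look at a few of the messages that could not be written to MongoDB.
bq query --use_legacy_sql=false \
    'SELECT * FROM `my-project.my_dataset.unprocessed_messages` LIMIT 10'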

Requirements for this pipeline:

  • The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.
  • The MongoDB cluster must exist and must be accessible from the Dataflow worker machines.

Template parameters

  • inputSubscription: Name of the Pub/Sub subscription. For example: projects/my-project-id/subscriptions/my-subscription-id.
  • mongoDBUri: Comma-separated list of MongoDB servers. For example: 192.285.234.12:27017,192.287.123.11:27017.
  • database: Database in MongoDB to store the collection. For example: my-db.
  • collection: Name of the collection inside the MongoDB database. For example: my-collection.
  • deadletterTable: BigQuery table that stores messages that failed to be written (for example, because of a mismatched schema or malformed JSON). For example: project-id:dataset-name.table-name.
  • javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
  • batchSize: (Optional) Batch size used for batch insertion of documents into MongoDB. Default: 1000.
  • batchSizeBytes: (Optional) Batch size in bytes. Default: 5242880.
  • maxConnectionIdleTime: (Optional) Maximum idle time allowed in seconds before a connection timeout occurs. Default: 60000.
  • sslEnabled: (Optional) Boolean value indicating whether the connection to MongoDB is SSL enabled. Default: true.
  • ignoreSSLCertificate: (Optional) Boolean value indicating whether the SSL certificate should be ignored. Default: true.
  • withOrdered: (Optional) Boolean value enabling ordered bulk insertions into MongoDB. Default: true.
  • withSSLInvalidHostNameAllowed: (Optional) Boolean value indicating whether an invalid hostname is allowed for the SSL connection. Default: true.

Running the Pub/Sub to MongoDB template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to MongoDB template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,\
collection=COLLECTION,\
deadletterTable=UNPROCESSED_TABLE
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)

Pub/Sub to Elasticsearch

The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. The Dataflow template uses Elasticsearch's data streams feature to store time series data across multiple indices while giving you a single named resource for requests. Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.

Requirements for this pipeline

  • The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.
  • A publicly reachable Elasticsearch host on a GCP instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See Google Cloud Integration for Elastic for more details.
  • A Pub/Sub topic for error output.

Template parameters

  • inputSubscription: The Pub/Sub subscription to consume from. The name should be in the format projects/<project-id>/subscriptions/<subscription-name>.
  • connectionUrl: The Elasticsearch URL in the format https://hostname:[port], or a Cloud ID if using Elastic Cloud.
  • apiKey: The base64-encoded API key used for authentication. A sketch of generating one follows this list.
  • errorOutputTopic: The Pub/Sub output topic for publishing failed records, in the format projects/<project-id>/topics/<topic-name>.
  • dataset: (Optional) The type of logs sent via Pub/Sub, for which there is an out-of-the-box dashboard. Known log type values are audit, vpcflow, and firewall. Default: pubsub.
  • namespace: (Optional) An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default: default.
  • batchSize: (Optional) Batch size in number of documents. Default: 1000.
  • batchSizeBytes: (Optional) Batch size in number of bytes. Default: 5242880 (5 MB).
  • maxRetryAttempts: (Optional) Maximum retry attempts; must be > 0. Default: no retries.
  • maxRetryDuration: (Optional) Maximum retry duration in milliseconds; must be > 0. Default: no retries.
  • javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
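
The following is a hedged sketch of creating an API key for the apiKey parameter. It assumes the parameter expects the base64 encoding of id:api_key, that you have credentials for a user allowed to create API keys, and that jq is available; the host, password, and key name are placeholders.

# Create an API key in Elasticsearch.
curl -u elastic:PASSWORD -X POST "https://hostname:9200/_security/api_key" \
    -H 'Content-Type: application/json' \
    -d '{"name": "dataflow-pubsub-to-elasticsearch"}' > api_key.json

# Base64-encode id:api_key and pass the result as apiKey.
printf '%s:%s' "$(jq -r .id api_key.json)" "$(jq -r .api_key api_key.json)" | base64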

Running the Pub/Sub to Elasticsearch template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Elasticsearch template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: your Pub/Sub topic for error output
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • CONNECTION_URL: your Elasticsearch URL
  • DATASET: your log type
  • NAMESPACE: your namespace for dataset
  • APIKEY: your base64 encoded API key for authentication

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: your Pub/Sub topic for error output
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • CONNECTION_URL: your Elasticsearch URL
  • DATASET: your log type
  • NAMESPACE: your namespace for dataset
  • APIKEY: your base64 encoded API key for authentication

Datastream to Cloud Spanner

The Datastream to Cloud Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Cloud Spanner database. It is intended for data migration from Datastream sources to Cloud Spanner.

All tables required for migration must exist in the destination Cloud Spanner database prior to template execution. Hence, schema migration from the source database to the destination Cloud Spanner database must be completed before data migration. Data can exist in the tables prior to migration. This template does not propagate Datastream schema changes to the Cloud Spanner database.
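
For example, a minimal sketch of pre-creating a destination table before starting the migration might look like the following; the instance, database, and DDL are placeholders that must match your source schema.

gcloud spanner databases ddl update my-database \
    --instance=my-instance \
    --ddl='CREATE TABLE Users (UserId INT64 NOT NULL, Name STRING(MAX)) PRIMARY KEY (UserId)'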

Data consistency is guaranteed only at the end of migration when all data has been written to Cloud Spanner. To store ordering information for each record written to Cloud Spanner, this template creates an additional table (called a shadow table) for each table in the Cloud Spanner database. This is used to ensure consistency at the end of migration. The shadow tables are not deleted after migration and can be used for validation purposes at the end of migration.

Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. The error queue is a Cloud Storage folder that stores all the Datastream events that encountered errors, along with the error reason, in text format. The errors can be transient or permanent and are stored in corresponding Cloud Storage folders in the error queue. Transient errors are retried automatically, while permanent errors are not. In the case of permanent errors, you can correct the change events and move them to the retriable bucket while the template is running.
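
For example, a hedged sketch of retrying a permanently failed event after correcting it might look like the following; the paths are placeholders, and the exact folder layout depends on your deadLetterQueueDirectory.

# Move the corrected event file from the permanent-error folder back into
# the retriable folder of the error queue so the pipeline retries it.
gsutil mv gs://my-bucket/dlq/severe/corrected-event.json \
    gs://my-bucket/dlq/retry/corrected-event.json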

Requirements for this pipeline:

  • A Datastream stream in Running or Not started state.
  • A Cloud Storage bucket where Datastream events are replicated.
  • A Cloud Spanner database with existing tables. These tables can be empty or contain data.

Template parameters

  • inputFilePattern: The file location for Datastream files in Cloud Storage to replicate. Typically, this is the root path for a stream.
  • streamName: The name or template of the stream to poll for schema information and source type.
  • instanceId: The Cloud Spanner instance where the changes are replicated.
  • databaseId: The Cloud Spanner database where the changes are replicated.
  • projectId: The Cloud Spanner project ID.
  • deadLetterQueueDirectory: (Optional) The file path to store the error queue output. The default is a directory under the Dataflow job's temp location.
  • inputFileFormat: (Optional) The format of the output files produced by Datastream. For example, avro or json. Default: avro.
  • shadowTablePrefix: (Optional) The prefix used to name shadow tables. Default: shadow_.

Running the Datastream to Cloud Spanner template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Cloud Spanner instance.
  • CLOUDSPANNER_DATABASE: your Cloud Spanner database.
  • DLQ: the Cloud Storage path for the error queue directory.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME",
          "instanceId": "CLOUDSPANNER_INSTANCE",
          "databaseId": "CLOUDSPANNER_DATABASE",
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner"
   }
}
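
For example, you can save the request body shown above to a local file (request.json is a hypothetical name) and send it with curl, authenticating with your gcloud credentials:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"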
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Cloud Spanner instance.
  • CLOUDSPANNER_DATABASE: your Cloud Spanner database.
  • DLQ: the Cloud Storage path for the error queue directory.

Text Files on Cloud Storage to BigQuery (Stream)

The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that lets you stream text files stored in Cloud Storage, transform them using a JavaScript user-defined function (UDF) that you provide, and append the result to BigQuery.

The pipeline runs indefinitely and must be terminated manually with a cancel and not a drain, because it uses the Watch transform, a splittable DoFn that does not support draining.
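
When you are ready to stop the pipeline, you can cancel it with the gcloud CLI. For example, where JOB_ID is the ID of the running Dataflow job:

gcloud dataflow jobs cancel JOB_ID --region=REGION_NAME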

Requirements for this pipeline:

  • Create a JSON file that describes the schema of your output table in BigQuery.

    Ensure that there is a top-level JSON array titled BigQuery Schema and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. For example:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Create a JavaScript (.js) file with your UDF function that supplies the logic to transform the lines of text. Note that your function must return a JSON string. (Commands for staging both files in Cloud Storage follow this list.)

    For example, this function splits each line of a CSV file and returns a JSON string after transforming the values.

    function transform(line) {
      var values = line.split(',');

      var obj = new Object();
      obj.location = values[0];
      obj.name = values[1];
      obj.age = values[2];
      obj.color = values[3];
      obj.coffee = values[4];
      var jsonString = JSON.stringify(obj);

      return jsonString;
    }
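
Both files must be staged in Cloud Storage before you run the template. A minimal sketch with gsutil, using hypothetical local file and bucket names:

gsutil cp schema.json gs://my-bucket/my-schemas/schema.json
gsutil cp transform.js gs://my-bucket/my-udfs/transform.js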
    

Template parameters

Parameter Description
javascriptTextTransformGcsPath The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
JSONPath The Cloud Storage location of the JSON file that describes your BigQuery schema. For example: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
outputTable The fully qualified BigQuery table. For example: my-project:dataset.table
inputFilePattern Cloud Storage location of the text you'd like to process. For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Temporary directory for BigQuery loading process. For example: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table for messages that failed to reach the output table. For example: my-project:dataset.my-unprocessed-table. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTable>_error_records is used instead.

Running the Cloud Storage Text to BigQuery (Stream) template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Text Files on Cloud Storage to BigQuery template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory

Text Files on Cloud Storage to Pub/Sub (Stream)

This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. The template publishes records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. You can use this template to replay data to Pub/Sub.

The pipeline runs indefinitely and must be terminated manually with a cancel and not a drain, because it uses the Watch transform, a splittable DoFn that does not support draining.

Currently, the polling interval is fixed at 10 seconds. This template does not set any timestamp on the individual records, so the event time equals the publishing time during execution. If your pipeline relies on an accurate event time for processing, do not use this pipeline.

Requirements for this pipeline:

  • Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a message to Pub/Sub.
  • The Pub/Sub topic must exist prior to execution (see the example after this list).
  • The pipeline runs indefinitely and needs to be terminated manually.
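
To create the topic before launching the job, you can use the gcloud CLI. A minimal sketch, including an optional subscription (the SUBSCRIPTION_NAME placeholder is hypothetical) that you can use to spot-check the published messages:

# Create the output topic that the template publishes to.
gcloud pubsub topics create TOPIC_NAME

# Optional: attach a subscription and pull a few messages to verify the output.
gcloud pubsub subscriptions create SUBSCRIPTION_NAME --topic=TOPIC_NAME
gcloud pubsub subscriptions pull SUBSCRIPTION_NAME --auto-ack --limit=5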

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/files/*.json or gs://bucket-name/path/*.csv.
outputTopic The Pub/Sub output topic to write to. The name should be in the format of projects/<project-id>/topics/<topic-name>.

Running the Text Files on Cloud Storage to Pub/Sub (Stream) template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)

The Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template is a streaming pipeline that reads CSV files from a Cloud Storage bucket, calls the Cloud Data Loss Prevention (Cloud DLP) API for de-identification, and writes the de-identified data into the specified BigQuery table. The template supports using both a Cloud DLP inspection template and a Cloud DLP de-identification template. This lets you inspect for potentially sensitive information and de-identify it, or de-identify structured data where the columns to de-identify are specified and no inspection is needed.

Requirements for this pipeline:

  • The input data to tokenize must exist.
  • The Cloud DLP templates must exist (for example, DeidentifyTemplate and InspectTemplate). See Cloud DLP templates for more details.
  • The BigQuery dataset must exist (see the example after this list).
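
For example, you can create a basic Cloud DLP de-identification template through the DLP REST API, and create the BigQuery dataset with the bq tool. This is a minimal sketch under stated assumptions: the replace-with-infoType transformation and the dlp-template.json file name are illustrative, and your template configuration will likely differ:

# Define a simple de-identification template that replaces findings with their infoType name.
cat > dlp-template.json <<'EOF'
{
  "deidentifyTemplate": {
    "displayName": "Basic de-identify template",
    "deidentifyConfig": {
      "infoTypeTransformations": {
        "transformations": [
          { "primitiveTransformation": { "replaceWithInfoTypeConfig": {} } }
        ]
      }
    }
  }
}
EOF

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @dlp-template.json \
    "https://dlp.googleapis.com/v2/projects/TEMPLATE_PROJECT_ID/deidentifyTemplates"

# Create the BigQuery dataset that receives the tokenized results.
bq mk --dataset PROJECT_ID:DATASET_NAME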

Template parameters

Parameter Description
inputFilePattern The CSV files to read input data records from. Wildcards are also accepted. For example, gs://mybucket/my_csv_filename.csv or gs://mybucket/file-*.csv.
dlpProjectId Cloud DLP project ID that owns the Cloud DLP API resource. This Cloud DLP project can be the same project that owns the Cloud DLP templates, or it can be a separate project. For example, my_dlp_api_project.
deidentifyTemplateName Cloud DLP deidentification template to use for API requests, specified with the pattern projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. For example, projects/my_project/deidentifyTemplates/100.
datasetName BigQuery dataset for sending tokenized results.
batchSize Chunking/batch size for sending data to inspect and de-identify. For a CSV file, batchSize is the number of rows in a batch. Determine the batch size based on the size of the records and the sizing of the file. Note that the Cloud DLP API has a payload size limit of 524 KB per API call.
inspectTemplateName (Optional) Cloud DLP inspection template to use for API requests, specified with the pattern projects/{template_project_id}/identifyTemplates/{idTemplateId}. For example, projects/my_project/identifyTemplates/100.

Running the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Replace the following:

  • DLP_API_PROJECT_ID: your Cloud DLP API project ID
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_DATA: your input file path
  • DEIDENTIFY_TEMPLATE: the Cloud DLP de-identify template number
  • DATASET_NAME: the BigQuery dataset name
  • INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
  • BATCH_SIZE_VALUE: the batch size (the number of rows per API call for CSV files)

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • DLP_API_PROJECT_ID: your Cloud DLP API project ID
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • TEMP_LOCATION: the location for writing temporary files (for example, gs://your-bucket/temp)
  • INPUT_DATA: your input file path
  • DEIDENTIFY_TEMPLATE: the Cloud DLP de-identify template number
  • DATASET_NAME: the BigQuery dataset name
  • INSPECT_TEMPLATE_NUMBER: the Cloud DLP inspect template number
  • BATCH_SIZE_VALUE: the batch size (the number of rows per API call for CSV files)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template is a streaming pipeline that reads Pub/Sub messages with change data from a MySQL database and writes the records to BigQuery. A Debezium connector captures changes to the MySQL database and publishes the changed data to Pub/Sub. The template then reads the Pub/Sub messages and writes them to BigQuery.

You can use this template to sync MySQL databases and BigQuery tables. The pipeline writes the changed data to a BigQuery staging table and intermittently updates a BigQuery table that replicates the MySQL database.

Requirements for this pipeline:

  • The Debezium connector must be deployed.
  • The Pub/Sub messages must be serialized in a Beam Row.

Template parameters

Parameter Description
inputSubscriptions The comma-separated list of Pub/Sub input subscriptions to read from, in the format of <subscription>,<subscription>, ...
changeLogDataset The BigQuery dataset to store the staging tables, in the format of <my-dataset>
replicaDataset The location of the BigQuery dataset to store the replica tables, in the format of <my-dataset>
updateFrequencySecs (Optional) The interval at which the pipeline updates the BigQuery table replicating the MySQL database.

Running the Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template