Google-provided streaming templates

Google provides a set of open-source Cloud Dataflow templates. For general information about templates, see the Overview page. For a list of all Google-provided templates, see the Get started with Google-provided templates page.

This page documents the following streaming templates:

Cloud Pub/Sub Subscription to BigQuery

The Cloud Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Cloud Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Cloud Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Cloud Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Cloud Pub/Sub messages must be in JSON format. For example, messages formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to running the pipeline (see the example commands after this list).
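
For example, you can create a matching output table and publish a test message as follows. This is a sketch only: the two-column schema and the topic name YOUR_TOPIC_NAME (the topic your subscription is attached to) are illustrative assumptions, not template parameters.

# Create a BigQuery table whose columns match the JSON keys k1 and k2 (illustrative schema).
bq mk --table YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME k1:STRING,k2:STRING

# Publish a JSON-formatted test message to the topic the subscription is attached to.
gcloud pubsub topics publish YOUR_TOPIC_NAME --message '{"k1":"v1", "k2":"v2"}'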

Template parameters

Parameter Description
inputSubscription The Cloud Pub/Sub input subscription to read from, in the format of projects/<project>/subscriptions/<subscription>.
outputTableSpec The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>

Running the Cloud Pub/Sub Subscription to BigQuery template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Pub/Sub Subscription to BigQuery template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_SUBSCRIPTION_NAME with your Cloud Pub/Sub subscription name.
  • Replace YOUR_DATASET with your BigQuery dataset, and replace YOUR_TABLE_NAME with your BigQuery table name.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME
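
After you submit the job, you can confirm it is running with the gcloud command-line tool (a sketch; the --status flag filters for active jobs):

gcloud dataflow jobs list --status=active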

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_SUBSCRIPTION_NAME with your Cloud Pub/Sub subscription name.
  • Replace YOUR_DATASET with your BigQuery dataset, and replace YOUR_TABLE_NAME with your BigQuery table name.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}
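
One way to send this request (a sketch, assuming the Cloud SDK is installed and authorized; curl and the OAuth access token from gcloud are not part of the template itself):

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "jobName": "JOB_NAME",
          "parameters": {
              "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
              "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
          },
          "environment": { "zone": "us-central1-f" }
        }' \
    "https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery"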

Cloud Pub/Sub Topic to BigQuery

The Cloud Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Cloud Pub/Sub topic and writes them to a BigQuery table. You can use the template as a quick solution to move Cloud Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Cloud Pub/Sub and converts them to BigQuery elements.

Requirements for this pipeline:

  • The Cloud Pub/Sub messages must be in JSON format. For example, messages formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with string data type.
  • The output table must exist prior to pipeline execution.

Template parameters

Parameter Description
inputTopic The Cloud Pub/Sub input topic to read from, in the format of projects/<project>/topics/<topic>.
outputTableSpec The BigQuery output table location, in the format of <my-project>:<my-dataset>.<my-table>

Running the Cloud Pub/Sub Topic to BigQuery template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Pub/Sub Topic to BigQuery template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_DATASET with your BigQuery dataset, and replace YOUR_TABLE_NAME with your BigQuery table name.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_DATASET with your BigQuery dataset, and replace YOUR_TABLE_NAME with your BigQuery table name.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Pub/Sub

The Cloud Pub/Sub to Cloud Pub/Sub template is a streaming pipeline that reads messages from a Cloud Pub/Sub subscription and writes the messages to another Cloud Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Cloud Pub/Sub topic. You can use this template to copy messages from a Cloud Pub/Sub subscription to another Cloud Pub/Sub topic with an optional message filter.

Requirements for this pipeline:

  • The source Cloud Pub/Sub subscription must exist prior to execution.
  • The destination Cloud Pub/Sub topic must exist prior to execution (see the example setup commands after this list).
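
As an illustration, the following commands create the destination topic and the source subscription and publish a test message that carries a matching filter attribute. YOUR_INPUT_TOPIC_NAME is an assumed placeholder for the existing topic that the source subscription is attached to; it is not a template parameter.

# Create the destination topic and the source subscription.
gcloud pubsub topics create YOUR_TOPIC_NAME
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME --topic=YOUR_INPUT_TOPIC_NAME

# Publish a test message with an attribute. When filterKey and filterValue are set, only
# messages whose FILTER_KEY attribute equals FILTER_VALUE are written to the destination topic.
gcloud pubsub topics publish YOUR_INPUT_TOPIC_NAME \
    --message "test payload" \
    --attribute=FILTER_KEY=FILTER_VALUE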

Template parameters

Parameter Description
inputSubscription Cloud Pub/Sub subscription to read the input from. For example, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Cloud Pub/Sub topic to write the output to. For example, projects/<project-id>/topics/<topic-name>.
filterKey [Optional] Filter events based on an attribute key. No filters are applied if filterKey is not specified.
filterValue [Optional] Filter attribute value to use in case a filterKey is provided. A null filterValue is used by default.

Running the Cloud Pub/Sub to Cloud Pub/Sub template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Pub/Sub to Cloud Pub/Sub template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_SUBSCRIPTION_NAME with the Cloud Pub/Sub subscription name.
  • Replace YOUR_TOPIC_NAME with the Cloud Pub/Sub topic name.
  • Replace FILTER_KEY with the name of the attribute key to filter messages with.
  • Replace FILTER_VALUE with the attribute value.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_SUBSCRIPTION_NAME with the Cloud Pub/Sub subscription name.
  • Replace YOUR_TOPIC_NAME with the Cloud Pub/Sub topic name.
  • Replace FILTER_KEY with the name of the attribute key to filter messages with.
  • Replace FILTER_VALUE with the attribute value.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Avro

The Cloud Pub/Sub to Cloud Storage Avro template is a streaming pipeline that reads data from a Cloud Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket. The pipeline supports an optional, user-provided window duration for performing windowed writes.

Requirements for this pipeline:

  • The input Cloud Pub/Sub topic must exist prior to pipeline execution.

Template parameters

Parameter Description
inputTopic The Cloud Pub/Sub topic to subscribe to for message consumption. The topic name should be in the format projects/<project-id>/topics/<topic-name>.
outputDirectory The output directory where the Avro files are archived. Must end with a slash. For example: gs://example-bucket/example-directory/.
avroTempDirectory Directory for temporary Avro files. Must end with a slash. For example: gs://example-bucket/example-directory/.
outputFilenamePrefix [Optional] The output filename prefix for the Avro files.
outputFilenameSuffix [Optional] The output filename suffix for the Avro files.
outputShardTemplate [Optional] The shard template of the output file. Specified as repeating sequences of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the shard number and the number of shards, respectively. When this parameter is not specified, the default template format is 'W-P-SS-of-NN'.
numShards [Optional] The maximum number of output shards produced when writing. The default maximum number of shards is 1.
windowDuration [Optional] The window duration in which data is written. Defaults to 5m. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h).

Running the Cloud Pub/Sub to Cloud Storage Avro template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Pub/Sub to Cloud Storage Avro template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with the Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
  • Replace FILENAME_PREFIX with the preferred output filename prefix.
  • Replace FILENAME_SUFFIX with the preferred output filename suffix.
  • Replace SHARD_TEMPLATE with the preferred output shard template.
  • Replace NUM_SHARDS with the number of output shards.
  • Replace WINDOW_DURATION with the output window duration.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Avro \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
avroTempDirectory=gs://YOUR_BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
windowDuration=WINDOW_DURATION

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with the Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
  • Replace FILENAME_PREFIX with the preferred output filename prefix.
  • Replace FILENAME_SUFFIX with the preferred output filename suffix.
  • Replace SHARD_TEMPLATE with the preferred output shard template.
  • Replace NUM_SHARDS with the number of output shards.
  • Replace WINDOW_DURATION with the output window duration.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "avroTempDirectory": "gs://YOUR_BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
       "windowDuration": "WINDOW_DURATION"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Text

The Cloud Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Cloud Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Cloud Pub/Sub for future use. By default, the template generates a new file every 5 minutes.

Requirements for this pipeline:

  • The Cloud Pub/Sub topic must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines. Note that each Cloud Pub/Sub message is saved as a single line in the output file (see the publish example after this list).
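
For example, a single-line text record can be published to the topic like this (illustrative only):

gcloud pubsub topics publish YOUR_TOPIC_NAME --message "a single line of text with no newlines"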

Template parameters

Parameter Description
inputTopic The Cloud Pub/Sub topic to read the input from. The topic name should be in the format projects/<project-id>/topics/<topic-name>.
outputDirectory The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix The prefix to place on each windowed file. For example, output-
outputFilenameSuffix The suffix to place on each windowed file, typically a file extension such as .txt or .csv.
outputShardTemplate The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window, so all of a window's data lands in a single file. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In the case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01, as illustrated after this table.
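
As an illustration only (the exact window text depends on the pipeline's windowing; the bucket and prefix values are assumptions), with outputDirectory=gs://bucket-name/path/, outputFilenamePrefix=output-, outputFilenameSuffix=.txt, and the default shard template, each window produces a single object:

# List the windowed output files. With the defaults, names look roughly like
# output-<window-start>-<window-end>-pane-<pane-info>-00-of-01.txt
gsutil ls gs://bucket-name/path/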

Running the Cloud Pub/Sub to Cloud Storage Text template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Pub/Sub to Cloud Storage Text template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to BigQuery (Stream)

The Cloud Storage Text to BigQuery (Stream) pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and output the result to BigQuery.

Requirements for this pipeline:

  • Create a JSON-formatted BigQuery schema file that describes your output table. For example:
    {
        "fields": [{
            "name": "location",
            "type": "STRING"
        }, {
            "name": "name",
            "type": "STRING"
        }, {
            "name": "age",
            "type": "STRING"
        }, {
            "name": "color",
            "type": "STRING"
        }, {
            "name": "coffee",
            "type": "STRING",
            "mode": "REQUIRED"
        }, {
            "name": "cost",
            "type": "NUMERIC",
            "mode": "REQUIRED"
        }]
    }

  • Create a JavaScript (.js) file containing the user-defined function (UDF) that supplies the logic to transform the lines of text. Note that your function must return a JSON string. (An upload example follows the function below.)

    For example, this function splits each line of a CSV file and returns a JSON string after transforming the values.

    function transform(line) {
      var values = line.split(',');

      var obj = new Object();
      obj.location = values[0];
      obj.name = values[1];
      obj.age = values[2];
      obj.color = values[3];
      obj.coffee = values[4];
      var jsonString = JSON.stringify(obj);

      return jsonString;
    }
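
    Both files must be staged in Cloud Storage before you run the template. A minimal sketch, assuming local filenames schema.json and transform.js (both names, and the bucket gs://my_bucket, are illustrative):

    # Upload the BigQuery schema file and the JavaScript UDF to Cloud Storage.
    gsutil cp schema.json gs://my_bucket/schema.json
    gsutil cp transform.js gs://my_bucket/my_function.js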
    

Template parameters

Parameter Description
javascriptTextTransformGcsPath Cloud Storage location of your JavaScript UDF. For example: gs://my_bucket/my_function.js.
JSONPath Cloud Storage location of your BigQuery schema file, described as JSON. For example: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName The name of the JavaScript function you wish to call as your UDF. For example: transform.
outputTable The fully qualified BigQuery table. For example: my-project:dataset.table.
inputFilePattern Cloud Storage location of the text you'd like to process. For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Temporary directory for the BigQuery loading process. For example: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Table for messages that failed to reach the output table (the deadletter table). For example: my-project:dataset.my-deadletter-table. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.

Running the Cloud Storage Text to BigQuery (Stream) template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Storage Text to BigQuery template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_JAVASCRIPT_FUNCTION with the name of your UDF.
  • Replace PATH_TO_BIGQUERY_SCHEMA_JSON with the Cloud Storage path to the JSON file containing the schema definition.
  • Replace PATH_TO_JAVASCRIPT_UDF_FILE with the Cloud Storage path to the .js file containing your JavaScript code.
  • Replace PATH_TO_YOUR_TEXT_DATA with your Cloud Storage path to your text dataset.
  • Replace BIGQUERY_TABLE with your BigQuery table name.
  • Replace BIGQUERY_DEADLETTER_TABLE with your BigQuery deadletter table name.
  • Replace PATH_TO_TEMP_DIR_ON_GCS with your Cloud Storage path to the temp directory.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_JAVASCRIPT_FUNCTION with the name of your UDF.
  • Replace PATH_TO_BIGQUERY_SCHEMA_JSON with the Cloud Storage path to the JSON file containing the schema definition.
  • Replace PATH_TO_JAVASCRIPT_UDF_FILE with the Cloud Storage path to the .js file containing your JavaScript code.
  • Replace PATH_TO_YOUR_TEXT_DATA with your Cloud Storage path to your text dataset.
  • Replace BIGQUERY_TABLE with your BigQuery table name.
  • Replace BIGQUERY_DEADLETTER_TABLE with your BigQuery deadletter table name.
  • Replace PATH_TO_TEMP_DIR_ON_GCS with your Cloud Storage path to the temp directory.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_DEADLETTER_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Cloud Pub/Sub (Stream)

This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Cloud Pub/Sub topic. The template publishes records from newline-delimited JSON files or CSV files to a Cloud Pub/Sub topic for real-time processing. You can use this template to replay data to Cloud Pub/Sub.

Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time will be equal to the publishing time during execution. If your pipeline relies on an accurate event time for processing, you should not use this pipeline.

Requirements for this pipeline:

  • Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, as each line within the files will be published as a message to Cloud Pub/Sub.
  • The Cloud Pub/Sub topic must exist prior to execution.
  • The pipeline runs indefinitely and must be terminated manually (see the drain and cancel example after this list).
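
To stop the pipeline, drain or cancel the Cloud Dataflow job with the gcloud command-line tool (a sketch; look up the job ID first):

# Find the running job's ID.
gcloud dataflow jobs list --status=active
# Then either drain the job, which lets in-flight work finish ...
gcloud dataflow jobs drain JOB_ID
# ... or cancel it immediately.
gcloud dataflow jobs cancel JOB_ID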

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/files/*.json.
outputTopic The Cloud Pub/Sub output topic to write to. The name should be in the format of projects/<project-id>/topics/<topic-name>.

Running the Cloud Storage Text to Cloud Pub/Sub (Stream) template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Cloud Storage Text to Cloud Pub/Sub (Stream) template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_TOPIC_NAME with your Cloud Pub/Sub topic name.
  • Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

The Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery template is a streaming pipeline that reads CSV files from a Cloud Storage bucket, calls the Cloud Data Loss Prevention (Cloud DLP) API for de-identification, and writes the de-identified data into the specified BigQuery table. The template supports using both a Cloud DLP inspection template and a Cloud DLP de-identification template. This allows users to inspect for potentially sensitive information and de-identify it, as well as de-identify structured data where specific columns are marked for de-identification and no inspection is needed.

Requirements for this pipeline:

  • The input data to tokenize must exist.
  • The Cloud DLP templates must exist (for example, DeidentifyTemplate and InspectTemplate). See Cloud DLP templates for more details.
  • The BigQuery dataset must exist (see the example after this list).
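
For example, the BigQuery dataset can be created with the bq command-line tool (a sketch; YOUR_DATASET matches the placeholder used in the run commands below):

bq mk --dataset YOUR_PROJECT_ID:YOUR_DATASET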

Template parameters

Parameter Description
inputFilePattern The CSV file(s) to read input data records from. Wildcards are also accepted. For example, gs://mybucket/my_csv_filename.csv or gs://mybucket/file-*.csv.
dlpProjectId Cloud DLP project ID that owns the Cloud DLP API resource. This Cloud DLP project can be the same project that owns the Cloud DLP templates, or it can be a separate project. For example, my_dlp_api_project.
deidentifyTemplateName Cloud DLP de-identification template to use for API requests, specified with the pattern projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. For example, projects/my_project/deidentifyTemplates/100.
datasetName The BigQuery dataset to send tokenized results to.
batchSize The chunking/batch size for sending data to inspect and/or de-identify. For a CSV file, batchSize is the number of rows in a batch. Determine the batch size based on the size of the records and the size of the file. Note that the Cloud DLP API has a payload size limit of 524 KB per API call.
inspectTemplateName [Optional] Cloud DLP inspection template to use for API requests, specified with the pattern projects/{template_project_id}/identifyTemplates/{idTemplateId}. For example, projects/my_project/identifyTemplates/100.

Running the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery template

CONSOLE

Run from the Google Cloud Platform Console
  1. Go to the Cloud Dataflow page in the GCP Console.
  2. Click Create job from template.
  3. Select the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template from the Cloud Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  5. Enter your parameter values in the provided parameter fields.
  6. Click Run Job.

GCLOUD

Run from the gcloud command-line tool

Note: To use the gcloud command-line tool to run templates, you must have Cloud SDK version 138.0.0 or higher.

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

You must replace the following values in this example:

  • Replace YOUR_TEMPLATE_PROJECT_ID with your template project ID.
  • Replace YOUR_DLP_API_PROJECT_ID with your Cloud DLP API project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_INPUT_DATA with your input file path.
  • Replace YOUR_DEIDENTIFY_TEMPLATE with the Cloud DLP de-identification template number.
  • Replace YOUR_DATASET with the BigQuery dataset name.
  • Replace YOUR_INSPECT_TEMPLATE with the Cloud DLP inspection template number.
  • Replace BATCH_SIZE_VALUE with the batch size (the number of rows per API call for CSV files).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_INSPECT_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

Run from the REST API

When running this template, you'll need the Cloud Storage path to the template:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

To run this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.

You must replace the following values in this example:

  • Replace YOUR_TEMPLATE_PROJECT_ID with your template project ID.
  • Replace YOUR_DLP_API_PROJECT_ID with your Cloud DLP API project ID.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace YOUR_INPUT_DATA with your input file path.
  • Replace YOUR_DEIDENTIFY_TEMPLATE with the Cloud DLP de-identification template number.
  • Replace YOUR_DATASET with the BigQuery dataset name.
  • Replace YOUR_INSPECT_TEMPLATE with the Cloud DLP inspection template number.
  • Replace BATCH_SIZE_VALUE with the batch size (the number of rows per API call for CSV files).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}