Executing Templates

After you create and stage your Google Cloud Dataflow template, you can execute it with the REST API or the gcloud command-line tool. Cloud Dataflow template jobs can be deployed from many environments, including the Google App Engine standard environment, Google Cloud Functions, and other constrained environments.

Note: In addition to the template file, templated pipeline execution also relies on files that were staged and referenced at the time of template creation. If the staged files are moved or removed, your pipeline execution will fail.

Using the REST API

To execute a template with a REST API request, send an HTTP POST request to the templates.create endpoint, specifying your project ID in the request path. The request requires authorization, and its body must include gcsPath, set to the Cloud Storage location of the template file, and parameters, set to your list of key/value pairs.

Note: To run a Google-provided template, you must also specify a tempLocation where you have write permissions. Set gcsPath and parameters to the template's location and parameters as documented in Google-Provided Templates.
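
For example, you can call the endpoint directly with any HTTP client, provided the request carries an OAuth 2.0 access token. The following sketch is one way to do that from Python; it assumes the google-auth and requests libraries are installed, and the bucket, template name, and parameter values are placeholders for your own values.

    import google.auth
    import google.auth.transport.requests
    import requests

    # Obtain Application Default Credentials and an access token with the
    # cloud-platform scope.
    credentials, project_id = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
    credentials.refresh(google.auth.transport.requests.Request())

    # POST to the templates.create endpoint with gcsPath and parameters.
    url = 'https://dataflow.googleapis.com/v1b3/projects/{}/templates'.format(project_id)
    body = {
        'jobName': 'myjobname',
        'gcsPath': 'gs://YOUR_BUCKET_NAME/templates/TemplateName',
        'parameters': {
            'inputFile': 'gs://YOUR_BUCKET_NAME/input/my_input.txt',
            'outputFile': 'gs://YOUR_BUCKET_NAME/output/my_output.txt'
        },
        'environment': {'tempLocation': 'gs://YOUR_BUCKET_NAME/temp'}
    }
    response = requests.post(
        url, json=body,
        headers={'Authorization': 'Bearer {}'.format(credentials.token)})
    print(response.json())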

Using the Google API Client Libraries

Consider using the Google API Client Libraries to easily make calls to the Cloud Dataflow REST APIs. This sample script uses the Google API Client Library for Python.

    from googleapiclient.discovery import build
    from oauth2client.client import GoogleCredentials

    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    # Set the following variables to your values.
    JOBNAME = 'YOUR_JOB_NAME'
    PROJECT = 'YOUR_PROJECT_ID'
    BUCKET = 'YOUR_BUCKET'
    TEMPLATE = 'YOUR_TEMPLATE_NAME'

    BODY = {
        "jobName": JOBNAME,
        "gcsPath": "gs://{bucket}/templates/{template}".format(bucket=BUCKET, template=TEMPLATE),
        "parameters": {
            "inputFile": "gs://{bucket}/input/my_input.txt".format(bucket=BUCKET),
            "outputFile": "gs://{bucket}/output/my_output.txt".format(bucket=BUCKET)
        },
        "environment": {
            "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
            "zone": "us-central1-f"
        }
    }

    request = service.projects().templates().create(projectId=PROJECT, body=BODY)
    response = request.execute()

    print(response)
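
The response includes the new job's id, which you can use to look up the job. As a brief sketch, assuming the same service and PROJECT values as above, you can check the job's current state with the projects.jobs.get method:

    # Look up the job that was just created and print its current state,
    # for example JOB_STATE_RUNNING or JOB_STATE_DONE.
    job_id = response['id']
    job = service.projects().jobs().get(projectId=PROJECT, jobId=job_id).execute()
    print(job.get('currentState'))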

Example 1: Custom template, batch job

This example request creates a batch job from a template that reads a text file and writes an output text file.

    POST https://dataflow.googleapis.com/v1b3/projects/{YOUR_PROJECT_ID}/templates
    {
        "jobName": "myjobname",
        "gcsPath": "gs://{YOUR_BUCKET_NAME}/templates/TemplateName"
        "parameters": {
            "inputFile" : "gs://{YOUR_BUCKET_NAME}/input/my_input.txt",
            "outputFile": "gs://{YOUR_BUCKET_NAME}/output/my_output.txt"
        },
        "environment": {
            "tempLocation": "gs://{YOUR_BUCKET_NAME}",
            "zone": "us-central1-f"
        }
    }

The request returns a response with the following format.

    {
        "id": "2016-10-11_17_10_59-1234530157620696789",
        "projectId": "{YOUR_PROJECT_ID}",
        "type": "JOB_TYPE_BATCH",
        "clientRequestId": "20161012000501234_2678"
    }

Example 2: Custom template, streaming job

This example request creates a streaming job from a template that reads from a Pub/Sub topic and writes to a BigQuery table. The BigQuery table must already exist with the appropriate schema.

    POST https://dataflow.googleapis.com/v1b3/projects/{YOUR_PROJECT_ID}/templates
    {
        "jobName": "myjobname",
        "gcsPath":"gs://{YOUR_BUCKET_NAME}/templates/TemplateName",
        "parameters": {
            "topic": "projects/project-identifier/topics/resource-name",
            "table": "my_project:my_dataset.my_table_name"
        },
        "environment": {
            "tempLocation": "gs://{YOUR_BUCKET_NAME}",
            "zone": "us-central1-f"
        }
    }

The request returns a response with the following format.

    {
        "id": "2016-10-11_17_10_59-1234530157620696789",
        "projectId": "{YOUR_PROJECT_ID}",
        "type": "JOB_TYPE_STREAMING",
        "clientRequestId": "20161012000501234_2678"
    }
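
Because this template writes to an existing table, the destination table must be created before the job is launched. The sketch below uses the google-cloud-bigquery client library with a hypothetical two-column schema; substitute the schema that your template actually expects.

    from google.cloud import bigquery

    # Create the destination table ahead of time. The project, dataset, table
    # name, and schema are placeholders.
    client = bigquery.Client(project='my_project')
    schema = [
        bigquery.SchemaField('id', 'STRING'),
        bigquery.SchemaField('value', 'INTEGER'),
    ]
    table = bigquery.Table('my_project.my_dataset.my_table_name', schema=schema)
    client.create_table(table)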

Using gcloud

Note: To use the gcloud command-line tool to execute templates, you must have Cloud SDK version 138.0.0 or higher.

To execute a custom template with gcloud, use the gcloud beta dataflow jobs run command. You must include the --gcs-location flag, set to the Cloud Storage location of the template file, and the --parameters flag, set to a comma-separated list of key=value pairs. Your job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.

Note: Executing Google-provided templates with gcloud is not currently supported.

Example 1: Custom template, batch job

This example creates a batch job from a template that reads a text file and writes an output text file.

    gcloud beta dataflow jobs run [YOUR_JOB_NAME] \
        --gcs-location gs://[YOUR_BUCKET]/templates/MyTemplate \
        --parameters=inputFile=gs://[YOUR_BUCKET]/input/my_input.txt,outputFile=gs://[YOUR_BUCKET]/output/my_output.txt

The request returns a response with the following format.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: {YOUR_PROJECT_ID}
    type: JOB_TYPE_BATCH

Example 2: Custom template, streaming job

This example creates a streaming job from a template that reads from a Cloud Pub/Sub topic and writes to a BigQuery table. The BigQuery table must already exist with the appropriate schema.

    gcloud beta dataflow jobs run [YOUR_JOB_NAME] \
        --gcs-location gs://[YOUR_BUCKET]/templates/MyTemplate \
        --parameters=topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

The request returns a response with the following format.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: {YOUR_PROJECT_ID}
    type: JOB_TYPE_STREAMING

For a complete list of flags for the gcloud beta dataflow jobs run command, see the gcloud reference.

Monitoring and Troubleshooting

The Dataflow Monitoring Interface allows you to monitor your Cloud Dataflow jobs. If a job fails, you can find troubleshooting tips, debugging strategies, and a catalog of common errors in the Troubleshooting Your Pipeline guide.
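
You can also inspect a job's log messages programmatically. As a rough sketch, assuming the service object and PROJECT value from the client library example above, the projects.jobs.messages.list method returns the messages a job has emitted; the job ID below is a placeholder.

    # List error-level messages for a job.
    result = service.projects().jobs().messages().list(
        projectId=PROJECT,
        jobId='YOUR_JOB_ID',
        minimumImportance='JOB_MESSAGE_ERROR').execute()

    for message in result.get('jobMessages', []):
        print(message.get('time'), message.get('messageText'))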
