Using Flex Templates

This tutorial shows you how to create and run a Dataflow Flex Template job with a custom Docker image using the gcloud command-line tool. It walks you through a streaming pipeline example that reads JSON-encoded messages from Pub/Sub, transforms the message data with Beam SQL, and writes the results to a BigQuery table.

To learn more about Flex Templates, see Dataflow templates.
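
For orientation, here is a minimal Apache Beam sketch in Python of a pipeline with the same overall shape: read JSON messages from a Pub/Sub subscription, transform them, and write rows to BigQuery. It is a simplified, hypothetical stand-in for the tutorial's sample code (which uses Beam SQL for the transform), and the subscription and table names are placeholders.

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder resource names; the tutorial's sample code takes these as
# pipeline parameters instead of hard-coding them.
SUBSCRIPTION = "projects/PROJECT_ID/subscriptions/ratings"
TABLE = "PROJECT_ID:beam_samples.streaming_beam_sql"

options = PipelineOptions(streaming=True)
pipeline = beam.Pipeline(options=options)
(
    pipeline
    | "Read messages" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
    | "Parse JSON" >> beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
    | "Write to BigQuery" >> beam.io.WriteToBigQuery(
        TABLE,
        schema="url:STRING,review:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
pipeline.run()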

Objectives

  • Build a Docker container image.
  • Create and run a Dataflow Flex Template.

Costs

This tutorial uses billable components of Google Cloud, including:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Use the Pricing Calculator to generate a cost estimate based on your projected usage.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Cloud Console, go to the Create service account page.

      Go to Create service account
    2. Select a project.
    3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Click the Select a role field.

      Under Quick access, click Basic, then click Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Cloud Console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
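
    If you want to confirm that your shell session is picking up the key, you can load the Application Default Credentials with the google-auth Python library (pip install google-auth). This optional check is a sketch, not part of the tutorial:

      import os
      import google.auth

      # GOOGLE_APPLICATION_CREDENTIALS must point at the downloaded JSON key file.
      print("Key file:", os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

      # google.auth.default() raises DefaultCredentialsError if no credentials
      # can be found; otherwise it returns the credentials and the project ID.
      credentials, project_id = google.auth.default()
      print("Authenticated for project:", project_id)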

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. See Cleaning up for more detail.

Creating the example source and sink

This section explains how to create the following:

  • A streaming source of data using Pub/Sub
  • A BigQuery dataset to load the data into

Create a Cloud Storage bucket

Use the gsutil mb command. Replace my-storage-bucket with a globally unique bucket name:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Create a Pub/Sub topic and a subscription to that topic

Use the gcloud command-line tool:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Create a Cloud Scheduler job

In this step, you use the gcloud command-line tool to create and run Cloud Scheduler jobs that publish "positive rating" and "negative rating" messages to the Pub/Sub topic. If you want to publish a few test messages yourself instead of waiting for Cloud Scheduler, see the optional Python sketch after these steps.

  1. Create a Cloud Scheduler job for this Google Cloud project. This job publishes one "positive rating" message per minute.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Start the Cloud Scheduler job.
    gcloud scheduler jobs run positive-ratings-publisher
    
  3. Create and run another similar publisher for "negative ratings" that publishes one message every 2 minutes.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    
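If you prefer to publish a few test messages yourself (as mentioned before the steps above), you can use the Pub/Sub client library for Python (pip install google-cloud-pubsub). This optional sketch mirrors the Cloud Scheduler message body; the project ID is a placeholder:

import json
from google.cloud import pubsub_v1

# Placeholder values; match them to your PROJECT and TOPIC variables.
project_id = "PROJECT_ID"
topic_id = "messages"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Publish one "positive" rating, using the same message body as Cloud Scheduler.
message = {"url": "https://beam.apache.org/", "review": "positive"}
future = publisher.publish(topic_path, data=json.dumps(message).encode("utf-8"))
print("Published message ID:", future.result())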

Create a BigQuery dataset

Use the bq mk command:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"
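
If you prefer the BigQuery client library for Python over the bq tool, the following optional sketch creates the same dataset (pip install google-cloud-bigquery); it picks up the project from your Application Default Credentials:

from google.cloud import bigquery

client = bigquery.Client()  # uses GOOGLE_APPLICATION_CREDENTIALS
dataset_id = f"{client.project}.beam_samples"

# exists_ok=True makes the call a no-op if the dataset already exists.
dataset = client.create_dataset(dataset_id, exists_ok=True)
print("Created dataset:", dataset.full_dataset_id)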

Downloading the code sample

  1. Download the code sample.

    Java

    Clone the java-docs-samples repository and navigate to the code sample for this tutorial.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clone the python-docs-samples repository and navigate to the code sample for this tutorial.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Export the TEMPLATE_IMAGE environment variable for this tutorial.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Setting up your development environment

Java

  1. Download and install the Java Development Kit (JDK) version 11. Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  2. Download and install Apache Maven by following Maven's installation guide for your specific operating system.
  3. (Optional) Run the Apache Beam pipeline locally for development.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Build the Java project into an Uber JAR file.
      mvn clean package
  5. (Optional) Note the size of the Uber JAR file compared to the original file.
      ls -lh target/*.jar
    This Uber JAR file has all the dependencies embedded in it. You can run this file as a standalone application with no external dependencies on other libraries.

Python

Use the Apache Beam SDK for Python.

Python only: Creating and building a container image

This section contains steps for Python users. If you are using Java, skip the following steps.

If your job fails to run and a Timeout in polling error message is displayed, see the troubleshooting steps.

  1. (Optional) Enable Kaniko cache use by default.
    gcloud config set builds/use_kaniko True
    
    Kaniko caches container build artifacts, so using this option speeds up subsequent builds.
  2. (Optional) Create the Dockerfile. You can customize the Dockerfile from this tutorial. The starter file looks like the following:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    This Dockerfile contains the FROM, ENV, and COPY commands, which you can read about in the Dockerfile reference.

    Images starting with gcr.io/PROJECT/ are saved into your project's Container Registry, where the image is accessible to other Google Cloud products.
  3. Build the Docker image using a Dockerfile with Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Metadata

You can extend your template with additional metadata so that custom parameters are validated when the template is run. If you want to create metadata for your template, follow these steps:

  1. Create a metadata.json file using the parameters in Metadata parameters.

    To view an example, see Example metadata file.

  2. Store the metadata file in Cloud Storage in the same folder as the template.

Metadata parameters

The metadata file supports the following keys:

  • name (required): The name of your template.
  • description (optional): A short paragraph of text describing the template.
  • parameters (optional): An array of additional parameters that the template uses. An empty array is used by default. Each object in the parameters array supports the following keys:
    • name (required): The name of the parameter that is used in your template.
    • label (required): A human readable string that is used in the Cloud Console to label the parameter.
    • helpText (required): A short paragraph of text that describes the parameter.
    • isOptional (optional): false if the parameter is required and true if the parameter is optional. Unless set with a value, isOptional defaults to false. If you do not include this key, the parameter is required.
    • regexes (optional): An array of POSIX-egrep regular expressions in string form that is used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter and then has one or more characters. An empty array is used by default.
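
To get a feel for how the regexes entries constrain a parameter value, the following optional sketch applies them with Python's re module, using the Java example metadata file shown below. Dataflow performs this validation itself when the template runs, and its POSIX-egrep matching is not identical to Python's re, so treat this only as a rough local check:

import json
import re

# Load the template's metadata file (assumed to be in the current directory).
with open("metadata.json") as f:
    metadata = json.load(f)

def is_valid(param_name, value):
    param = next(p for p in metadata["parameters"] if p["name"] == param_name)
    # Accept the value only if it matches every regex declared for the parameter.
    return all(re.match(regex, value) for regex in param.get("regexes", []))

print(is_valid("outputTable", "my-project:beam_samples.streaming_beam_sql"))  # True
print(is_valid("outputTable", "not-a-table-spec"))                            # False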

Example metadata file

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

You can download metadata files for the Google-provided templates from the Dataflow template directory.

Creating a Flex Template

To run a template, you need to create a template spec file in a Cloud Storage bucket containing all of the necessary information to run the job, such as the SDK information and metadata.

This tutorial uses the example metadata file, which contains additional information for the template such as the name, description, and input parameters fields.

  1. Choose a Cloud Storage location for the template spec file, which will contain all of the information necessary to run the job, such as the SDK information and metadata.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Build the Flex Template.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

The template is now available through the template file in the Cloud Storage location that you specified.
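
If you are curious what the build produced, you can inspect the spec file directly. This optional sketch downloads and prints it with the Cloud Storage client library for Python (pip install google-cloud-storage), assuming TEMPLATE_PATH is still exported in your shell:

import json
import os
from google.cloud import storage

# For example: gs://my-storage-bucket/samples/dataflow/templates/streaming-beam-sql.json
template_path = os.environ["TEMPLATE_PATH"]

client = storage.Client()
blob = storage.Blob.from_string(template_path, client=client)

# The spec is a JSON document with the SDK information and metadata
# recorded by the build command.
spec = json.loads(blob.download_as_text())
print(json.dumps(spec, indent=2))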

Running a Flex Template pipeline

You can now run the Apache Beam pipeline in Dataflow by referring to the template file and passing the template parameters required by the pipeline.

  1. In your shell or terminal, run the template:

    Java

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Alternatively, you can run the template with a REST API request (a Python version of this request is sketched after these steps):

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. After you execute the command to run the Flex Template, Dataflow returns a Job ID with the job status Queued. It might take several minutes before the job status reaches Running and you can access the job graph.
  3. Check the results in BigQuery by running the following query:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    While this pipeline is running, you can see new rows appended to the BigQuery table every minute.
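
As noted above, the flexTemplates:launch REST request can also be issued from Python instead of curl. The following optional sketch mirrors the Java curl example; it assumes the google-auth and requests libraries are installed and that the same environment variables are still exported:

import datetime
import os
import google.auth
from google.auth.transport.requests import AuthorizedSession

project = os.environ["PROJECT"]
region = os.environ.get("REGION", "us-central1")

# AuthorizedSession attaches an OAuth 2.0 access token to each request.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"])
session = AuthorizedSession(credentials)

response = session.post(
    f"https://dataflow.googleapis.com/v1b3/projects/{project}"
    f"/locations/{region}/flexTemplates:launch",
    json={
        "launch_parameter": {
            "jobName": "streaming-beam-sql-"
                       + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
            "parameters": {
                "inputSubscription": os.environ["SUBSCRIPTION"],
                "outputTable": "{}:{}.{}".format(
                    project, os.environ["DATASET"], os.environ["TABLE"]),
            },
            "containerSpecGcsPath": os.environ["TEMPLATE_PATH"],
        }
    },
)
response.raise_for_status()
print(response.json())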

Cleaning up

After you've finished this tutorial, you can clean up the resources you created on Google Cloud so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Clean up the Flex Template resources

  1. Stop the Dataflow pipeline.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Delete the template spec file from Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Delete the Flex Template container image from Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Clean up Google Cloud project resources

  1. Delete the Cloud Scheduler jobs.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Delete the Pub/Sub subscription and topic.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Delete the BigQuery table.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Delete the BigQuery dataset. This alone does not incur any charges.

    The following command also deletes all tables in the dataset. The tables and data cannot be recovered.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Delete the Cloud Storage bucket. This alone does not incur any charges.

    The following command also deletes all objects in the bucket. These objects cannot be recovered.

    gsutil rm -r gs://$BUCKET
    

Limitations

The following limitations apply to Flex Template jobs:

  • You must use a Google-provided base image to package your containers using Docker. For a list of applicable images, see Flex Template base images.
  • The program that constructs the pipeline must exit after run is called in order for the pipeline to start.
  • waitUntilFinish (Java) and wait_until_finish (Python) are not supported.
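
For the Python SDK, the last two limitations translate into a main program shaped roughly like the following hypothetical sketch: start the job with run() and return without blocking on the result.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def main():
    options = PipelineOptions(streaming=True)
    pipeline = beam.Pipeline(options=options)
    # ... build the pipeline here ...
    pipeline.run()
    # Do not call wait_until_finish() on the result: it is not supported for
    # Flex Templates, and main() must return so the launching program can exit.

if __name__ == "__main__":
    main()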

What's next