Use Flex Templates


This tutorial shows you how to create and run a Dataflow Flex Template job with a custom Docker image using the Google Cloud CLI. It walks you through a streaming pipeline example that reads JSON-encoded messages from Pub/Sub, transforms the message data with Beam SQL, and writes the results to a BigQuery table. In this tutorial, you store your container image in Artifact Registry. Flex Templates can also use prebuilt images stored in private registries.

To learn more about Flex Templates, see Dataflow templates.

Objectives

  • Build a Docker container image.
  • Create and run a Dataflow Flex Template.

Costs

This tutorial uses billable components of Google Cloud, including:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Artifact Registry
  • Cloud Build
  • BigQuery

Use the Pricing Calculator to generate a cost estimate based on your projected usage.

Before you begin

This section shows you how to enable APIs, create a service account, and grant an Owner role to the service account. In a production environment, don't grant the Owner role. Instead, use the appropriate Dataflow-specific permissions and roles. To learn more, see Understanding Flex Template permissions.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Cloud project:

      gcloud projects create PROJECT_ID
    • Select the Cloud project that you created:

      gcloud config set project PROJECT_ID
  5. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  6. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Artifact Registry, Cloud Scheduler, and Cloud Build APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub cloudresourcemanager.googleapis.com appengine.googleapis.com artifactregistry.googleapis.com cloudscheduler.googleapis.com cloudbuild.googleapis.com
  7. Create authentication credentials for your Google Account:

    gcloud auth application-default login
  8. Grant roles to your Google Account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace EMAIL_ADDRESS with your email address.
    • Replace ROLE with each individual role.
  9. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles: roles/dataflow.admin, roles/dataflow.worker, roles/bigquery.dataEditor, roles/pubsub.editor, roles/storage.objectAdmin, and roles/artifactregistry.reader.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
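
    For example, granting the first of these roles looks like the following. Repeat the command with each of the remaining roles:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=roles/dataflow.admin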

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. See Clean up for more detail.

Create the example source and sink

This section explains how to create the following:

  • A streaming source of data using Pub/Sub
  • A dataset to load the data into BigQuery

Create a Cloud Storage bucket

Use the gsutil mb command:

gsutil mb gs://BUCKET_NAME

Replace BUCKET_NAME with a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.

Create a Pub/Sub topic and a subscription to that topic

Use the Google Cloud CLI:

gcloud pubsub topics create TOPIC_ID
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
  • Replace TOPIC_ID with a name for your Pub/Sub topic.
  • Replace SUBSCRIPTION_ID with a name for your Pub/Sub subscription.
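
Optionally, to confirm that the topic and subscription are set up correctly, publish a test message and then pull it from the subscription. This check is not part of the tutorial steps, and the pulled message is acknowledged and removed from the subscription:

  gcloud pubsub topics publish TOPIC_ID --message='{"url": "https://beam.apache.org/", "review": "positive"}'
  gcloud pubsub subscriptions pull SUBSCRIPTION_ID --auto-ack --limit=1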

Create Cloud Scheduler jobs

In this step, use the Google Cloud CLI to create and run Cloud Scheduler jobs that publish "positive ratings" and "negative ratings" messages to your Pub/Sub topic.

  1. Create a Cloud Scheduler job for this Google Cloud project. This job creates and runs a publisher for "positive ratings" that publishes one message per minute.
         gcloud scheduler jobs create pubsub positive-ratings-publisher \
           --schedule="* * * * *" \
           --location=REGION \
           --topic="TOPIC_ID" \
           --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'

     Replace REGION with the regional endpoint for deploying your Dataflow job. The value of the REGION variable must be a valid region name. For more information about regions and locations, see Dataflow locations.
  2. Start the Cloud Scheduler job.
    gcloud scheduler jobs run --location=REGION positive-ratings-publisher
    
  3. Create and run another similar publisher for "negative ratings" that publishes one message every two minutes.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --location=REGION  \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run --location=REGION negative-ratings-publisher
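
Both publishers now send messages to your Pub/Sub topic. To confirm that the two Cloud Scheduler jobs were created and to check their schedules, you can optionally list them:

  gcloud scheduler jobs list --location=REGION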
    

Create a BigQuery dataset

Use the bq mk command:

  bq --location=REGION mk \
  PROJECT_ID:DATASET_ID
  • Replace PROJECT_ID with the project ID of your project.
  • Replace DATASET_ID with a name for your dataset. For more information about naming datasets, see Name datasets in Creating datasets.
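
Optionally, to confirm that the dataset was created, you can display its metadata. This check is not part of the tutorial steps:

  bq show --format=prettyjson PROJECT_ID:DATASET_ID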

Download the code sample and change directories

Download the code sample and then change directories.

Java

Clone the java-docs-samples repository.

  git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

Navigate to the code sample for this tutorial.

  cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

Python

Clone the python-docs-samples repository.

  git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Navigate to the code sample for this tutorial.

  cd python-docs-samples/dataflow/flex-templates/streaming_beam

Set up your development environment

Java

  1. Download and install the Java Development Kit (JDK) version 11. Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  2. Download and install Apache Maven by following Maven's installation guide for your specific operating system.
  3. (Optional) Run the Apache Beam pipeline locally for development.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=PROJECT_ID \
          --inputSubscription=SUBSCRIPTION_ID \
          --outputTable=PROJECT_ID:DATASET_ID.TABLE_ID \
          --tempLocation=gs://BUCKET_NAME/samples/dataflow/temp"

    Replace TABLE_ID with a name for your table.

  4. Build the Java project into an Uber JAR file.
      mvn clean package
  5. (Optional) Note the size of the Uber JAR file compared to the original file.
      ls -lh target/*.jar
    This Uber JAR file has all the dependencies embedded in it. You can run this file as a standalone application with no external dependencies on other libraries.
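
    As an optional check, you can launch the pipeline from the Uber JAR directly instead of through Maven. This is a sketch that assumes the artifact name matches the --jar path used later in this tutorial and that the Direct Runner dependency is included in the shaded JAR; the streaming pipeline runs locally until you stop it with Ctrl+C:

      java -cp target/streaming-beam-sql-1.0.jar \
        org.apache.beam.samples.StreamingBeamSql \
        --project=PROJECT_ID \
        --inputSubscription=SUBSCRIPTION_ID \
        --outputTable=PROJECT_ID:DATASET_ID.TABLE_ID \
        --tempLocation=gs://BUCKET_NAME/samples/dataflow/temp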

Python

Use the Apache Beam SDK for Python.

Create and build a container image

  1. You must create an Artifact Registry repository before you can upload artifacts. Each repository can contain artifacts for a single supported format.

    All repository content is encrypted using either Google-managed or customer-managed encryption keys. Artifact Registry uses Google-managed encryption keys by default and no configuration is required for this option.

    You must have at least Artifact Registry Writer access to the repository.

    Run the following command to create a new repository. The command uses the --async flag and returns immediately, without waiting for the operation in progress to complete.

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=REGION \
        --async 

    Replace REPOSITORY with a name for your repository. For each repository location in a project, repository names must be unique.

  2. (Optional) Create the Dockerfile.

    Java

    Note that the Flex Template container built using the Dockerfile is used only to create a job graph and start the Dataflow job. The packages installed in Flex Template containers are not available in the Beam container.

    For the Java sample in this tutorial, the pipeline dependencies are bundled into the Uber JAR that you built earlier, so they are available to the pipeline at runtime.

    You can customize the Dockerfile from this tutorial. The starter file looks like the following:

      FROM gcr.io/dataflow-templates-base/java11-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      RUN mkdir -p ${WORKDIR}/src/main/java/org/apache/beam/samples
      WORKDIR ${WORKDIR}
    
      COPY pom.xml .
      COPY StreamingBeamSql.java  /dataflow/template/src/main/java/org/apache/beam/samples/
    
      ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"
      ENV FLEX_TEMPLATE_JAVA_CLASSPATH="${WORKDIR}/src/main/java/org/apache/beam/samples/StreamingBeamSql.java"
    
      ENTRYPOINT ["/opt/google/dataflow/java_template_launcher"]

    This Dockerfile contains the FROM, ENV, and COPY commands, which you can read about in the Dockerfile reference.

    Python

    As part of the Dockerfile, you need to install apache-beam to generate the job graph. Note that the Flex Template container built using the Dockerfile is used only to create a job graph and start the Dataflow job. The packages installed in Flex Template containers are not available in the Beam container.

    For packages to be part of the Beam container, you must specify them as part of the requirements.txt file. Ensure that you do not specify apache-beam as part of the requirements.txt file. The Beam container already has apache-beam. To see an example file, in the python-docs-samples repository, view the requirements-test.txt file.

    Note that the Flex Template base image python3-template-launcher-base is based on Python 3.7. Other base images are available, such as python38-template-launcher-base for Python 3.8 and python39-template-launcher-base for Python 3.9.

    You can customize the Dockerfile from this tutorial. The starter file looks like the following:

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    This Dockerfile contains the FROM, ENV, and COPY commands, which you can read about in the Dockerfile reference.

  3. Before you can push or pull images, configure Docker to authenticate requests for Artifact Registry. To set up authentication to Docker repositories, run the following command:
    gcloud auth configure-docker REGION-docker.pkg.dev

    The command updates your Docker configuration. You can now connect with Artifact Registry in your Google Cloud project to push images.

  4. Build the Docker image using a Dockerfile with Cloud Build.

    gcloud builds submit --tag REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest .
    

    This command builds the Docker image and pushes it to your Artifact Registry repository.
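
    To confirm that the image was pushed, you can optionally list the images in the repository:

      gcloud artifacts docker images list REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY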

Metadata

You can extend your template with additional metadata so that custom parameters are validated when the template is run. If you want to create metadata for your template, follow these steps:

  1. Create a metadata.json file using the parameters in Metadata parameters.

    To view an example, see Example metadata file.

  2. Store the metadata file in Cloud Storage in the same folder as the template.
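
If you manage the metadata file separately, copying it next to the template spec might look like the following; this path is an assumption based on the bucket layout used later in this tutorial. In this tutorial you don't need this manual copy, because the gcloud dataflow flex-template build command's --metadata-file flag includes the metadata for you:

  gsutil cp metadata.json gs://BUCKET_NAME/samples/dataflow/templates/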

Metadata parameters

  • name (required): The name of your template.
  • description (optional): A short paragraph of text that describes the template.
  • parameters (optional): An array of additional parameters that the template uses. An empty array is used by default. Each entry in the array supports the following keys:
    • name (required): The name of the parameter that is used in your template.
    • label (required): A human-readable string that is used in the Google Cloud console to label the parameter.
    • helpText (required): A short paragraph of text that describes the parameter.
    • isOptional (optional): false if the parameter is required and true if the parameter is optional. If you omit this key, isOptional defaults to false and the parameter is required.
    • regexes (optional): An array of POSIX-egrep regular expressions in string form that are used to validate the value of the parameter. For example, ["^[a-zA-Z][a-zA-Z0-9]+"] is a single regular expression that validates that the value starts with a letter and then has one or more characters. An empty array is used by default.

Example metadata file

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    }
  ]
}

You can download metadata files for the Google-provided templates from the Dataflow template directory.

Create a Flex Template

To run a template, you need to create a template spec file in a Cloud Storage bucket that contains all of the necessary information to run the job, such as the SDK information and metadata.

This tutorial uses the example metadata file, which contains additional information for the template such as the name, description, and input parameters fields.

Use the gcloud dataflow flex-template build command to create a Flex Template named streaming-beam-sql.json in your Cloud Storage bucket. In this command, you specify the file parameters, including the file name and storage location.

Java

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
      --image-gcr-path "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
      --sdk-language "JAVA" \
      --flex-template-base-image JAVA11 \
      --metadata-file "metadata.json" \
      --jar "target/streaming-beam-sql-1.0.jar" \
      --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

Python

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
     --image "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
     --sdk-language "PYTHON" \
     --metadata-file "metadata.json"

The template is now available through the template file in the Cloud Storage location that you specified.
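
To confirm that the template spec file was created, you can optionally list it:

  gsutil ls -l gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json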

Run a Flex Template pipeline

You can now run the Apache Beam pipeline in Dataflow by referring to the template file and passing the template parameters required by the pipeline.
  1. In your shell or terminal, run the template:

    Java

    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters inputSubscription="SUBSCRIPTION_ID" \
        --parameters outputTable="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    Python

    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters input_subscription="projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID" \
        --parameters output_table="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    Replace TABLE_ID with a name for your table.

    Alternatively, you can run the template with a REST API request:

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'SUBSCRIPTION_ID'",
            "outputTable": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'PROJECT_ID'/subscriptions/'SUBSCRIPTION_ID'",
            "output_table": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'
  2. After you execute the command to run the Flex Template, Dataflow returns a job ID with the job status Queued. It might take several minutes before the job status reaches Running and you can access the job graph. To check the job status from the command line, see the command after these steps.
  3. Check the results in BigQuery by running the following query:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.DATASET_ID.TABLE_ID"'`'
    
    While this pipeline is running, you can see new rows appended to the BigQuery table every minute.
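
To check whether the job has reached the Running state from the command line, you can list active Dataflow jobs. This optional check is not part of the original steps:

  gcloud dataflow jobs list --region "REGION" --status=active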

Clean up

After you've finished this tutorial, you can clean up the resources you created on Google Cloud so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Clean up the Flex Template resources

  1. Stop the Dataflow pipeline.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "REGION" \
      | xargs gcloud dataflow jobs cancel --region "REGION"
  2. Delete the template spec file from Cloud Storage.
    gsutil rm gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json
  3. Delete the Artifact Registry repository.
    gcloud artifacts repositories delete REPOSITORY \
    --location=REGION --async

Clean up Google Cloud project resources

  1. Delete the Cloud Scheduler jobs.
    gcloud scheduler jobs delete negative-ratings-publisher --location=REGION
    
    gcloud scheduler jobs delete positive-ratings-publisher --location=REGION
    
  2. Delete the Pub/Sub subscription and topic.
    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  3. Delete the BigQuery table.
    bq rm -f -t PROJECT_ID:DATASET_ID.TABLE_ID
    
  4. Delete the BigQuery dataset. This alone does not incur any charges.
    bq rm -r -f -d PROJECT_ID:DATASET_ID
    
  5. Delete the Cloud Storage bucket. This bucket alone does not incur any charges.
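
    Assuming you also want to remove any objects stored in the bucket, the following command deletes the bucket and its contents:

      gsutil rm -r gs://BUCKET_NAME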

Revoke credentials

  1. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  2. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
  3. Revoke the roles that you granted to the Compute Engine default service account.

    Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/bigquery.dataEditor
    • roles/pubsub.editor
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE

Limitations

The following limitations apply to Flex Template jobs:

  • You must use a Google-provided base image to package your containers using Docker. For a list of applicable images, see Flex Template base images.
  • The program that constructs the pipeline must exit after run is called in order for the pipeline to start.
  • waitUntilFinish (Java) and wait_until_finish (Python) are not supported.

What's next