This page shows you how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  1. Creates a Cloud Dataproc cluster
  2. Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage
  3. Deletes the cluster

This page also shows you how to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the Manage resources page

  3. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  4. To run gcloud beta composer commands in a local terminal window, install the Cloud SDK on your client machine. You can also run the commands in Cloud Shell.
  5. Create a Cloud Composer environment and wait until environment creation completes.

    It can take up to one hour to deploy the Airflow web interface in the Cloud Composer environment. Environment creation is complete when a green checkmark appears to the left of the environment name on the Environments page in the GCP Console.

  6. Create a Cloud Storage bucket in your project and note the bucket path, such as gs://my-bucket, to use later.

Viewing environment information

  1. In the GCP Console, open the Environments page.

    Open the Environments page

  2. Click the name of the environment to see its details.

    The Environment details page provides information, such as the Airflow web interface URL, Kubernetes Engine cluster ID, name of the Cloud Storage bucket, and path for the /dags folder.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following three Airflow variables: gcp_project, gcs_bucket, and gce_zone.

To set Airflow variables using the gcloud command-line tool, use the gcloud beta composer environments run command with the variables sub-command. This gcloud beta composer command executes the Airflow CLI sub-command variables, and the gcloud command-line tool passes the arguments that follow to that sub-command.

     gcloud beta composer environments run ENVIRONMENT_NAME \
     --location LOCATION variables -- \
     --set KEY VALUE


  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located. The gcloud beta composer command requires including the --location flag or setting the default location before running the gcloud command.
  • KEY and VALUE specify the variable and its value to set. Separate the gcloud arguments on the left from the Airflow sub-command arguments on the right with a space, two dashes, and a space ( -- ). Also put a space between the KEY and VALUE arguments.

For example:

    gcloud beta composer environments run test-environment \
    --location us-central1 variables -- --set gcp_project project-id

To set the three variables, run the gcloud beta composer command once for each row in the following table:

KEY          VALUE            Details
gcp_project  your project-id  The Google Cloud Platform project you're using for this quickstart.
gcs_bucket   gs://my-bucket   The Cloud Storage bucket you created for this quickstart.
gce_zone     your zone        The Compute Engine zone where your Cloud Dataproc cluster will be created, such as us-central1-a. See Available regions & zones.
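
Because the three commands differ only in KEY and VALUE, they can be generated in a loop. The following Python sketch is not part of the quickstart: the helper name build_set_variable_command and the sample environment name, location, and values are placeholders assumed for illustration. It only builds and prints the argument lists; pass each list to subprocess.run to execute it.

```python
import shlex


def build_set_variable_command(environment, location, key, value):
    """Return the gcloud argument list that sets one Airflow variable."""
    return [
        'gcloud', 'beta', 'composer', 'environments', 'run', environment,
        '--location', location,
        # Arguments after the standalone '--' go to the Airflow CLI.
        'variables', '--',
        '--set', key, value,
    ]


# Placeholder values (assumptions); replace them with your own.
AIRFLOW_VARIABLES = {
    'gcp_project': 'project-id',
    'gcs_bucket': 'gs://my-bucket',
    'gce_zone': 'us-central1-a',
}

commands = [
    build_set_variable_command('test-environment', 'us-central1', key, value)
    for key, value in AIRFLOW_VARIABLES.items()
]

for command in commands:
    # shlex.quote keeps each argument safe to paste into a shell.
    print(' '.join(shlex.quote(arg) for arg in command))
```

Building the command as a list, rather than one string, avoids shell-quoting mistakes when a value contains spaces or special characters.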

Defining the workflow

Cloud Composer workflows are composed of DAGs (Directed Acyclic Graphs). The code shown in quickstart.py is the workflow code, also referred to as the DAG.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
# Assumed to be the standard examples jar location on Dataproc images.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Assumed worker count for this example; adjust as needed.
        num_workers=2,
        zone=models.Variable.get('gce_zone'))

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

To orchestrate the three workflow tasks, the DAG imports the following operators:

  1. DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
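
A comment on the delete task in quickstart.py notes that its trigger rule is set to ALL_DONE so that the cluster is deleted even if the Dataproc job fails. The following toy model sketches how that differs from Airflow's default rule, which requires every upstream task to succeed. It is not Airflow code: the function name should_run and the plain-string states are simplifications assumed for illustration.

```python
# States in which an upstream task has stopped running, in this toy model.
FINISHED_STATES = {'success', 'failed', 'upstream_failed', 'skipped'}


def should_run(trigger_rule, upstream_states):
    """Decide whether a task may start, given its upstream task states."""
    if trigger_rule == 'all_success':
        # Default behavior: every upstream task must have succeeded.
        return all(state == 'success' for state in upstream_states)
    if trigger_rule == 'all_done':
        # Run once every upstream task has finished, whatever the outcome.
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError('unsupported trigger rule: %s' % trigger_rule)


# The wordcount job failed: only the all_done rule lets the delete task run.
print(should_run('all_success', ['failed']))
print(should_run('all_done', ['failed']))
```

With the default rule, a failed wordcount job would leave the cluster running and accruing charges, which is why the delete task uses ALL_DONE.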

The tasks run sequentially.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
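
The >> syntax is ordinary Python operator overloading: a >> b records b as downstream of a and evaluates to b, so a chain reads left to right. The following sketch uses a hypothetical Task class to illustrate the idea. It is a simplified stand-in, not Airflow's implementation.

```python
class Task:
    """Toy task that records which tasks run after it."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b: register b as downstream of a, then return b so that
        # a >> b >> c continues the chain from b.
        self.downstream.append(other)
        return other


def execution_order(first):
    """Follow a linear chain of tasks and return the task IDs in order."""
    order = []
    task = first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


create = Task('create_dataproc_cluster')
run = Task('run_dataproc_hadoop')
delete = Task('delete_dataproc_cluster')

create >> run >> delete
print(execution_order(create))
```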

The name of the DAG is quickstart, and the DAG runs once each day.

with models.DAG(
        'quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.
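
To see why a start date of yesterday triggers an immediate run, consider the simplified rule that Airflow starts a DAG run once its schedule period has ended, that is, once start_date plus one schedule interval has passed. The following sketch checks that condition. The helper names and the fixed sample time are assumptions for illustration, and the real scheduler handles many more cases.

```python
import datetime


def midnight_yesterday(now):
    """Return midnight at the start of the day before `now`."""
    return datetime.datetime.combine(
        now.date() - datetime.timedelta(days=1), datetime.time.min)


def first_run_is_due(start_date, schedule_interval, now):
    """Return True if the first schedule period has already ended."""
    return start_date + schedule_interval <= now


# Fixed sample time so the result is reproducible.
now = datetime.datetime(2018, 6, 1, 14, 30)
one_day = datetime.timedelta(days=1)

start_date = midnight_yesterday(now)
print(start_date)
print(first_run_is_due(start_date, one_day, now))

# By contrast, a start date of "now" would not be due for another day.
print(first_run_is_due(now, one_day, now))
```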

Uploading the DAG to Cloud Storage

To upload the DAG:

  1. Copy and save quickstart.py on your local client machine.

  2. Upload your local copy of quickstart.py to the /dags folder in the Cloud Storage bucket for your Cloud Composer environment.
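
One way to script the upload is the gcloud storage dags import sub-command. The sketch below only builds the argument list for it. The helper name and the sample environment name and location are placeholders assumed for illustration, and you should confirm the flags against your installed Cloud SDK version before relying on them.

```python
def build_dag_import_command(environment, location, source):
    """Return the gcloud argument list that uploads a DAG file."""
    return [
        'gcloud', 'beta', 'composer', 'environments', 'storage', 'dags',
        'import',
        '--environment', environment,
        '--location', location,
        '--source', source,
    ]


# Placeholder environment name and location (assumptions).
command = build_dag_import_command(
    'test-environment', 'us-central1', 'quickstart.py')
print(' '.join(command))
```

To run the command from Python, pass the list to subprocess.run(command, check=True).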

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes take effect within 3-5 minutes. You can see task status in the Airflow web interface.

The rest of this guide refers to the workflow as composer-quickstart.

Using the Airflow UI

To access the Airflow web interface using the GCP Console:

  1. Open the Environments page.

    Open the Environments page

  2. In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.

For information about the Airflow UI, see Accessing the web interface.

Viewing Variables

The variables you set earlier are persisted in your environment. You can view the variables by selecting Admin > Variables from the Airflow UI menu bar.

Exploring DAG runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

Click composer-quickstart to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.

Now, in the toolbar, click Graph View, and then hold the pointer over the graphic for each task to see its status. The border color around each task also indicates its status (for example, green for running and red for failed).

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. Click Clear to reset the three tasks and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. Select Run to re-queue the workflow.

You can also check the status and results of the composer-quickstart workflow by going to the following GCP Console pages:

  • Cloud Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Cloud Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

  • Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket for the environment.
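
Each run writes to its own timestamped folder under wordcount, which is how quickstart.py builds output_file. The following sketch reproduces that path construction with a fixed timestamp so that you know what to look for in the bucket. The helper name and sample values are assumptions, and posixpath is used here so that the result always uses forward slashes, whereas quickstart.py uses os.path.join and os.sep.

```python
import datetime
import posixpath


def wordcount_output_prefix(bucket, run_time):
    """Return the Cloud Storage folder that one wordcount run writes to."""
    timestamp = run_time.strftime('%Y%m%d-%H%M%S')
    # The trailing slash marks the result as a folder-like prefix.
    return posixpath.join(bucket, 'wordcount', timestamp) + '/'


# Placeholder bucket and a fixed sample time.
prefix = wordcount_output_prefix(
    'gs://my-bucket', datetime.datetime(2018, 6, 1, 14, 30, 5))
print(prefix)
```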

Clean up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this quickstart:

  1. (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
  2. Delete the Cloud Storage bucket you created for this quickstart.
  3. Delete the Cloud Storage bucket for the environment.
  4. Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
  5. (Optional) Delete the project.

    1. In the GCP Console, go to the Projects page.

      Go to the Projects page

    2. In the project list, select the checkbox next to the project you want to delete, and then click Delete project.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.
