This page shows you how to create and run a Cloud Composer workflow. The workflow is as follows:

  1. Creates a Cloud Dataproc cluster
  2. Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage
  3. Deletes the cluster

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the Manage resources page

  3. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  4. Create a Cloud Storage bucket in your project.
  5. Create a Cloud Composer environment.
  6. The following command executes the Airflow CLI's variables sub-command in your environment to set an Airflow variable. These Airflow variables are an Airflow-specific concept that is distinct from environment variables. When entering the command, note that there is a space two dashes space ( -- ) between the left-side gcloud command with gcloud-related arguments and the right-side Airflow sub-command-related arguments, and there is a space between the key and val arguments.
    gcloud beta composer environments run environment-name variables -- \
      --set key val

    Run the above command in a local or Cloud Shell terminal window once for each row in the table below to create the following three variables (gcp_project, gcs_bucket, and gce_zone).

    Key Val
    gcp_project your project-id (the Google Cloud Platform project you are using for this quickstart)
    gcs_bucket gs://my-bucket (the Cloud Storage you created for this quickstart)
    gce_zone The Compute Engine zone where your Cloud Dataproc cluster will be created, such as us-central1-a (see Available regions & zones)

Defining the workflow

Cloud Composer workflows are comprised of DAGs (Directed Acyclic Graphs). Our workflow code is in quickstart.py, listed below.


"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')

with models.DAG(
        # Continue to run DAG once per day
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        cluster_name='quickstart-cluster-{{ ds_nodash }}',

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

This DAG imports the following three operators to orchestrate three workflow tasks:

  1. DataprocClusterCreateOperator: create a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: submit a Hadoop wordcount job and write results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: delete the cluster to avoid incurring ongoing Compute Engine charges.

These tasks are scheduled to run sequentially.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

The name of the Dag is composer-quickstart, and it is scheduled to run once each day.

with models.DAG(
        # Continue to run DAG once per day
        default_args=default_dag_args) as dag:

Since since the start_date passed in to default_dag_args is set to yesterday, the workflow will be scheduled to start immediately when it is uploaded into the Cloud Composer service (see Using the Airflow UI).

Upload the DAG to Cloud Storage

  1. Copy quickstart.py, then save it on your local machine.

  2. Upload your local copy of quickstart.py to the Cloud Storage bucket for your Cloud Composer environment by following Managing DAGs.

Using the Airflow UI

View Variables

The variables you set in Before you begin, Step 5, are persisted in your environment. You can view them by selecting Admin→Variables from the Airflow UI top menu.

Explore DAG runs

When you upload your DAG file to the dags/ folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow (composer-quickstart) appears in the DAG listing, and the workflow is queued to run immediately.

Click on composer-quickstart to open the DAG details page, which includes a graphical representation of workflow tasks and dependencies.

Now, click on Graph View, then mouseover the graphic for each task to see its status. Note that the border around each task also indicates its status (green border = running; red = failed, etc.).

You can also check the status and results of the composer-quickstart workflow by going to the following GCP Console pages:

  • Cloud Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow, and is deleted as part of the last workflow task.

  • Cloud Dataproc Jobs to view/monitor the Apache Hadoop wordcount job. Click on the Job ID to see job log output.

  • Cloud Storage Browser to the results of the wordcount in the wordcount/ folder in the Cloud Storage bucket you are using for this quickstart (Before you begin, Step 3).

Clean up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this quickstart:

Cloud Composer has done the bulk of the cleanup for you by deleting the cluster after the completion of the Hadoop wordcount. However, you can perform the additional cleanup tasks when you are ready to do so:

  1. Since the workflow is scheduled to run daily, pause the DAG from the Cloud Composer UI. To remove the workflow from Cloud Composer, delete quickstart.py from the dags/ folder in your gcsDagLocation bucket in Cloud Storage (see Upload the DAG to Cloud Storage, Step 2, to find the location of this bucket).
  2. Delete the Cloud Storage bucket used for this quickstart (created in Before you begin, Step 3).

What's next

Send feedback about...

Google Cloud Composer