This page shows you how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  1. Creates a Cloud Dataproc cluster
  2. Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage
  3. Deletes the cluster

This page also shows you how to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a Google Cloud Platform project.

    Go to the Manage resources page

  3. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  4. To run gcloud composer commands in a local terminal window, install the Cloud SDK on your client machine. You can also run the commands in Cloud Shell, in which the Cloud SDK is preinstalled.
  5. Create a Cloud Composer environment and wait until environment creation completes.

    Open the Create Environment page

    It takes up to one hour to deploy the Airflow web interface and complete the environment creation process. The environment creation process is completed when the green checkmark displays to the left of the environment name on the Environments page in the GCP Console.

  6. In your project, create a Cloud Storage bucket of any storage class and region. This bucket stores the results of the Hadoop wordcount job.

    Open the Create a Bucket page

  7. Note the bucket path, such as gs://my-bucket, to use later in a variable definition.

Viewing environment information

  1. In the GCP Console, open the Environments page.

    Open the Environments page

  2. Click the name of the environment to see its details.

    The Environment details page provides information, such as the Airflow web interface URL, Google Kubernetes Engine cluster ID, name of the Cloud Storage bucket, and path for the /dags folder.

    In Airflow, a DAG is a collection of organized tasks that you want to schedule and run. DAGs, also called workflows, are defined in standard Python files. Cloud Composer only schedules the DAGs in the /dags folder. The /dags folder is in the Cloud Storage bucket that Cloud Composer creates automatically when you create your environment.

  3. Note the zone in which you created your environment to use later in a variable definition.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you set the following three Airflow variables: gcp_project, gcs_bucket, and gce_zone.

Using gcloud to set variables

To set Airflow variables using the gcloud command-line tool, use the gcloud composer environments run command with the variables sub-command. This gcloud composer command executes the Airflow CLI sub-command variables, forwarding to it the arguments that follow the -- separator.

     gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION variables -- \
     --set KEY VALUE


  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located. The gcloud composer command requires including the --location flag or setting the default location before running the gcloud command.
  • KEY and VALUE specify the variable and its value to set. Include a space, two dashes, and a space ( -- ) between the left-side gcloud command with gcloud-related arguments and the right-side Airflow sub-command-related arguments. Also include a space between the KEY and VALUE arguments.
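The role of the two-dash separator can be illustrated with a small sketch. This models the convention only; it is not gcloud's actual argument parser:

```python
def split_on_separator(argv):
    """Split an argument list at the first standalone '--'.

    Everything before '--' is handled by gcloud itself; everything after
    is forwarded verbatim to the Airflow CLI sub-command.
    """
    if '--' in argv:
        i = argv.index('--')
        return argv[:i], argv[i + 1:]
    return argv, []

gcloud_args, airflow_args = split_on_separator(
    ['variables', '--', '--set', 'gcp_project', 'my-project-id'])
print(gcloud_args)   # ['variables']
print(airflow_args)  # ['--set', 'gcp_project', 'my-project-id']
```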

To set the three variables, run the gcloud composer command once for each row in the following table:

  KEY          VALUE               DETAILS
  gcp_project  your project-id     The Google Cloud Platform project you're using for this quickstart.
  gcs_bucket   gs://my-bucket      The Cloud Storage bucket you created for this quickstart.
  gce_zone     your compute-zone   The Compute Engine zone for your environment, such as us-central1-a. This is the zone where your Cloud Dataproc cluster will be created. See Available regions & zones.

For example:

    gcloud composer environments run test-environment \
    --location us-central1 variables -- --set gcp_project my-project-id

Output similar to the following displays:

kubeconfig entry generated for us-central1-test-environment-a3099dde-gke.
[2018-12-11 05:49:17,222] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-12-11 05:49:19,069] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2018-12-11 05:49:19,081] {__init__.py:51} INFO - Using executor CeleryExecutor
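Running the command once per table row can be scripted. The following sketch prints each invocation instead of executing it, using the example environment name and values from this page; substitute your own before running the printed commands:

```shell
# Example values from this quickstart; replace with your own.
ENVIRONMENT=test-environment
LOCATION=us-central1

# Print (rather than run) the gcloud command that sets one Airflow variable.
composer_set_var() {
  echo "gcloud composer environments run $ENVIRONMENT" \
       "--location $LOCATION variables -- --set $1 $2"
}

composer_set_var gcp_project my-project-id
composer_set_var gcs_bucket gs://my-bucket
composer_set_var gce_zone us-central1-a
```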

Using gcloud to view a variable

To see the value of a variable, run the Airflow CLI sub-command variables with the get argument or use the Airflow UI.

For example:

    gcloud composer environments run test-environment \
    --location us-central1 variables -- --get gcs_bucket

Viewing the sample workflow

Cloud Composer workflows consist of DAGs (directed acyclic graphs). The code shown in quickstart.py is the workflow code.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables:
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where the Cloud Dataproc cluster should
  be created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

To orchestrate the three workflow tasks, the DAG imports the following operators:

  1. DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.

The tasks run sequentially.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
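The >> operator sets a downstream dependency between tasks, so chains read left to right. Its chaining behavior can be sketched with a plain Python class; this is an illustrative stand-in, not Airflow's implementation:

```python
class Task:
    """Minimal stand-in for an Airflow task to illustrate '>>' chaining."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # 'a >> b' records b as downstream of a and returns b,
        # so 'a >> b >> c' chains left to right.
        self.downstream.append(other)
        return other

create = Task('create_dataproc_cluster')
run = Task('run_dataproc_hadoop')
delete = Task('delete_dataproc_cluster')

create >> run >> delete

print([t.task_id for t in create.downstream])  # ['run_dataproc_hadoop']
print([t.task_id for t in run.downstream])     # ['delete_dataproc_cluster']
```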

The name of the DAG is composer_sample_quickstart, and the DAG runs once each day.

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.
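The start_date arithmetic from quickstart.py can be checked in isolation with the standard library:

```python
import datetime

# Midnight of the previous day, as computed in quickstart.py.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Because this start_date is already in the past, Airflow considers the
# first schedule interval complete and runs the DAG as soon as it is parsed.
print(yesterday < datetime.datetime.now())  # True
```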

Uploading the DAG to Cloud Storage

To upload the DAG:

  1. Copy and save quickstart.py on your local client machine.

  2. Upload your local copy of quickstart.py to the dags/ folder in the Cloud Storage bucket for your Cloud Composer environment.

For example:

gcloud composer environments storage dags import \
  --environment my-environment  --location us-central1 \
  --source test-dags/quickstart.py

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes occur within 3-5 minutes. You can see task status in the Airflow web interface.

Using the Airflow UI

To access the Airflow web interface using the GCP Console:

  1. Open the Environments page.

    Open the Environments page

  2. In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.

For information about the Airflow UI, see Accessing the web interface.

Viewing Variables

The variables you set earlier are persisted in your environment. You can view the variables by selecting Admin > Variables from the Airflow UI menu bar.

Exploring DAG runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

Click composer_sample_quickstart to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.

Now, in the toolbar, click Graph View and then hold the pointer over the graphic for each task to see its status. Note that the border around each task also indicates the status (green border = running; red = failed, and so on).

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. Click Clear to reset the three tasks and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. Select Run to re-queue the workflow.

You can also check the status and results of the composer_sample_quickstart workflow by going to the following GCP Console pages:

  • Cloud Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Cloud Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

  • Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket you created for this quickstart.

Clean up

To avoid incurring charges to your GCP account for the resources used in this quickstart:

  1. (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
  2. Delete the Cloud Storage bucket you created for this quickstart.
  3. Delete the Cloud Storage bucket for the environment.
  4. Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
  5. (Optional) Delete the project.

    1. In the GCP Console, go to the Projects page.

      Go to the Projects page

    2. In the project list, select the project you want to delete and click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next
