Running a Hadoop wordcount job on a Dataproc cluster

This tutorial shows how to use Cloud Composer to create an Apache Airflow DAG (workflow) that runs an Apache Hadoop wordcount job on a Dataproc cluster using the Google Cloud Console.

In this tutorial, you:

  1. Access your Cloud Composer environment and use the Airflow web interface.
  2. Create and view Airflow environment variables.
  3. Create and run a DAG that includes the following tasks:
    1. Creates a Dataproc cluster
    2. Runs an Apache Hadoop word-count job on the cluster
    3. Outputs the word-count results to a Cloud Storage bucket
    4. Deletes the cluster


This tutorial uses billable components of Google Cloud, including:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

It takes up to 25 minutes for the system to create your environment. This tutorial can take approximately 1 hour to complete. Use the pricing calculator to generate a cost estimate based on your projected usage. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  5. In your project, create a Cloud Storage bucket of any storage class and region to store the results of the Hadoop word-count job.
  6. Note the path of the bucket that you created, for example gs://my-bucket. You'll define an Airflow variable for this path and use the variable in the example DAG.
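The example DAG later builds a timestamped output folder under this bucket path. As an illustration of how that path is composed (the bucket name below is a placeholder, not a real resource):

```python
import datetime
import os

# Placeholder standing in for the gcs_bucket Airflow variable set later
# in this tutorial.
gcs_bucket = 'gs://my-bucket'

# The example DAG writes word-count results to a timestamped folder under
# wordcount/ in the bucket, e.g. gs://my-bucket/wordcount/20240101-120000/
output_file = os.path.join(
    gcs_bucket, 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
```

Because each run gets its own timestamped folder, repeated runs of the job never overwrite earlier results.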

Creating an environment

  1. In the Cloud Console, go to the Create environment page.

    Open the Create environment page

  2. In the Name field, enter example-environment.

  3. In the Location drop-down list, select a region for the Cloud Composer environment. See Available regions for information on selecting a region.

  4. For other environment configuration options, use the provided defaults.

  5. To create the environment, click Create.

  6. Wait until environment creation is completed. When it is done, a green check mark appears to the left of the environment name.

Viewing environment details

After environment creation is completed, you can view your environment's deployment information, such as the Cloud Composer and Python versions, the URL for the Airflow web interface, and the Google Kubernetes Engine cluster ID.

To view deployment information:

  1. In the Cloud Console, go to the Environments page.

    Open the Environments page

  2. To open the Environment details page, click example-environment.

  3. Note the zone in which you created your environment, for example us-central1-c. You'll define an Airflow variable for this zone and use it in the example DAG.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll use the Airflow web interface to set three Airflow variables to use later in the example DAG.

To set variables:

  1. Access the Airflow web interface in Cloud Console:

    1. In the Cloud Console, go to the Environments page.

      Open the Environments page

    2. In the Airflow webserver column for example-environment, click the Airflow link. The Airflow web interface opens in a new window.

  2. Set variables in the Airflow web interface:

    1. In the toolbar, click Admin > Variables.
    2. To create a new variable, click Create.
    3. For each of the following variables, enter the key-value pair and click Save. All Airflow variables display on the List tab.
      gcp_project The ID of the Google Cloud project you're using for this tutorial, such as composer-test.
      gcs_bucket The Cloud Storage bucket you created for this tutorial, such as gs://my-bucket.
      gce_zone The Compute Engine zone for your environment, such as us-central1-c. This is the zone where your Dataproc cluster is created. See Available regions and zones.
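Inside the DAG file, each of these values is read with `models.Variable.get('<key>')`. The mapping you just created corresponds to the following (all values below are the example placeholders from this tutorial, not real resources):

```python
# Stand-in for the three Airflow variables set in the web interface.
# Inside a DAG file, each value would come from models.Variable.get('<key>').
airflow_variables = {
    'gcp_project': 'composer-test',   # placeholder project ID
    'gcs_bucket': 'gs://my-bucket',   # placeholder bucket path
    'gce_zone': 'us-central1-c',      # zone noted in the previous section
}
```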

Viewing the example workflow

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files. The following code is the workflow code.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables:
* gcp_project - Google Cloud project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where the Cloud Dataproc cluster
  should be created.
* gcs_bucket - Google Cloud Storage bucket to use for the result of the
  Hadoop job.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster


An operator is a template for a single task in a workflow. To orchestrate the three tasks in the example workflow, the DAG imports the following three operators:

  1. DataprocClusterCreateOperator: Creates a Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
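All three operators identify the cluster by the same templated name, composer-hadoop-tutorial-cluster-{{ ds_nodash }}. Airflow renders ds_nodash as the run date without dashes; the stdlib-only sketch below approximates the rendered name (Airflow substitutes the scheduled run date, whereas here the current date is used for illustration):

```python
import datetime

# Approximation of Airflow's {{ ds_nodash }} template variable:
# the run date formatted as YYYYMMDD with no dashes.
ds_nodash = datetime.date.today().strftime('%Y%m%d')

# The rendered cluster name that all three Dataproc operators share,
# so they create, use, and delete the same cluster for a given run.
cluster_name = 'composer-hadoop-tutorial-cluster-' + ds_nodash
```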


You organize tasks that you want to run in a way that reflects their relationships and dependencies. The tasks in this DAG run sequentially. In this example, the relationship is set in the direction that the Python bitshift operator (>>) points.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
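The >> chaining works because Airflow operators implement Python's __rshift__ method to mean "set downstream." A toy illustration of the pattern (not Airflow code; the Task class below is hypothetical):

```python
class Task:
    """Minimal stand-in for an Airflow operator, illustrating how
    a >> b records that b runs after a."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        # a >> b: record b as a's downstream task, then return b so
        # chains like a >> b >> c read left to right.
        self.downstream.append(other)
        return other


create = Task('create_dataproc_cluster')
run = Task('run_dataproc_hadoop')
delete = Task('delete_dataproc_cluster')

create >> run >> delete
```

Because __rshift__ returns its right-hand operand, each >> in the chain applies to the task that the previous >> produced, yielding the sequential create, run, delete ordering.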


The name of the DAG is composer_hadoop_tutorial, and the DAG runs once each day. Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
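The "yesterday" start date is midnight at the start of the previous day, so the first daily schedule interval has already elapsed when the DAG is parsed and Airflow triggers a run right away. The computation, using only the standard library:

```python
import datetime

# Midnight at the start of the previous day. With a one-day schedule
# interval, this makes the first run due immediately on upload.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
```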

Uploading the DAG to Cloud Storage

Cloud Composer schedules only the DAGs in the DAGs folder. The DAGs folder is in the Cloud Storage bucket that Cloud Composer creates automatically for your environment.

To upload the DAG:

  1. On your local machine, save a copy of the example workflow code in a Python (.py) file.
  2. In the Cloud Console, go to the Environments page.

    Open the Environments page

  3. In the DAGs folder column for example-environment, click the DAGs link. The DAGs folder in Cloud Storage opens.

  4. Click Upload files.

  5. Select the DAG file on your local machine and click Open.

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes occur within 3-5 minutes.

Exploring DAG runs

Viewing task status

When you upload your DAG file to the dags/ folder in Cloud Storage, Cloud Composer parses the file. When parsing completes successfully, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

  1. To see task status, go to the Airflow web interface and click DAGs in the toolbar.

  2. To open the DAG details page, click composer_hadoop_tutorial. This page includes a graphical representation of workflow tasks and dependencies.

  3. To see each task's status, click Graph View and then hold the pointer over the graphic for each task.

Queuing the workflow again

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. To reset the three tasks, click Clear and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. To queue the workflow again, click Run.

Viewing task results

You can also check the status and results of the composer_hadoop_tutorial workflow by going to the following Cloud Console pages:

  • Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

  • Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket you created for this tutorial.

Cleaning up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. If the project that you plan to delete is attached to an organization, select the organization from the Organization list at the top of the page.
  3. In the project list, select the project that you want to delete, and then click Delete.
  4. In the dialog, type the project ID, and then click Shut down to delete the project.

Alternatively, you can delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment.
  2. Delete the Cloud Storage bucket for the Cloud Composer environment. Deleting the Cloud Composer environment does not delete its bucket.
  3. Delete the Pub/Sub topics for Cloud Composer (composer-agent and composer-backend).

What's next