Running a Hadoop wordcount job on a Dataproc cluster

This tutorial shows how to use Cloud Composer to create an Apache Airflow DAG (workflow) that runs an Apache Hadoop wordcount job on a Dataproc cluster using the Google Cloud console.

Objectives

  1. Access your Cloud Composer environment and use the Airflow web interface.
  2. Create and view Airflow environment variables.
  3. Create and run a DAG that includes the following tasks:
    1. Creates a Dataproc cluster
    2. Runs an Apache Hadoop word-count job on the cluster
    3. Outputs the word-count results to a Cloud Storage bucket
    4. Deletes the cluster

Costs

This tutorial uses the following billable components of Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

It takes up to 25 minutes for the system to create your environment. This tutorial can take approximately 1 hour to complete.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  5. In your project, create a Cloud Storage bucket of any storage class and region to store the results of the Hadoop word-count job. A sketch after this list shows one way to create the bucket programmatically.
  6. Note the path of the bucket that you created, for example gs://my-bucket. You'll define an Airflow variable for this path and use the variable in the example DAG.
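
If you prefer to create the bucket programmatically instead of in the console, the following is a minimal sketch that uses the google-cloud-storage Python client. The project ID, bucket name, and location are placeholders; replace them with your own values.

from google.cloud import storage  # pip install google-cloud-storage

# Placeholder values: replace with your own project, bucket name, and region.
PROJECT_ID = 'my-project'
BUCKET_NAME = 'my-bucket'
LOCATION = 'us-central1'

client = storage.Client(project=PROJECT_ID)
# Creates a bucket (STANDARD storage class by default) in the chosen region to
# hold the word-count results, for example gs://my-bucket.
bucket = client.create_bucket(BUCKET_NAME, location=LOCATION)
print(f'Created bucket gs://{bucket.name} in {LOCATION}')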

Creating an environment

  1. In the Google Cloud console, go to the Create environment page.

    Open the Create environment page

  2. In the Name field, enter example-environment.

  3. In the Location drop-down list, select a region for the Cloud Composer environment. See Available regions for information on selecting a region.

  4. For other environment configuration options, use the provided defaults.

  5. To create the environment, click Create.

  6. Wait until environment creation completes. When it does, a green check mark appears to the left of the environment name.

Viewing environment details

After environment creation is completed, you can view your environment's deployment information, such as the Cloud Composer and Python versions, the URL for the Airflow web interface, and the Google Kubernetes Engine cluster ID.

To view deployment information:

  1. In the Google Cloud console, go to the Environments page.

    Open the Environments page

  2. To open the Environment details page, click example-environment.

  3. Note the region in which you created your environment, for example us-central1. You'll define an Airflow variable for this region and use it in the example DAG.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll use the Airflow web interface to set three Airflow variables to use later in the example DAG.

To set variables:

  1. Access the Airflow web interface in Google Cloud console:

    1. In the Google Cloud console, go to the Environments page.

      Open the Environments page

    2. In the Airflow webserver column for example-environment, click the Airflow link. The Airflow web interface opens in a new window.

  2. Set variables in the Airflow web interface:

    1. In the toolbar, click Admin > Variables.
    2. To create a new variable, click Create.
    3. For each of the following variables, enter the Key-Value pair and click Save. All Airflow variables are listed on the List tab.
      Key          Value
      gcp_project  The ID of the Google Cloud project that you're using for this tutorial, such as composer-test.
      gcs_bucket   The Cloud Storage bucket that you created for this tutorial, such as gs://my-bucket.
      gce_region   The region of your environment, such as us-central1. This is the region where your Dataproc cluster is created. See Available regions and zones.
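
For reference, the example DAG reads these values at run time through Jinja templating, for example {{ var.value.gcs_bucket }}. You can also read Airflow variables in Python with the Variable model, although that lookup happens at parse time and needs access to the Airflow metadata database, so it must run inside the environment. The following is a minimal sketch; the default_var values are illustrative placeholders, not part of the tutorial.

from airflow.models import Variable

# Parse-time lookup: this runs every time the scheduler parses the DAG file,
# so use it sparingly. The default_var values are illustrative placeholders.
gcp_project = Variable.get('gcp_project', default_var='composer-test')
gcs_bucket = Variable.get('gcs_bucket', default_var='gs://my-bucket')
gce_region = Variable.get('gce_region', default_var='us-central1')

# Run-time lookup: templated operator fields resolve Jinja expressions like
# this one when the task executes, without a parse-time database query.
output_prefix = '{{ var.value.gcs_bucket }}/wordcount/'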

Viewing the example workflow

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files. The workflow code for this tutorial is in hadoop_tutorial.py, shown below for Airflow 2 and Airflow 1.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = os.path.join(
    '{{ var.value.gcs_bucket }}', 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": '{{ var.value.gcp_project }}'},
    "placement": {"cluster_name": 'composer-hadoop-tutorial-cluster-{{ ds_nodash }}'},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-2"
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-2"
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': '{{ var.value.gcp_project }}',
    'region': '{{ var.value.gce_region }}',
}


with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        cluster_config=CLUSTER_CONFIG,
        region='{{ var.value.gce_region }}'
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id='run_dataproc_hadoop',
        job=HADOOP_JOB)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        region='{{ var.value.gce_region }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = os.path.join(
    '{{ var.value.gcs_bucket }}', 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': '{{ var.value.gcp_project }}'
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        region='{{ var.value.gce_region }}',
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        region='{{ var.value.gce_region }}',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        region='{{ var.value.gce_region }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operators

An operator is a template for a single task in a workflow. To orchestrate the three tasks in the example workflow, the DAG imports the following three operators:

  1. DataprocCreateClusterOperator (DataprocClusterCreateOperator in Airflow 1): Creates a Dataproc cluster.
  2. DataprocSubmitJobOperator (DataProcHadoopOperator in Airflow 1): Submits the Hadoop wordcount job and writes the results to a Cloud Storage bucket.
  3. DataprocDeleteClusterOperator (DataprocClusterDeleteOperator in Airflow 1): Deletes the cluster to avoid incurring ongoing Compute Engine charges.

Dependencies

You organize tasks that you want to run in a way that reflects their relationships and dependencies. The tasks in this DAG run sequentially. In this example, the relationship is set in the direction that the Python bitshift operator points (>>).

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
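
The bitshift form is shorthand for Airflow's explicit dependency-setting methods. The following sketch shows the equivalent calls using placeholder tasks; EmptyOperator stands in for the tutorial's Dataproc operators (it is named DummyOperator in Airflow 1 and early Airflow 2 releases), and the DAG ID is illustrative.

import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator  # DummyOperator in older releases

with models.DAG(
        'dependency_example',  # illustrative DAG ID, not part of the tutorial
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None) as dag:

    # Placeholder tasks standing in for the tutorial's three Dataproc tasks.
    create = EmptyOperator(task_id='create_dataproc_cluster')
    run = EmptyOperator(task_id='run_dataproc_hadoop')
    delete = EmptyOperator(task_id='delete_dataproc_cluster')

    # Bitshift shorthand, as used in the tutorial:
    create >> run >> delete

    # Equivalent explicit form:
    # create.set_downstream(run)
    # run.set_downstream(delete)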

Scheduling

The name of the DAG is composer_hadoop_tutorial, and the DAG runs once each day. Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.

Airflow 2

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Airflow 1

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
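
Because the schedule interval is one day and the start_date is yesterday, one interval is already due when the scheduler first sees the DAG, which is why a run is triggered right away. If you would rather pin a fixed start date, you can disable backfilling of earlier intervals with the DAG's catchup argument. The following is a minimal sketch of that variation; the DAG ID and date are illustrative, and this is not part of the tutorial.

import datetime

from airflow import models

# Illustrative variation on the tutorial DAG: a fixed start_date with catchup
# disabled, so only the most recent interval runs when the DAG is first
# parsed instead of backfilling every interval since the start date.
with models.DAG(
        'composer_hadoop_tutorial_fixed_start',
        schedule_interval=datetime.timedelta(days=1),
        start_date=datetime.datetime(2023, 1, 1),
        catchup=False) as dag:
    pass  # the tutorial's three Dataproc tasks would be defined here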

Uploading the DAG to Cloud Storage

Cloud Composer schedules only the DAGs in the DAGs folder. The DAGs folder is in the Cloud Storage bucket that Cloud Composer creates automatically for your environment.

To upload the DAG:

  1. On your local machine, save hadoop_tutorial.py.
  2. In the Google Cloud console, go to the Environments page.

    Open the Environments page

  3. In the DAGs folder column for example-environment, click the DAGs link. The DAGs folder in Cloud Storage opens.

  4. Click Upload files.

  5. Select hadoop_tutorial.py on your local machine and click Open.

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes occur within 3-5 minutes.
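
If you prefer to upload the file from a script instead of through the console, the following is a minimal sketch that copies hadoop_tutorial.py into the dags/ folder with the google-cloud-storage Python client. The DAGs bucket name is a placeholder for the bucket that Cloud Composer created for your environment; you can find it on the environment's details page.

from google.cloud import storage  # pip install google-cloud-storage

# Placeholder: the DAGs bucket created for your environment, for example
# 'us-central1-example-environ-1234abcd-bucket'.
DAGS_BUCKET = 'your-environment-dags-bucket'

client = storage.Client()
bucket = client.bucket(DAGS_BUCKET)

# Upload the local DAG file into the dags/ folder that Cloud Composer watches.
blob = bucket.blob('dags/hadoop_tutorial.py')
blob.upload_from_filename('hadoop_tutorial.py')
print(f'Uploaded hadoop_tutorial.py to gs://{DAGS_BUCKET}/dags/')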

Exploring DAG runs

Viewing task status

When you upload your DAG file to the dags/ folder in Cloud Storage, Cloud Composer parses the file. If the file is parsed successfully, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

  1. To see task status, go to the Airflow web interface and click DAGs in the toolbar.

  2. To open the DAG details page, click composer_hadoop_tutorial. This page includes a graphical representation of workflow tasks and dependencies.

  3. To see each task's status, click Graph View and then hold the pointer over the graphic for each task.

Queuing the workflow again

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. To reset the three tasks, click Clear and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. To queue the workflow again, click Run.

Viewing task results

You can also check the status and results of the composer_hadoop_tutorial workflow by going to the following Google Cloud console pages:

  • Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

  • Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket you created for this tutorial.
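
If you want to inspect the word-count output programmatically rather than in the Cloud Storage browser, the following is a minimal sketch that lists the result files and prints the first lines of each shard. The bucket name is a placeholder for the bucket you created for this tutorial.

from google.cloud import storage  # pip install google-cloud-storage

BUCKET_NAME = 'my-bucket'  # placeholder: the bucket you created for this tutorial

client = storage.Client()

# The DAG writes results under a timestamped prefix such as
# wordcount/20230101-120000/, with one part-r-NNNNN file per reducer
# and a _SUCCESS marker.
for blob in client.list_blobs(BUCKET_NAME, prefix='wordcount/'):
    print(blob.name)
    if 'part-' in blob.name:
        text = blob.download_as_text()
        print('\n'.join(text.splitlines()[:5]))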

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. If the project that you plan to delete is attached to an organization, expand the Organization list in the Name column.
  3. In the project list, select the project that you want to delete, and then click Delete.
  4. In the dialog, type the project ID, and then click Shut down to delete the project.

Alternatively, you can delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment.
  2. Delete the Cloud Storage bucket for the Cloud Composer environment. Deleting the Cloud Composer environment does not delete its bucket. A sketch after this list shows one way to delete a bucket programmatically.
  3. Delete the Pub/Sub topics for Cloud Composer (composer-agent and composer-backend).
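
The following is a minimal sketch that deletes a Cloud Storage bucket (for example, the tutorial's results bucket or the environment's bucket) with the google-cloud-storage Python client. The bucket name is a placeholder; force=True first deletes the objects in the bucket and works only for buckets that contain a small number of objects.

from google.cloud import storage  # pip install google-cloud-storage

BUCKET_NAME = 'my-bucket'  # placeholder: the bucket you want to delete

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# force=True deletes the bucket's objects first; the client refuses to
# bulk-delete buckets that contain a large number of objects.
bucket.delete(force=True)
print(f'Deleted gs://{BUCKET_NAME}')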

What's next