Run a Hadoop wordcount job on a Dataproc cluster


This tutorial shows how to use Cloud Composer to create an Apache Airflow DAG (Directed Acyclic Graph) that runs an Apache Hadoop wordcount job on a Dataproc cluster.

Objectives

  1. Access your Cloud Composer environment and use the Airflow UI.
  2. Create and view Airflow variables.
  3. Create and run a DAG that includes the following tasks:
    1. Creates a Dataproc cluster.
    2. Runs an Apache Hadoop word-count job on the cluster.
    3. Outputs the word-count results to a Cloud Storage bucket.
    4. Deletes the cluster.

Costs

In this document, you use the following billable components of Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  • Make sure that the following APIs are enabled in your project:

    Console

    Enable the Dataproc and Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc and Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • In your project, create a Cloud Storage bucket of any storage class and region to store the results of the Hadoop word-count job.

  • Note the path of the bucket that you created, for example gs://example-bucket. You'll define an Airflow variable for this path and use the variable in the example DAG later in this tutorial.

  • Create a Cloud Composer environment with default parameters. Wait until environment creation is completed. When creation finishes, a green check mark appears to the left of the environment name.

  • Note the region where you created your environment, for example us-central1. You'll define an Airflow variable for this region and use it in the example DAG to run a Dataproc cluster in the same region.

Set Airflow variables

Set the Airflow variables to use later in the example DAG. For example, you can set Airflow variables in the Airflow UI.

Airflow variable Value
gcp_project The project ID of the project you're using for this tutorial, such as example-project.
gcs_bucket The URI of the Cloud Storage bucket you created for this tutorial, such as gs://example-bucket.
gce_region The region where you created your environment, such as us-central1. This is the region where your Dataproc cluster will be created.
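
A mistyped variable value usually surfaces only when the DAG runs, so it can help to check the three values before you enter them. The following is a minimal sketch that uses only the Python standard library. The helper name check_tutorial_variables and the specific checks (a gs:// prefix and a region name that ends in a digit) are assumptions made for illustration; they are not part of Airflow or Cloud Composer.

```python
import json
import re

# Hypothetical helper, not part of Airflow or Cloud Composer.
REQUIRED_VARIABLES = ("gcp_project", "gcs_bucket", "gce_region")


def check_tutorial_variables(variables):
    """Return a list of problems found in the tutorial's Airflow variables."""
    problems = []
    for name in REQUIRED_VARIABLES:
        if not variables.get(name):
            problems.append(f"{name} is missing or empty")
    bucket = variables.get("gcs_bucket", "")
    if bucket and not bucket.startswith("gs://"):
        problems.append("gcs_bucket should be a URI that starts with gs://")
    region = variables.get("gce_region", "")
    # Assumption: region names look like us-central1 (words, hyphens, trailing digit).
    if region and not re.fullmatch(r"[a-z]+(-[a-z]+)+\d+", region):
        problems.append("gce_region should look like us-central1")
    return problems


example_variables = {
    "gcp_project": "example-project",
    "gcs_bucket": "gs://example-bucket",
    "gce_region": "us-central1",
}

print(check_tutorial_variables(example_variables))
# Key-value JSON of the variables, convenient for keeping next to the DAG file.
print(json.dumps(example_variables, indent=2))
```

An empty list means that the values pass these basic checks. It does not confirm that the project, bucket, or region actually exist.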

View the example workflow

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files. The code shown in hadoop_tutorial.py is the workflow code.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
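
Both listings build output_file in the same way: the bucket URI, a wordcount folder, and a timestamp taken from datetime.datetime.now() when the DAG file is parsed. The following sketch reproduces that layout outside Airflow. It is an illustration under two assumptions: the bucket URI is passed in directly instead of being rendered from the {{ var.value.gcs_bucket }} template, and posixpath is used in place of os.path so that the separator is "/" on any operating system. The function name build_output_prefix is hypothetical.

```python
import datetime
import posixpath


def build_output_prefix(bucket_uri, now):
    """Mirror the DAG's output_file layout: <bucket>/wordcount/<timestamp>/."""
    timestamp = now.strftime("%Y%m%d-%H%M%S")
    # posixpath always joins with "/", which is what Cloud Storage URIs use.
    return posixpath.join(bucket_uri, "wordcount", timestamp) + "/"


# Fixed timestamp so the result is reproducible; the DAG uses datetime.now().
example_time = datetime.datetime(2024, 1, 15, 9, 30, 0)
print(build_output_prefix("gs://example-bucket", example_time))
# gs://example-bucket/wordcount/20240115-093000/
```

Because the timestamp is computed at parse time, the folder name reflects when the DAG file was last parsed rather than the scheduled date of the run.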

Operators

To orchestrate the three tasks in the example workflow, the DAG imports the following three Airflow operators. The Airflow 2 class name is listed first, followed by the Airflow 1 class name in parentheses:

  • DataprocCreateClusterOperator (DataprocClusterCreateOperator): Creates a Dataproc cluster.

  • DataprocSubmitJobOperator (DataProcHadoopOperator): Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.

  • DataprocDeleteClusterOperator (DataprocClusterDeleteOperator): Deletes the cluster to avoid incurring ongoing Compute Engine charges.

Dependencies

You organize tasks that you want to run in a way that reflects their relationships and dependencies. The tasks in this DAG run sequentially: the >> operator places each task after the task to its left.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
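
To see why a single chained expression is enough to order three tasks, consider the following simplified stand-in. It is not Airflow's implementation: the Task class and the run_order function are hypothetical and model only the idea that a >> b records b as downstream of a and returns b, so that the chain continues from left to right.

```python
class Task:
    """Simplified stand-in for an Airflow task; not Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so a >> b >> c chains left to right.
        self.downstream.append(other)
        return other


def run_order(first):
    """Walk a linear chain of tasks and return their IDs in execution order."""
    order = []
    task = first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


create = Task("create_dataproc_cluster")
run = Task("run_dataproc_hadoop")
delete = Task("delete_dataproc_cluster")

create >> run >> delete
print(run_order(create))
# ['create_dataproc_cluster', 'run_dataproc_hadoop', 'delete_dataproc_cluster']
```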

Scheduling

The name of the DAG is composer_hadoop_tutorial, and the DAG runs once each day. Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG is uploaded to the environment's bucket.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
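
The reason that the first run starts right away can be checked with a little date arithmetic. The following sketch assumes a simplified model of Airflow scheduling, in which a run becomes due once its schedule interval has ended, and it ignores catchup settings, time zones, and scheduler latency. The function name first_run_due is hypothetical.

```python
import datetime


def first_run_due(start_date, interval):
    """Return when the first run becomes due: the end of the first interval."""
    return start_date + interval


now = datetime.datetime.now()
# Equivalent to the DAG's expression: midnight at the start of yesterday.
yesterday = datetime.datetime.combine(
    now.date() - datetime.timedelta(days=1), datetime.time.min
)
due = first_run_due(yesterday, datetime.timedelta(days=1))

print("start_date:", yesterday)
print("first run due:", due)
print("already due:", due <= now)
```

With a start date of yesterday at midnight and a one-day interval, the first interval ended at midnight today, so the run is already due when the DAG is first parsed.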

Upload the DAG to the environment's bucket

Cloud Composer stores DAGs in the /dags folder in your environment's bucket.

To upload the DAG:

  1. On your local machine, save hadoop_tutorial.py.

  2. In the Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, in the DAGs folder column for your environment, click the DAGs link.

  4. Click Upload files.

  5. Select hadoop_tutorial.py on your local machine and click Open.

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. Changes to the DAG take effect within 3-5 minutes.
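
If you prefer to script the upload instead of using the console, one option is the gcloud composer environments storage dags import command. The following sketch only builds and prints the command line; it does not run it. The environment name example-environment is a placeholder, and the function name build_dag_import_command is hypothetical.

```python
import shlex


def build_dag_import_command(environment, location, source):
    """Build the gcloud argument list that imports a DAG file into an environment."""
    return [
        "gcloud", "composer", "environments", "storage", "dags", "import",
        f"--environment={environment}",
        f"--location={location}",
        f"--source={source}",
    ]


# example-environment is a placeholder; use your environment's name and region.
command = build_dag_import_command(
    "example-environment", "us-central1", "hadoop_tutorial.py"
)
print(shlex.join(command))
# To execute it, pass the list to subprocess.run(command, check=True).
```

Passing the arguments as a list rather than a single string avoids shell quoting problems if a file name contains spaces.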

Explore DAG runs

View task status

When you upload your DAG file to the /dags folder in your environment's bucket, Cloud Composer parses the file. After the file is parsed successfully, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

  1. To see task status, go to the Airflow web interface and click DAGs in the toolbar.

  2. To open the DAG details page, click composer_hadoop_tutorial. This page includes a graphical representation of workflow tasks and dependencies.

  3. To see each task's status, click Graph View and then hold the pointer over the graphic for each task.

Queue the workflow again

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. To reset the three tasks, click Clear and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. To queue the workflow again, click Run.

View task results

You can also check the status and results of the composer_hadoop_tutorial workflow by going to the following Google Cloud console pages:

  • Dataproc Clusters: to monitor cluster creation and deletion. The cluster created by the workflow is ephemeral: it exists only for the duration of the workflow and is deleted as part of the last workflow task.

    Go to Dataproc Clusters

  • Dataproc Jobs: to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

    Go to Dataproc Jobs

  • Cloud Storage Browser: to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket you created for this tutorial.

    Go to Cloud Storage Browser
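
The wordcount folder contains one timestamped subfolder per parsed DAG version, and the job writes its results there as part files. The Hadoop wordcount example typically writes one word and its count per line, separated by a tab. The following sketch parses text in that layout after you download a part file. The sample text is made up for illustration and is not actual output from the job, and the function name parse_wordcount_output is hypothetical.

```python
def parse_wordcount_output(text):
    """Parse tab-separated "word<TAB>count" lines into a dict."""
    counts = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        # Split on the last tab so the count is always the final field.
        word, _, count = line.rpartition("\t")
        counts[word] = counts.get(word, 0) + int(count)
    return counts


# Made-up sample in the expected layout; not actual output from the job.
sample = "a\t2\nname\t2\nrose\t1\n"
counts = parse_wordcount_output(sample)
# Sort by descending count, then alphabetically.
top = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
print(top)
# [('a', 2), ('name', 2), ('rose', 1)]
```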

Cleanup

Delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment, and then manually delete the environment's bucket.

  2. Delete the Cloud Storage bucket that stores the results of the Hadoop word-count job.