在 Dataproc 集群上运行 Hadoop WordCount 作业

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本教程介绍如何使用 Cloud Composer 创建在 Dataproc 集群上运行 Apache Hadoop WordCount 作业的 Apache Airflow DAG(有向无环图)。

目标

  1. 访问您的 Cloud Composer 环境并使用 Airflow 界面
  2. 创建和查看 Airflow 环境变量。
  3. 创建和运行包含以下任务的 DAG
    1. 创建 Dataproc 集群。
    2. 在该集群上运行 Apache Hadoop WordCount 作业。
    3. 将单词数结果输出到 Cloud Storage 存储分区。
    4. 删除集群。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  • 确保已在项目中启用以下 API:

    控制台

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • 在您的项目中,创建任意存储类别的 Cloud Storage 存储桶和区域,以用于存储 Hadoop WordCount 作业的结果。

  • 记下您创建的存储桶的路径,例如 gs://example-bucket。您将为此路径定义一个 Airflow 变量, 使用本教程后面部分的示例 DAG 中的变量。

  • 创建一个 Cloud Composer 环境,默认环境 参数。等待环境创建操作完成。完成后, 相应环境名称左侧会显示绿色对勾标记。

  • 请记住您在其中创建了环境的区域,例如 us-central。您将为此区域定义一个 Airflow 变量,并在示例 DAG 中使用该变量在同一区域运行 Dataproc 集群。

设置 Airflow 变量

设置 Airflow 变量 以便稍后在示例 DAG 中使用。例如,您 可以在 Airflow 界面中设置 Airflow 变量。

Airflow 变量
gcp_project 项目的项目 ID 例如 example-project
gcs_bucket 您为本教程创建的 URI 中的 Cloud Storage 存储分区。 例如 gs://example-bucket.
gce_region 您创建环境的区域,例如 us-central1。 这是将在其中创建 Dataproc 集群的区域。

查看示例工作流

Airflow DAG 是您要调度的一组有序任务 并运行DAG 在标准 Python 文件中定义。代码如下所示: hadoop_tutorial.py 是工作流代码。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

运算符

为了编排示例工作流程中的三个任务,DAG 导入以下三个 Airflow 运算符:

  • DataprocClusterCreateOperator:用于创建 Dataproc 集群。

  • DataProcHadoopOperator:提交 Hadoop Wordcount 作业并写入结果 Cloud Storage 存储桶

  • DataprocClusterDeleteOperator:用于删除集群,以避免产生持续的 Compute Engine 费用。

依赖项

您可以采用一种能够反映任务关系和依赖项的方式来组织要运行的任务。此 DAG 中的任务将按顺序运行。

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

时间安排

此 DAG 的名称为 composer_hadoop_tutorial,此 DAG 各运行一次 。由于传递到 default_dag_argsstart_date 设置为 yesterday,因此 Cloud Composer 会安排工作流在 DAG 上传到环境的存储桶后立即开始运行。

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

将该 DAG 上传到环境的存储桶

Cloud Composer 将 DAG 存储在您的/dags 环境的存储桶中。

如需上传 DAG,请执行以下操作:

  1. 在本地机器上,保存 hadoop_tutorial.py

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表的 DAGs 文件夹列中, 环境中,点击 DAG 链接。

  4. 点击上传文件

  5. 在本地机器上选择 hadoop_tutorial.py,然后点击打开

Cloud Composer 会将此 DAG 添加到 Airflow,并自动安排此 DAG。DAG 会在 3 到 5 分钟内发生更改。

探索 DAG 运行

查看任务状态

当您将 DAG 文件上传到 Cloud Storage 中的 dags/ 文件夹时,Cloud Composer 会解析该文件。成功完成后,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。

  1. 如需查看任务状态,请转到 Airflow 网页界面并点击工具栏中的 DAGs

  2. 如需打开 DAG 详情页面,请点击 composer_hadoop_tutorial。这个 页面包含工作流任务和 依赖项

  3. 如需查看每个任务的状态,请点击 Graph View,然后将鼠标悬停在每个任务对应的图形上。

将工作流重新加入队列

如需从 Graph View 重新运行工作流,请执行以下操作:

  1. 在 Airflow 界面的“Graph View”中,点击 create_dataproc_cluster 图形。
  2. 如需重置三个任务,请点击 Clear,然后点击 OK 进行确认。
  3. 在“Graph View”中,再次点击 create_dataproc_cluster
  4. 如需将工作流重新加入队列,请点击 Run

查看任务结果

您还可以转到以下 Google Cloud 控制台页面来检查 composer_hadoop_tutorial 工作流的状态和结果:

  • Dataproc 集群:可以监控集群的创建和删除。请注意,工作流创建的集群是临时的: 仅在工作流期间存在,并且会在 最后一个工作流任务。

    转到 Dataproc 集群

  • Dataproc 作业:可以查看或监控 Apache Hadoop Wordcount 作业。点击“作业 ID”即可查看作业日志输出。

    转到 Dataproc 作业

  • Cloud Storage 浏览器:在 您创建的 Cloud Storage 存储桶中的 wordcount 文件夹

    转到 Cloud Storage 浏览器

清理

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境,包括 手动删除环境的存储桶

  2. 删除您要在其中 存储 Hadoop 字数统计作业的结果。