在 Dataproc 集群上运行 Hadoop WordCount 作业

本教程介绍如何使用 Cloud Composer 创建使用 Google Cloud Console 在 Dataproc 集群上运行 Apache Hadoop WordCount 作业的 Apache Airflow DAG(工作流)。

目标

  1. 访问您的 Cloud Composer 环境并使用 Airflow 网页界面。
  2. 创建和查看 Airflow 环境变量。
  3. 创建和运行包含以下任务的 DAG
    1. 创建 Dataproc 集群
    2. 在该集群上运行 Apache Hadoop WordCount 作业
    3. 将 WordCount 结果输出到 Cloud Storage 存储分区
    4. 删除集群

费用

本教程使用 Google Cloud 的以下收费组件:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

系统最多需要 25 分钟才能创建您的环境。完成本教程大约需要 1 小时时间。请使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 启用 Cloud Composer, Cloud Dataproc, and Cloud Storage API。

    启用 API

  5. 在您的项目中,创建任意存储类别的 Cloud Storage 存储分区和区域,以用于存储 Hadoop WordCount 作业的结果。
  6. 请记住您所创建的存储分区的路径,例如 gs://my-bucket。您将为此路径定义 Airflow 变量,并在示例 DAG 中使用该变量。

创建环境

  1. 在 Cloud Console 中,转到“创建环境”页面。

    打开“创建环境”页面

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。如需了解如何选择区域,请参阅可用区域

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 如需创建此环境,请点击创建

  6. 等待环境创建操作完成。完成后,相应环境名称左侧会显示绿色对勾标记。

查看环境详情

环境创建完成后,您可以查看环境的部署信息,例如 Cloud Composer 和 Python 版本、Airflow 网页界面网址以及 Google Kubernetes Engine 集群 ID。

如需查看部署信息,请执行以下操作:

  1. 在 Cloud Console 中,转到“环境”页面。

    打开“环境”页面

  2. 如需查看“环境详情”页面,请点击 example-environment

  3. 请记住您在其中创建了环境的地区,例如 us-central-1c。您将为此地区定义 Airflow 变量,并在示例 DAG 中使用该变量。

设置 Airflow 变量

Airflow 变量是 Airflow 特有的一个概念,此类变量不同于环境变量。在此步骤中,您将使用 Airflow 网页界面设置三个 Airflow 变量,以便稍后在示例 DAG 中使用。

如需设置变量,请执行以下操作:

  1. 访问 Cloud Console 中的 Airflow 网页界面:

    1. 在 Cloud Console 中,转到环境页面。

      打开“环境”页面

    2. example-environmentAirflow Web 服务器列中,点击 Airflow 链接。Airflow 网页界面会在一个新窗口中打开。

  2. 在 Airflow 网页界面中设置变量:

    1. 在工具栏中,点击 Admin > Variables
    2. 如需创建新变量,请点击 Create
    3. 对于以下所有变量,请输入键值对,然后点击 Save。“List”标签页会显示所有 Airflow 变量。
      gcp_project 您在本教程中使用的 Google Cloud Platform 项目的 ID,例如 composer-test
      gcs_bucket 您为本教程创建的 Cloud Storage 存储分区,例如 gs://my-bucket
      gce_zone 您的环境所在的 Compute Engine 地区,例如 us-central1-c。这是将在其中创建 Dataproc 集群的地区。请参阅可用区域和地区

查看示例工作流

Airflow DAG 是您要安排和运行的有序任务的集合。 DAG 在标准 Python 文件中定义。hadoop_tutorial.py 中显示的代码就是工作流代码。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

运算符

运算符是工作流中单个任务的模板。为了编排示例工作流程中的三个任务,DAG 导入以下三个运算符:

  1. DataprocClusterCreateOperator:用于创建 Dataproc 集群。
  2. DataProcHadoopOperator:用于提交 Hadoop Wordcount 作业并将结果写入 Cloud Storage 存储分区中。
  3. DataprocClusterDeleteOperator:用于删除集群,以避免产生持续的 Compute Engine 费用。

依赖项

您可以采用一种能够反映任务关系和依赖项的方式来组织要运行的任务。此 DAG 中的任务会按顺序执行。 在此示例中,任务关系按照 Python bitshift 运算符 所指向的方向 (>>) 设置。

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

时间安排

此 DAG 的名称为 composer_hadoop_tutorial,它会每天运行一次。 由于传递到 default_dag_argsstart_date 设置为 yesterday,因此 Cloud Composer 会安排工作流在 DAG 上传后立即开始运行。

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

将 DAG 上传到 Cloud Storage

Cloud Composer 只会安排 DAGs 文件夹中的 DAG。DAG 文件夹位于 Cloud Composer 自动为您的环境创建的 Cloud Storage 存储分区中。

如需上传 DAG,请执行以下操作:

  1. 在本地机器上,保存 hadoop_tutorial.py
  2. 在 Cloud Console 中,转到环境页面。

    打开“环境”页面

  3. example-environmentDAG 文件夹列中,点击 DAG 链接。Cloud Storage 中的 DAGs 文件夹随即会打开。

  4. 点击上传文件

  5. 在本地机器上选择 hadoop_tutorial.py,然后点击打开

Cloud Composer 会将此 DAG 添加到 Airflow,并自动安排此 DAG。DAG 会在 3 到 5 分钟内发生更改。

探索 DAG 运行

查看任务状态

当您将 DAG 文件上传到 Cloud Storage 中的 dags/ 文件夹时,Cloud Composer 会解析该文件。成功完成后,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。

  1. 如需查看任务状态,请转到 Airflow 网页界面并点击工具栏中的 DAGs

  2. 如需打开 DAG 详情页面,请点击 composer_hadoop_tutorial。此页面包含工作流任务和依赖项的图形表示。

  3. 如需查看每个任务的状态,请点击 Graph View,然后将鼠标悬停在每个任务对应的图形上。

将工作流重新加入队列

如需从 Graph View 重新运行工作流,请执行以下操作:

  1. 在 Airflow 界面的“Graph View”中,点击 create_dataproc_cluster 图形。
  2. 如需重置三个任务,请点击 Clear,然后点击 OK 进行确认。
  3. 在“Graph View”中,再次点击 create_dataproc_cluster
  4. 如需将工作流重新加入队列,请点击 Run

查看任务结果

您还可以转到以下 Cloud Console 页面来检查 composer_hadoop_tutorial 工作流的状态和结果:

  • Dataproc 集群:可以监控集群的创建和删除。请注意,由工作流创建的集群是临时性的,也就是说,此类集群仅在工作流的持续期间内存在,并且将在最后一个工作流任务的执行过程中删除。

  • Dataproc 作业:可以查看或监控 Apache Hadoop Wordcount 作业。点击“作业 ID”即可查看作业日志输出。

  • Cloud Storage 浏览器:可以在您为本教程创建的 Cloud Storage 存储分区所含的 wordcount 文件夹中查看 WordCount 的结果。

清理

为避免因本教程中使用的资源导致您的 Google Cloud Platform 帐号产生费用,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 如果您打算删除的项目已附加到某个组织,请从页面顶部的组织列表中选择该组织。
  3. 在项目列表中,选择要删除的项目,然后点击删除
  4. 在对话框中输入项目 ID,然后点击关闭以删除项目。

或者,您可以删除本教程中使用的资源

  1. 删除 Cloud Composer 环境
  2. 删除 Cloud Composer 环境的 Cloud Storage 存储分区。删除 Cloud Composer 环境不会删除其存储分区。
  3. 删除 Cloud Composer 的 Pub/Sub 主题composer-agentcomposer-backend))。

后续步骤