Run a Hadoop wordcount job on a Dataproc cluster


This tutorial shows how to use Cloud Composer to create an Apache Airflow DAG (directed acyclic graph) that runs an Apache Hadoop wordcount job on a Dataproc cluster.

Objectives

  1. Access your Cloud Composer environment and use the Airflow UI.
  2. Create and view Airflow environment variables.
  3. Create and run a DAG that includes the following tasks:
    1. Create a Dataproc cluster.
    2. Run an Apache Hadoop wordcount job on the cluster.
    3. Output the wordcount results to a Cloud Storage bucket.
    4. Delete the cluster.

Costs

In this document, you use the following billable components of Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  • Make sure that the following APIs are enabled in your project:

    Console

    Enable the Dataproc and Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • In your project, create a Cloud Storage bucket of any storage class and region to store the results of the Hadoop wordcount job; a command-line sketch for this step follows this list.

  • Note the path of the bucket that you created, for example gs://example-bucket. Later in this tutorial, you define an Airflow variable for this path and use the variable in the example DAG.

  • Create a Cloud Composer environment with default parameters. Wait until the environment creation completes. When it is done, a green check mark is displayed to the left of the environment name.

  • Note the region where you created the environment, for example us-central1. You define an Airflow variable for this region and use it in the example DAG to run the Dataproc cluster in the same region.
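
For the bucket step, one way to create the bucket from the command line is with gsutil. This is a minimal sketch; the bucket name example-bucket and the location us-central1 are placeholders, so substitute your own values.

# Create a bucket for the wordcount results (name and location are placeholders).
gsutil mb -l us-central1 gs://example-bucket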

Set Airflow variables

Set Airflow variables to use later in the example DAG. For example, you can set Airflow variables in the Airflow UI, or through the gcloud CLI as sketched after the following list.

Set these Airflow variables:

  • gcp_project: the project ID of the project you are using for this tutorial, for example example-project.
  • gcs_bucket: the URI of the Cloud Storage bucket that you created for this tutorial, for example gs://example-bucket.
  • gce_region: the region where you created the environment, for example us-central1. This is the region where the Dataproc cluster is created.
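
As an alternative to the Airflow UI, you can set the variables through the Airflow CLI with gcloud. The following is a minimal sketch that assumes an Airflow 2 environment named example-environment in us-central1; both names are placeholders. (In Airflow 1, the equivalent Airflow CLI call is variables -- --set KEY VALUE.)

# Set each Airflow variable through the Airflow 2 CLI.
# The environment name and location are placeholders.
gcloud composer environments run example-environment \
    --location us-central1 \
    variables set -- gcp_project example-project
gcloud composer environments run example-environment \
    --location us-central1 \
    variables set -- gcs_bucket gs://example-bucket
gcloud composer environments run example-environment \
    --location us-central1 \
    variables set -- gce_region us-central1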

View the example workflow

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files. The code shown in hadoop_tutorial.py is the workflow code.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operators

To orchestrate the three tasks in the example workflow, the DAG imports the following three Airflow operators:

  • DataprocClusterCreateOperator (DataprocCreateClusterOperator in the Airflow 2 version): creates a Dataproc cluster.

  • DataProcHadoopOperator (DataprocSubmitJobOperator in the Airflow 2 version): submits the Hadoop wordcount job and writes the results to a Cloud Storage bucket.

  • DataprocClusterDeleteOperator (DataprocDeleteClusterOperator in the Airflow 2 version): deletes the cluster to avoid incurring ongoing Compute Engine charges.

Dependencies

The tasks that you want to run are organized in a way that reflects their relationships and dependencies. The tasks in this DAG run sequentially.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Scheduling

The DAG is named composer_hadoop_tutorial, and it runs once each day. Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG is uploaded to the environment's bucket.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Upload the DAG to the environment's bucket

Cloud Composer stores DAGs in the /dags folder of your environment's bucket.

To upload the DAG:

  1. On your local machine, save hadoop_tutorial.py.

  2. In the Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, in the DAGs folder column, click the DAGs link for your environment.

  4. Click Upload files.

  5. Select hadoop_tutorial.py on your local machine, then click Open.

Cloud Composer adds the DAG to Airflow and schedules it automatically. DAG changes take effect within 3 to 5 minutes.
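
You can also upload the file from the command line with the gcloud CLI. The sketch below assumes an environment named example-environment in us-central1; substitute your own environment name and location.

# Copy hadoop_tutorial.py into the environment's dags/ folder.
# The environment name and location are placeholders.
gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source hadoop_tutorial.py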

Explore DAG runs

View task status

When you upload your DAG file to the dags/ folder in Cloud Storage, Cloud Composer parses the file. When parsing completes successfully, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

  1. To see task status, go to the Airflow web interface and click DAGs in the toolbar.

  2. To open the DAG details page, click composer_hadoop_tutorial. This page includes a graphical representation of the workflow's tasks and dependencies.

  3. To see each task's status, click Graph View and then mouse over the graphic for each task.
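
If you prefer the command line, you can also check the DAG's runs through the Airflow CLI. This is a sketch that assumes an Airflow 2 environment named example-environment in us-central1 (both placeholders).

# List runs of the tutorial DAG through the Airflow 2 CLI.
# The environment name and location are placeholders.
gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- -d composer_hadoop_tutorial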

Queue the workflow again

To run the workflow again from Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. To reset the three tasks, click Clear, and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. To queue the workflow again, click Run.

View task results

You can also check the status and results of the composer_hadoop_tutorial workflow by going to the following Google Cloud console pages:

  • Dataproc Clusters: to monitor cluster creation and deletion. The cluster created by the workflow is ephemeral: it exists only for the duration of the workflow and is deleted as part of the last workflow task.

    Go to Dataproc Clusters

  • Dataproc Jobs: to view or monitor the Apache Hadoop wordcount job. Click the job ID to see its log output.

    Go to Dataproc Jobs

  • Cloud Storage Browser: to see the results of the wordcount in the wordcount folder of the Cloud Storage bucket that you created for this tutorial. You can also inspect the results from the command line, as sketched after this list.

    Go to Cloud Storage Browser
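
To inspect the output from the command line instead, you can list and read the result files with gsutil. This is a minimal sketch; gs://example-bucket is a placeholder for your bucket, and the timestamped folder name depends on when the job ran.

# List the timestamped output folders written by the wordcount job.
# The bucket name is a placeholder.
gsutil ls gs://example-bucket/wordcount/

# Print one output shard; Hadoop typically names the files part-r-00000,
# part-r-00001, and so on. Replace TIMESTAMP with a folder from the listing above.
gsutil cat gs://example-bucket/wordcount/TIMESTAMP/part-r-00000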

Clean up

Delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment, including manually deleting the environment's bucket.

  2. Delete the Cloud Storage bucket that stores the results of the Hadoop wordcount job.
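
The following commands sketch the same cleanup from the gcloud CLI. The environment name example-environment, its location us-central1, the environment bucket gs://ENVIRONMENT_BUCKET, and gs://example-bucket are placeholders; deleting an environment does not delete its bucket, so remove the bucket separately.

# Delete the Cloud Composer environment (name and location are placeholders).
gcloud composer environments delete example-environment \
    --location us-central1

# Deleting the environment does not delete its bucket; remove it manually.
# Replace ENVIRONMENT_BUCKET with your environment's bucket name.
gsutil rm -r gs://ENVIRONMENT_BUCKET

# Delete the bucket that stores the wordcount results (placeholder name).
gsutil rm -r gs://example-bucket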