在 Cloud Dataproc 叢集中執行 Hadoop 字數計算工作

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程說明如何使用 Cloud Composer 建立 Apache Airflow DAG (有向無環圖),在 Dataproc 叢集上執行 Apache Hadoop 字數統計工作。

目標

  1. 存取 Cloud Composer 環境並使用 Airflow UI
  2. 建立及查看 Airflow 環境變數。
  3. 建立並執行包含下列工作的 DAG
    1. 建立 Dataproc 叢集。
    2. 在叢集上執行 Apache Hadoop 字數計算工作。
    3. 將字數統計結果輸出至 Cloud Storage bucket。
    4. 刪除叢集。

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

事前準備

  • 請確認專案已啟用下列 API:

    主控台

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • 在專案中建立 Cloud Storage bucket,儲存類別和區域不限,用來儲存 Hadoop 字數統計工作的結果。

  • 請記下您建立的值區路徑,例如 gs://example-bucket。您將為這個路徑定義 Airflow 變數,並在本教學課程稍後的範例 DAG 中使用該變數。

  • 建立 Cloud Composer 環境,並使用預設參數。等待環境建立完成。完成後,環境名稱左側會顯示綠色勾號。

  • 請記下您建立環境的區域,例如 us-central。您將為這個地區定義 Airflow 變數,並在範例 DAG 中使用該變數,在同一個地區執行 Dataproc 叢集。

設定 Airflow 變數

設定 Airflow 變數,供範例 DAG 後續使用。舉例來說,您可以在 Airflow UI 中設定 Airflow 變數。

Airflow 變數
gcp_project 您在本教學課程中使用的專案專案 ID,例如 example-project
gcs_bucket 您為本教學課程建立的 URI Cloud Storage bucket,例如 gs://example-bucket.
gce_region 建立環境的地區,例如 us-central1。 這是要建立 Dataproc 叢集的區域。

查看範例工作流程

Airflow DAG 是您要排定及執行的有組織工作集合。DAG 定義於標準 Python 檔案中,hadoop_tutorial.py 中的程式碼是工作流程的程式碼。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

運算子

為了自動化調度管理範例工作流程中的三項工作,DAG 會匯入下列三項 Airflow 運算子:

  • DataprocClusterCreateOperator:建立 Dataproc 叢集。

  • DataProcHadoopOperator:提交 Hadoop 字數統計工作,並將結果寫入 Cloud Storage 值區。

  • DataprocClusterDeleteOperator:刪除叢集,避免產生持續的 Compute Engine 費用。

依附元件

您會以反映工作關係和依附元件的方式,安排要執行的工作。這個 DAG 中的工作會依序執行。

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

排程

DAG 名稱為 composer_hadoop_tutorial,且每天執行一次。由於傳遞至 default_dag_argsstart_date 設為 yesterday,因此 Cloud Composer 會排定工作流程,在 DAG 上傳至環境的 bucket 後立即啟動。

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

將 DAG 上傳至環境的 bucket

Cloud Composer 會將 DAG 儲存在環境 bucket 的 /dags 資料夾中。

如要上傳 DAG:

  1. 在本機電腦上儲存 hadoop_tutorial.py

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單中,按一下環境的「DAGs folder」欄位中的「DAGs」連結。

  4. 按一下「上傳」檔案。

  5. 選取本機上的 hadoop_tutorial.py,然後按一下「開啟」

Cloud Composer 會將 DAG 新增至 Airflow,並自動排定 DAG 的執行時間。DAG 會在 3 到 5 分鐘內產生變化。

探索 DAG 執行作業

查看工作狀態

將 DAG 檔案上傳到 Cloud Storage 中的 dags/ 資料夾後,Cloud Composer 會剖析檔案。成功完成後,工作流程名稱會顯示在 DAG 清單中,且工作流程會排入要立即執行的作業佇列。

  1. 如要查看工作狀態,請前往 Airflow 網頁介面,然後點選工具列中的「DAGs」

  2. 如要開啟 DAG 詳細資料頁面,請按一下 composer_hadoop_tutorial。此頁面會透過圖形呈現工作流程工作和依附元件。

  3. 如要查看各項工作的狀態,請按一下「圖表檢視」,然後將滑鼠游標懸停在各項工作的圖表上。

再次將工作流程加入佇列

如要再次從「圖表檢視畫面」執行工作流程:

  1. 在 Airflow 使用者介面的「圖表」檢視畫面中,按一下 create_dataproc_cluster 圖示。
  2. 如要重設這三項工作,請按一下「清除」,然後按一下「確定」確認。
  3. 在「圖表」檢視畫面中再次點選 create_dataproc_cluster
  4. 如要再次將工作流程排入佇列,請按一下「執行」

查看工作結果

您也可以前往下列控制台頁面,查看工作流程的狀態和結果:composer_hadoop_tutorial Google Cloud

  • Dataproc 叢集:監控叢集建立和刪除作業。請注意,工作流程建立的叢集是暫時性的,只會在工作流程執行期間存在,並在最後一個工作流程工作完成後刪除。

    前往「Dataproc Clusters」(Dataproc 叢集) 頁面

  • Dataproc 工作:查看或監控 Apache Hadoop 字數統計工作。按一下「工作 ID」即可查看工作記錄輸出內容。

    前往 Dataproc 工作

  • Cloud Storage 瀏覽器:查看您為本教學課程建立的 Cloud Storage bucket 中 wordcount 資料夾內的字數統計結果。

    前往 Cloud Storage 瀏覽器

清除所用資源

刪除本教學課程中使用的資源

  1. 刪除 Cloud Composer 環境,包括手動刪除環境的值區。

  2. 刪除 Cloud Storage bucket,其中儲存了 Hadoop 字數統計工作的結果。