Dataproc クラスタで Hadoop ワードカウント ジョブを実行する

Cloud Composer 1 | Cloud Composer 2

このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG(有向非巡回グラフ)を作成し、Dataproc クラスタで Apache Hadoop ワードカウント ジョブを実行する方法を説明します。

目標

  1. Cloud Composer 環境にアクセスし、Airflow UI を使用します。
  2. Airflow 環境変数を作成して表示します。
  3. 次のタスクを含む DAG を作成、実行します。
    1. Dataproc クラスタを作成します。
    2. クラスタ上で Apache Hadoop ワードカウント ジョブを実行します。
    3. ワードカウントの結果を Cloud Storage バケットに出力します。
    4. クラスタを削除します。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Cloud Composer
  • Dataproc
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

準備

  • プロジェクトで次の API が有効になっていることを確認します。

    コンソール

    Dataproc, Cloud Storage API を有効にします。

    API を有効にする

    gcloud

    Dataproc, Cloud Storage API を有効にします。

    gcloud services enable dataproc.googleapis.comstorage-component.googleapis.com

  • プロジェクトで、Hadoop ワードカウント ジョブの結果を格納する任意のストレージ クラスとリージョンの Cloud Storage バケットを作成します。

  • 作成したバケットのパスをメモします(gs://example-bucket など)。このパスの Airflow 変数を定義して、この変数をこのチュートリアルの後半の DAG の例で使用します。

  • デフォルトのパラメータで Cloud Composer 環境を作成します。環境の作成が完了するまで待ちます。処理が完了すると、緑色のチェックマークが環境名の左側に表示されます。

  • 環境を作成したリージョンをメモします(us-central など)。このリージョンに対して Airflow 変数を定義し、サンプルの DAG で同じリージョンに Dataproc クラスタを実行します。

Airflow 変数を設定する

後で例の DAG で使用するAirflow 変数を設定します。たとえば、Airflow UI で Airflow 変数を設定できます。

Airflow 変数
gcp_project このチュートリアルで使用しているプロジェクトのプロジェクト IDexample-project など)。
gcs_bucket このチュートリアル用に作成した Cloud Storage バケット(gs://example-bucket. など)。
gce_region 環境を作成したリージョン(us-central1 など)。 これは、Dataproc クラスタが作成されるリージョンです。

ワークフローの例を表示する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。hadoop_tutorial.py に含まれるコードは、ワークフロー コードです。

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

オペレーター

ワークフロー例の 3 つのタスクをオーケストレーションするため、DAG は次の 3 つの Airflow オペレーターをインポートします。

  • DataprocClusterCreateOperator: Dataproc クラスタを作成します。

  • DataProcHadoopOperator: Hadoop のワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。

  • DataprocClusterDeleteOperator: クラスタを削除して、現在使用中の Compute Engine の利用料金が発生しないようにします。

依存関係

実行するタスクを関係と依存状態が反映されるように編成してください。この DAG 内のタスクが順次実行されます。

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

スケジュール設定

DAG の名前は composer_hadoop_tutorial です。この DAG は 1 日 1 回実行されます。 default_dag_args に渡される start_dateyesterday に設定されているため、DAG が環境のバケットにアップロードされた直後に、Cloud Composer によってワークフローがスケジュールされます。

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

DAG を環境のバケットにアップロードする

Cloud Composer は、環境のバケット内の /dags フォルダに DAG を保存します。

DAG をアップロードするには:

  1. ローカルマシン上で hadoop_tutorial.py を保存します。

  2. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  3. 環境のリストにある環境の [DAG のフォルダ] 列で、[DAG] リンクをクリックします。

  4. [ファイルをアップロード] をクリックします。

  5. ローカルマシン上の hadoop_tutorial.py を選択して、[開く] をクリックします。

Cloud Composer により、DAG が Airflow に追加されて自動的にスケジュール設定されます。DAG の変更は 3~5 分以内に行われます。

DAG の実行状況を確認する

タスクのステータスを表示する

DAG ファイルを Cloud Storage の dags/ フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。正常に完了すれば、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。

  1. タスクのステータスを確認するには、Airflow ウェブ インターフェースに移動して、ツールバーの [DAG] をクリックします。

  2. DAG の詳細ページを開くには、composer_hadoop_tutorial をクリックします。このページでは、ワークフローのタスクと依存関係が図で示されます。

  3. 各タスクのステータスを確認するには、[Graph View] をクリックしてから、各タスクのグラフィックにカーソルを合わせます。

ワークフローを再度キューに入れる

グラフビューからワークフローをもう一度実行するには、次の手順を行います。

  1. Airflow UI のグラフビューで、create_dataproc_cluster グラフィックをクリックします。
  2. [Clear] をクリックしてから [OK] をクリックして、3 つのタスクをリセットします。
  3. グラフビューで create_dataproc_cluster をもう一度クリックします。
  4. [Run] をクリックして、ワークフローをもう一度キューに入れます。

タスクの結果を表示する

次の Google Cloud コンソール ページに移動して、composer_hadoop_tutorial ワークフローのステータスと結果を確認することもできます。

  • Dataproc クラスタ: クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは、一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクの一部として削除されます。

    Dataproc クラスタに移動

  • Dataproc ジョブ: Apache Hadoop のワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。

    Dataproc ジョブに移動

  • Cloud Storage ブラウザ: このチュートリアル用に作成した Cloud Storage バケット内の wordcount フォルダのワードカウントの結果を表示します。

    Cloud Storage ブラウザに移動

クリーンアップ

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します(環境のバケットを手動で削除します)。

  2. Hadoop ワードカウント ジョブの結果を格納する Cloud Storage バケットを削除します。