Dataproc クラスタでの Hadoop ワードカウント ジョブの実行

このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG(ワークフロー)を作成する方法を説明します。このワークフローは、Google Cloud Console を使用して Dataproc クラスタ上で Apache Hadoop ワードカウント ジョブを実行します。

目標

  1. Cloud Composer 環境にアクセスし、Airflow ウェブ インターフェースを使用します。
  2. Airflow 環境変数を作成して表示します。
  3. 次のタスクを含む DAG を作成、実行します。
    1. Dataproc クラスタを作成します。
    2. クラスタ上で Apache Hadoop ワードカウント ジョブを実行します。
    3. ワードカウントの結果を Cloud Storage バケットに出力します。
    4. クラスタを削除します。

費用

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Cloud Composer
  • Dataproc
  • Cloud Storage

システムによる環境の作成には最大で 25 分かかります。このチュートリアルの所要時間は約 1 時間です。料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Composer, Cloud Dataproc, and Cloud Storage API を有効にします。

    API を有効にする

  5. プロジェクトで、Hadoop ワードカウント ジョブの結果を格納する任意のストレージ クラスとリージョンの Cloud Storage バケットを作成します。
  6. 作成したバケットのパスをメモします(gs://my-bucket など)。このパスの Airflow 変数を定義して、例の DAG でその変数を使用します。

環境の作成

  1. Cloud Console で、[環境の作成] ページに移動します。

    [環境の作成] ページを開く

  2. [名前] フィールドに「example-environment」と入力します。

  3. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。

  4. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  5. 環境を作成するには、[作成] をクリックします。

  6. 環境の作成が完了するまで待ちます。処理が完了すると、緑色のチェックマークが環境名の左側に表示されます。

環境の詳細の表示

環境の作成が完了した後は、環境のデプロイ情報(Cloud Composer と Python のバージョン、Airflow ウェブ インターフェースの URL、Google Kubernetes Engine のクラスタ ID など)を表示できます。

デプロイ情報を表示するには、次の手順を行います。

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. [環境の詳細] ページを表示するには、example-environment をクリックします。

  3. 環境を作成したゾーンをメモします(us-central-1c など)。このゾーンの Airflow 変数を定義して、例の DAG の中でその変数を使用します。

Airflow 変数の設定

Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。このステップでは、Airflow ウェブ インターフェースを使用して、後で例の DAG で使用する 3 つの Airflow 変数を設定します。

変数を設定するには、次の手順を行います。

  1. Cloud Console で Airflow ウェブ インターフェースにアクセスします。

    1. Cloud Console で、[環境] ページに移動します。

      [環境] ページを開く

    2. example-environment の [Airflow ウェブサーバー] 列で [Airflow] リンクをクリックします。新しいウィンドウに Airflow ウェブ インターフェースが表示されます。

  2. Airflow ウェブ インターフェースで変数を設定します。

    1. ツールバーで、[Admin] > [Variables] をクリックします。
    2. [Create] をクリックして、新しい変数を作成します。
    3. 次の各変数に対して、Key-Value ペアを入力してから、[Save] をクリックします。すべての Airflow 変数がリストタブに表示されます。
      KEY VALUE
      gcp_project このチュートリアルで使用している Google Cloud Platform プロジェクトのプロジェクト ID(composer-test など)。
      gcs_bucket このチュートリアル用に作成した Cloud Storage バケット(gs://my-bucket など)。
      gce_zone 環境の Compute Engine ゾーン(us-central1-c など)。これは、Dataproc クラスタが作成されるゾーンです。使用可能なリージョンとゾーンをご覧ください。

ワークフローの例の表示

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。 DAG は、標準の Python ファイルで定義されます。hadoop_tutorial.py に含まれるコードは、ワークフロー コードです。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

演算子

演算子は、ワークフロー内の 1 つのタスクのテンプレートです。ワークフロー例の 3 つのタスクをオーケストレーションするため、DAG は次の 3 つの演算子をインポートします。

  1. DataprocClusterCreateOperator: Dataproc クラスタを作成します。
  2. DataProcHadoopOperator: Hadoop のワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。
  3. DataprocClusterDeleteOperator: クラスタを削除して、現在使用中の Compute Engine の利用料金が発生しないようにします。

依存関係

実行するタスクを関係と依存状態が反映されるように編成してください。この DAG 内のタスクが順次実行されます。 この例では、Python のビットシフト演算子が指す方向(>>)に関係が設定されます。

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

スケジュール

DAG の名前は composer_hadoop_tutorial です。この DAG は 1 日 1 回実行されます。 default_dag_args に渡される start_dateyesterday に設定されているため、DAG のアップロードの直後に開始されるように、Cloud Composer によってこのワークフローのスケジュールが設定されます。

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

DAG の Cloud Storage へのアップロード

Cloud Composer がスケジュールを設定するのは、DAG フォルダ内の DAG のみです。DAG フォルダは、Cloud Composer が環境に対して自動的に作成する Cloud Storage バケット内にあります。

DAG をアップロードするには:

  1. ローカルマシン上で hadoop_tutorial.py を保存します。
  2. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  3. 環境の例の [DAG のフォルダ] 列にある [DAG] リンクをクリックします。Cloud Storage の DAG フォルダが開きます。

  4. [ファイルをアップロード] をクリックします。

  5. ローカルマシン上の hadoop_tutorial.py を選択して、[開く] をクリックします。

Cloud Composer により、DAG が Airflow に追加されて自動的にスケジュール設定されます。DAG の変更は 3~5 分以内に行われます。

DAG の実行状況の確認

タスクのステータスの表示

DAG ファイルを Cloud Storage の dags/ フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。正常に完了すれば、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。

  1. タスクのステータスを確認するには、Airflow ウェブ インターフェースに移動して、ツールバーの [DAG] をクリックします。

  2. DAG の詳細ページを開くには、composer_hadoop_tutorial をクリックします。このページでは、ワークフローのタスクと依存関係が図で示されています。

  3. 各タスクのステータスを確認するには、[Graph View] をクリックしてから、各タスクのグラフィックにカーソルを合わせます。

ワークフローの再キューイング

[Graph View] からワークフローをもう一度実行するには、次の手順を行います。

  1. Airflow UI のグラフビューで、create_dataproc_cluster グラフィックをクリックします。
  2. [Clear] をクリックしてから [OK] をクリックして、3 つのタスクをリセットします。
  3. グラフビューで create_dataproc_cluster をもう一度クリックします。
  4. [Run] をクリックして、ワークフローをもう一度キューに入れます。

タスクの結果の表示

次の Cloud Console ページに移動して、composer_hadoop_tutorial ワークフローのステータスと結果を確認することもできます。

  • Dataproc クラスタ。クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは、一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクの一部として削除されます。

  • Dataproc ジョブ。Apache Hadoop のワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。

  • Cloud Storage ブラウザ。このチュートリアル用に作成した Cloud Storage バケット内の wordcount フォルダのワードカウントの結果を表示します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. 削除するプロジェクトが組織に関連付けられている場合、ページ上部の [組織] リストから組織を選択します。
  3. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  4. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

あるいは、このチュートリアルで使用したリソースを削除することもできます。

  1. Cloud Composer 環境を削除します
  2. Cloud Composer 環境の Cloud Storage バケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
  3. Cloud Composer の Pub/Sub トピックを削除します(composer-agentcomposer-backend))。

次のステップ