このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG(ワークフロー)を作成する方法を説明します。このワークフローは、Google Cloud Console を使用して Dataproc クラスタ上で Apache Hadoop ワードカウント ジョブを実行します。
目標
- Cloud Composer 環境にアクセスし、Airflow ウェブ インターフェースを使用します。
- Airflow 環境変数を作成して表示します。
- 次のタスクを含む DAG を作成、実行します。
- Dataproc クラスタを作成します。
- クラスタ上で Apache Hadoop ワードカウント ジョブを実行します。
- ワードカウントの結果を Cloud Storage バケットに出力します。
- クラスタを削除します。
費用
このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。
- Cloud Composer
- Dataproc
- Cloud Storage
システムによる環境の作成には最大で 25 分かかります。このチュートリアルの所要時間は約 1 時間です。料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
- Cloud Composer, Cloud Dataproc, and Cloud Storage API を有効にします。
- プロジェクトで、Hadoop ワードカウント ジョブの結果を格納する任意のストレージ クラスとリージョンの Cloud Storage バケットを作成します。
- 作成したバケットのパスをメモします(
gs://my-bucket
など)。このパスの Airflow 変数を定義して、例の DAG でその変数を使用します。
環境の作成
Cloud Console で、[環境の作成] ページに移動します。
[名前] フィールドに「
example-environment
」と入力します。[ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。
その他の環境構成オプションには、指定されたデフォルト値を使用します。
環境を作成するには、[作成] をクリックします。
環境の作成が完了するまで待ちます。処理が完了すると、緑色のチェックマークが環境名の左側に表示されます。
環境の詳細の表示
環境の作成が完了した後は、環境のデプロイ情報(Cloud Composer と Python のバージョン、Airflow ウェブ インターフェースの URL、Google Kubernetes Engine のクラスタ ID など)を表示できます。
デプロイ情報を表示するには、次の手順を行います。
Cloud Console で、[環境] ページに移動します。
[環境の詳細] ページを表示するには、
example-environment
をクリックします。環境を作成したゾーンをメモします(
us-central-1c
など)。このゾーンの Airflow 変数を定義して、例の DAG の中でその変数を使用します。
Airflow 変数の設定
Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。このステップでは、Airflow ウェブ インターフェースを使用して、後で例の DAG で使用する 3 つの Airflow 変数を設定します。
変数を設定するには、次の手順を行います。
Cloud Console で Airflow ウェブ インターフェースにアクセスします。
Cloud Console で、[環境] ページに移動します。
example-environment
の [Airflow ウェブサーバー] 列で [Airflow] リンクをクリックします。新しいウィンドウに Airflow ウェブ インターフェースが表示されます。
Airflow ウェブ インターフェースで変数を設定します。
- ツールバーで、[Admin] > [Variables] をクリックします。
- [Create] をクリックして、新しい変数を作成します。
- 次の各変数に対して、Key-Value ペアを入力してから、[Save] をクリックします。すべての Airflow 変数がリストタブに表示されます。
KEY VALUE gcp_project このチュートリアルで使用している Google Cloud Platform プロジェクトのプロジェクト ID( composer-test
など)。gcs_bucket このチュートリアル用に作成した Cloud Storage バケット( gs://my-bucket
など)。gce_zone 環境の Compute Engine ゾーン( us-central1-c
など)。これは、Dataproc クラスタが作成されるゾーンです。使用可能なリージョンとゾーンをご覧ください。
ワークフローの例の表示
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。
DAG は、標準の Python ファイルで定義されます。hadoop_tutorial.py
に含まれるコードは、ワークフロー コードです。
演算子
演算子は、ワークフロー内の 1 つのタスクのテンプレートです。ワークフロー例の 3 つのタスクをオーケストレーションするため、DAG は次の 3 つの演算子をインポートします。
DataprocClusterCreateOperator
: Dataproc クラスタを作成します。DataProcHadoopOperator
: Hadoop のワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。DataprocClusterDeleteOperator
: クラスタを削除して、現在使用中の Compute Engine の利用料金が発生しないようにします。
依存関係
実行するタスクを関係と依存状態が反映されるように編成してください。この DAG 内のタスクが順次実行されます。
この例では、Python のビットシフト演算子が指す方向(>>
)に関係が設定されます。
スケジュール
DAG の名前は composer_hadoop_tutorial
です。この DAG は 1 日 1 回実行されます。
default_dag_args
に渡される start_date
が yesterday
に設定されているため、DAG のアップロードの直後に開始されるように、Cloud Composer によってこのワークフローのスケジュールが設定されます。
DAG の Cloud Storage へのアップロード
Cloud Composer がスケジュールを設定するのは、DAG フォルダ内の DAG のみです。DAG フォルダは、Cloud Composer が環境に対して自動的に作成する Cloud Storage バケット内にあります。
DAG をアップロードするには:
- ローカルマシン上で
hadoop_tutorial.py
を保存します。 Cloud Console で、[環境] ページに移動します。
環境の例の [DAG のフォルダ] 列にある [DAG] リンクをクリックします。Cloud Storage の DAG フォルダが開きます。
[ファイルをアップロード] をクリックします。
ローカルマシン上の
hadoop_tutorial.py
を選択して、[開く] をクリックします。
Cloud Composer により、DAG が Airflow に追加されて自動的にスケジュール設定されます。DAG の変更は 3~5 分以内に行われます。
DAG の実行状況の確認
タスクのステータスの表示
DAG ファイルを Cloud Storage の dags/
フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。正常に完了すれば、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。
タスクのステータスを確認するには、Airflow ウェブ インターフェースに移動して、ツールバーの [DAG] をクリックします。
DAG の詳細ページを開くには、
composer_hadoop_tutorial
をクリックします。このページでは、ワークフローのタスクと依存関係が図で示されています。各タスクのステータスを確認するには、[Graph View] をクリックしてから、各タスクのグラフィックにカーソルを合わせます。
ワークフローの再キューイング
[Graph View] からワークフローをもう一度実行するには、次の手順を行います。
- Airflow UI のグラフビューで、
create_dataproc_cluster
グラフィックをクリックします。 - [Clear] をクリックしてから [OK] をクリックして、3 つのタスクをリセットします。
- グラフビューで
create_dataproc_cluster
をもう一度クリックします。 - [Run] をクリックして、ワークフローをもう一度キューに入れます。

タスクの結果の表示
次の Cloud Console ページに移動して、composer_hadoop_tutorial
ワークフローのステータスと結果を確認することもできます。
Dataproc クラスタ。クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは、一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクの一部として削除されます。
Dataproc ジョブ。Apache Hadoop のワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
Cloud Storage ブラウザ。このチュートリアル用に作成した Cloud Storage バケット内の
wordcount
フォルダのワードカウントの結果を表示します。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。
- Cloud Console で [リソースの管理] ページに移動します。
- 削除するプロジェクトが組織に関連付けられている場合、ページ上部の [組織] リストから組織を選択します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
あるいは、このチュートリアルで使用したリソースを削除することもできます。
- Cloud Composer 環境を削除します。
- Cloud Composer 環境の Cloud Storage バケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
- Cloud Composer の Pub/Sub トピックを削除します(
composer-agent
とcomposer-backend)
)。
次のステップ
- チュートリアルに取り組みます。
- Cloud Composer の概要をご覧いただけます。
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。
- Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用方法の詳細を示したこちらの Airflow ウェブサイトのチュートリアルをご覧ください。