Cloud Composer 2 で Apache Airflow DAG を実行する
Cloud Composer 1 | Cloud Composer 2
このページでは、Cloud Composer 環境を作成し、Cloud Composer 2 で Apache Airflow DAG を実行する方法について説明します。
Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用方法の詳細を示したこちらのチュートリアルをご覧ください。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Cloud Composer API を有効にします。
環境の作成
Console
Google Cloud Console で、[環境の作成] ページに移動します。
[名前] フィールドに「
example-environment
」と入力します。[ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。
その他の環境構成オプションには、指定されたデフォルト値を使用します。
環境を作成するには、[作成] をクリックします。
環境が作成されるまで待ちます。処理が完了すると、緑色のチェックマークが環境名の横に表示されます。
gcloud
gcloud beta composer environments create ENVIRONMENT_NAME \
--location LOCATION \
--image-version IMAGE_VERSION
次のように置き換えます。
-
ENVIRONMENT_NAME
を環境の名前にする。このクイックスタートではexample-environment
を使用します。 -
LOCATION
を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。 -
IMAGE_VERSION
は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.0.9-airflow-2.2.3
を使用して、最新の Cloud Composer 2 イメージを含む環境を作成します。
例:
gcloud beta composer environments create example-environment \
--location us-central1 \
--image-version composer-2.0.9-airflow-2.2.3
Terraform
Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply
を実行します。
Terraform を使用した Cloud Composer 環境の作成の詳細については、Terraform のドキュメントをご覧ください。
resource "google_composer_environment" "example" {
provider = google-beta
name = "ENVIRONMENT_NAME"
region = "LOCATION"
config {
software_config {
image_version = "IMAGE_VERSION"
}
}
}
-
ENVIRONMENT_NAME
を環境の名前にする。このクイックスタートではexample-environment
を使用します。 -
LOCATION
を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。 -
IMAGE_VERSION
は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.0.9-airflow-2.2.3
を使用して、最新の Cloud Composer 2 イメージを含む環境を作成します。
例:
resource "google_composer_environment" "example" {
name = "example-environment"
region = "us-central1"
config {
software_config {
image_version = "composer-2.0.9-airflow-2.2.3"
}
}
}
環境の詳細を表示する
環境の作成が完了した後は、環境の情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。
環境情報を表示するには、次のようにします。
Google Cloud Console で [環境] ページに移動します。
[環境の詳細] ページを表示するには、環境の名前
example-environment
をクリックします。
DAG を作成する
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。
quickstart.py
の Python コードは、次のとおりです。
- DAG(
composer_sample_dag
)を作成します。DAG は 1 日 1 回実行されます。 - タスク(
print_dag_run_conf
)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。
DAG を作成するには、ローカルマシンに quickstart.py
ファイルのコピーを作成します。
DAG を Cloud Storage にアップロードする
Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の /dags
フォルダにある DAG のみです。
DAG のスケジュールを設定するには、quickstart.py
をローカルマシンから環境の /dags
フォルダにアップロードします。
Console
Google Cloud Console で [環境] ページに移動します。
example-environment
の DAG フォルダのリンクをクリックして、/dags
フォルダを開きます。バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、
quickstart.py
のローカルコピーを選択します。ファイルをアップロードするには、[開く] をクリックします。
DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG の実行がスケジュールされます。DAG が Airflow ウェブ インターフェースに表示されるまで数分かかる場合があります。
gcloud
gcloud
で quickstart.py
をアップロードするには、次のコマンドを実行します。
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG を Airflow UI に表示する
Cloud Composer 環境ごとに、Airflow ウェブ インターフェースを実行するウェブサーバーを持ちます。Airflow ウェブ インターフェースから DAG を管理できます。
Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。
Google Cloud Console で [環境] ページに移動します。
Airflow ウェブ インターフェースを開くには、
example-environment
についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。Airflow ツールバーで、[DAGs] ページに移動します。
DAG の詳細ページを開くには、
composer_sample_dag
をクリックします。DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。
Airflow ログのタスク インスタンスの詳細を表示する
スケジュール設定した DAG には、print_dag_run_conf
タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。
タスク インスタンスの詳細を表示するには、次の手順を行います。
Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。
ポインタを
print_dag_run_conf
タスクの上に合わせると、ステータスが表示されます。print_dag_run_conf
タスクをクリックします。タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。
タスク インスタンスのコンテキスト メニューで、[Log] をクリックします。
ログ内で
Running command: ['bash'
を見つけて、bash 演算子からの出力を確認します。[2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo 735'] [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output: [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735 [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with return code 0
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
このチュートリアルで使用したリソースを削除します。
Cloud Composer 環境を削除します。
Google Cloud Console で [環境] ページに移動します。
[
example-environment
] を選択し、[削除] をクリックします。環境が削除されるまで待ちます。
環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。
環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を
us-central1-example-environ-c1616fe8-bucket
にします。
環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。
Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。
環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。
たとえば、このディスクの名前は
pvc-02bc4842-2312-4347-8519-d87bdcd31115
です。Cloud Composer 2 のディスクは常にBalanced persistent disk
タイプで、サイズ 2 GB です。