Cloud Composer 2 で Apache Airflow DAG を実行する
Cloud Composer 1 | Cloud Composer 2
このページでは、Cloud Composer 環境を作成し、Cloud Composer 2 で Apache Airflow DAG を実行する方法について説明します。
Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用方法の詳細を示したこちらのチュートリアルをご覧ください。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud Composer API を有効にします。
環境の作成
コンソール
Google Cloud Console で、[環境の作成] ページに移動します。
プロジェクトで環境を作成するときに、Cloud Composer サービス エージェントに環境のサービス アカウントに対する必要な権限がない場合は、Cloud Composer サービス アカウントに必要な権限を付与するセクションが表示されます。
環境のサービス アカウントの新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、Cloud Composer v2 API サービス エージェント拡張機能の(
roles/composer.ServiceAgentV2Ext
)ロールを付与します。環境の目的のサービス アカウントを使用していることを確認し、[付与] をクリックします。
[名前] フィールドに「
example-environment
」と入力します。[ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。
その他の環境構成オプションには、指定されたデフォルト値を使用します。
環境を作成するには、[作成] をクリックします。
環境が作成されるまで待ちます。処理が完了すると、緑色のチェックマークが環境名の横に表示されます。
gcloud
環境のサービス アカウントに新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、Cloud Composer v2 API サービス エージェント拡張機能(roles/composer.ServiceAgentV2Ext
)のロールを付与します。
デフォルトでは、環境ではデフォルトの Compute Engine サービス アカウントを使用します。
# Get current project's project number
PROJECT_NUMBER=gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
新しい環境を作成する:
gcloud composer environments create ENVIRONMENT_NAME \
--location LOCATION \
--image-version IMAGE_VERSION
次のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。このクイックスタートではexample-environment
を使用します。LOCATION
を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。IMAGE_VERSION
は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.5.1-airflow-2.6.3
を使用して、最新の Cloud Composer 2 イメージで環境を作成します。
例:
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.5.1-airflow-2.6.3
Terraform
Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply
を実行します。
このリソース ブロックを利用するには、Terraform が使用するサービス アカウントに、composer.environments.create
権限が有効になっているロールが割り当てられている必要があります。Terraform のサービス アカウントの詳細については、Google プロバイダの構成リファレンスをご覧ください。
Terraform を使用した Cloud Composer 環境の作成の詳細については、Terraform のドキュメントをご覧ください。
resource "google_composer_environment" "example" {
provider = google-beta
name = "ENVIRONMENT_NAME"
region = "LOCATION"
config {
software_config {
image_version = "IMAGE_VERSION"
}
}
}
ENVIRONMENT_NAME
を環境の名前にする。このクイックスタートではexample-environment
を使用します。LOCATION
を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。IMAGE_VERSION
は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.5.1-airflow-2.6.3
を使用して、最新の Cloud Composer 2 イメージで環境を作成します。
例:
resource "google_composer_environment" "example" {
name = "example-environment"
region = "us-central1"
config {
software_config {
image_version = "composer-2.5.1-airflow-2.6.3"
}
}
}
環境の詳細を表示する
環境の作成が完了した後は、環境の情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。
環境情報を表示するには、次のようにします。
Google Cloud Console で [環境] ページに移動します。
[環境の詳細] ページを表示するには、環境の名前
example-environment
をクリックします。
DAG を作成する
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。
quickstart.py
の Python コードは、次のとおりです。
- DAG(
composer_sample_dag
)を作成します。DAG は 1 日 1 回実行されます。 - タスク(
print_dag_run_conf
)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。
DAG を作成するには、ローカルマシンに quickstart.py
ファイルのコピーを作成します。
DAG を Cloud Storage にアップロードする
Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の /dags
フォルダにある DAG のみです。
DAG のスケジュールを設定するには、quickstart.py
をローカルマシンから環境の /dags
フォルダにアップロードします。
コンソール
Google Cloud Console で [環境] ページに移動します。
example-environment
の DAG フォルダのリンクをクリックして、/dags
フォルダを開きます。バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、
quickstart.py
のローカルコピーを選択します。ファイルをアップロードするには、[開く] をクリックします。
DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG の実行がスケジュールされます。DAG が Airflow ウェブ インターフェースに表示されるまで数分かかる場合があります。
gcloud
gcloud
で quickstart.py
をアップロードするには、次のコマンドを実行します。
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG を Airflow UI に表示する
Cloud Composer 環境ごとに、Airflow ウェブ インターフェースを実行するウェブサーバーを持ちます。Airflow ウェブ インターフェースから DAG を管理できます。
Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。
Google Cloud Console で [環境] ページに移動します。
Airflow ウェブ インターフェースを開くには、
example-environment
についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。Airflow ツールバーで、[DAGs] ページに移動します。
DAG の詳細ページを開くには、
composer_sample_dag
をクリックします。図 1: Airflow UI の DAG ページ(クリックして拡大) DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。
図 2: composer_sample_dags DAG のツリービュー
Airflow ログのタスク インスタンスの詳細を表示する
スケジュール設定した DAG には、print_dag_run_conf
タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。
タスク インスタンスの詳細を表示するには、次の手順を行います。
Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。
ポインタを
print_dag_run_conf
タスクの上に合わせると、ステータスが表示されます。図 3.print_dag_run_conf タスクのステータス print_dag_run_conf
タスクをクリックします。タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。
図 4. composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー タスク インスタンスのコンテキスト メニューで、[Log] をクリックします。
ログ内で
Running command: ['bash'
を見つけて、bash 演算子からの出力を確認します。[2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo 735'] [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output: [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735 [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with return code 0
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
このチュートリアルで使用したリソースを削除します。
Cloud Composer 環境を削除します。
Google Cloud Console で [環境] ページに移動します。
[
example-environment
] を選択し、[削除] をクリックします。環境が削除されるまで待ちます。
環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。
環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を
us-central1-example-environ-c1616fe8-bucket
にします。
環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。
Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。
環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。
たとえば、このディスクの名前は
pvc-02bc4842-2312-4347-8519-d87bdcd31115
です。Cloud Composer 2 のディスクは常にBalanced persistent disk
タイプで、サイズ 2 GB です。