Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、Functions を使用してイベントに応答して Cloud Composer DAG をトリガーする方法について説明します。
Apache Airflow では定期的なスケジュールで DAG が実行されるように設計されていますが、イベントに応答して DAG をトリガーすることもできます。これを行う方法の一つとして、Cloud Run Functions を使用して、指定されたイベントの発生時に Cloud Composer DAG をトリガーする方法があります。
このガイドの例では、Cloud Storage バケットで変更が生じるたびに DAG を実行します。バケット内のオブジェクトが変更されると、関数がトリガーされます。この関数は、Cloud Composer 環境の Airflow REST API にリクエストを行います。Airflow はこのリクエストを処理して DAG を実行します。DAG は変更に関する情報を出力します。
始める前に
環境のネットワーク構成を確認する
このソリューションは、プライベート IP と VPC Service Controls の構成では機能しません。これらの構成では、Cloud Run Functions から Airflow ウェブサーバーへの接続を構成できないためです。
Cloud Composer 2 では、別のアプローチ(Cloud Run Functions と Pub/Sub メッセージを使用して DAG をトリガーする)を使用できます。
プロジェクトでAPI を有効にする
コンソール
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Airflow REST API を有効にする
Airflow のバージョンに応じて、以下のように操作します。
- Airflow 2 の場合、安定版の REST API はデフォルトですでに有効になっています。環境で安定した API が無効になっている場合は、安定版の REST API を有効にします。
- Airflow 1 の場合は、試験運用版の REST API を有効にします。
Webserver Access Control を使用して Airflow REST API への API 呼び出しを許可する
Cloud Run Functions は、IPv4 または IPv6 アドレスを使用して Airflow REST API にアクセスできます。
呼び出し元の IP 範囲がわからない場合は、ウェブサーバーのアクセス制御のデフォルトの構成オプション All IP addresses have access (default)
を使用して、Cloud Run Functions が誤ってブロックされないようにします。
Cloud Storage バケットを作成する
この例では Cloud Storage バケットの変更に応答して DAG をトリガーするため、この例で使用する新しいバケットを作成します。
Airflow ウェブサーバーの URL を取得する
この例では、Airflow ウェブサーバー エンドポイントに REST API リクエストを行います。Cloud Functions の関数のコードでは、.appspot.com
より前の部分で Airflow ウェブ インターフェース URL の一部を使用します。
コンソール
Google Cloud Console で [環境] ページに移動します。
環境の名前をクリックします。
[環境の詳細] ページで [環境の構成] タブに移動します。
[Airflow ウェブ UI] 項目に Airflow ウェブサーバーの URL が表示されます。
gcloud
次のコマンドを実行します。
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
次のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
IAM プロキシの client_id を取得する
Airflow REST API エンドポイントにリクエストを送信するには、Airflow ウェブサーバーを保護する Identity and Access Management プロキシのクライアント ID が関数に必要です。
Cloud Composer は、この情報を直接提供しません。代わりに、認証されていないリクエストを Airflow ウェブサーバーに送信し、リダイレクト URL からクライアント ID を取得します。
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
AIRFLOW_URL
を Airflow ウェブ インターフェースの URL に置き換えます。
出力で、client_id
に続く文字列を検索します。次に例を示します。
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
次の内容を get_client_id.py
という名前のコードに保存します。
project_id
、location
、composer_environment
の値を入力し、Cloud Shell またはローカル環境でコードを実行します。
DAG をお使いの環境にアップロードする
DAG をお使いの環境にアップロードする次の DAG の例では、受信した DAG 実行構成を出力します。このガイドの後半で作成する関数から、この DAG をトリガーします。
DAG をトリガーする Cloud Functions をデプロイする
Cloud Run Functions または Cloud Run でサポートされている言語を使用して、Cloud Functions の関数をデプロイできます。このチュートリアルでは、Python と Java で実装された Cloud Functions の関数について説明します。
Cloud Function 構成パラメータを指定する
トリガー。この例では、バケットに新しいオブジェクトが作成されたとき、または既存のオブジェクトが上書きされたときに動作するトリガーを選択します。
トリガーのタイプ。Cloud Storage。
イベントのタイプ。ファイナライズ / 作成。
バケット。この関数をトリガーする必要があるバケットを選択します。
失敗時に再試行するこの例では、このオプションを無効にすることをおすすめします。本番環境で独自の関数を使用する場合は、このオプションを有効にして一時的なエラーを処理します。
[ランタイム、ビルド、接続、セキュリティ設定] セクションの [ランタイム サービス アカウント]。希望に応じて、次のいずれかのオプションを使用します。
Compute Engine のデフォルトのサービス アカウントを選択します。デフォルトの IAM 権限では、このアカウントが Cloud Composer 環境にアクセスする関数を実行できます。
Composer ユーザーのロールを持つカスタム サービス アカウントを作成し、この関数のランタイム サービス アカウントとして指定します。このオプションは、最小権限の原則に従います。
[コード] ステップの [ランタイムとエントリ ポイント]。この例のコードを追加する際は、Python 3.7 以降のランタイムを選択し、エントリ ポイントとして
trigger_dag
を指定します。
要件を追加する
requirements.txt
ファイルで依存関係を指定します。
main.py
ファイルに次のコードを追加し、次のように置き換えます。
client_id
変数の値を、前の手順で取得したclient_id
の値に置き換えます。webserver_id
変数の値をテナント プロジェクト ID に置き換えます。これは、.appspot.com
より前の Airflow ウェブ インターフェース URL の一部です。以前に Airflow ウェブ インターフェース URL を取得しています。使用する Airflow REST API のバージョンを指定します。
- 安定版の Airflow REST API を使用する場合は、
USE_EXPERIMENTAL_API
変数をFalse
に設定します。 - 試験運用版の Airflow REST API を使用している場合は、変更する必要はありません。
USE_EXPERIMENTAL_API
変数はすでにTrue
に設定されています。
- 安定版の Airflow REST API を使用する場合は、
関数をテストする
関数と DAG が意図したとおりに機能することを確認するには、
- 関数がデプロイされるまで待ちます。
- Cloud Storage バケットにファイルをアップロードします。別の方法として、Google Cloud コンソールで [関数をテストする] アクションを選択して、関数を手動でトリガーすることもできます。
- Airflow ウェブ インターフェースの DAG ページを確認します。DAG には、有効な、またはすでに完了した DAG 実行が 1 つ必要です。
- Airflow UI で、この実行のタスクログを確認します。
print_gcs_info
タスクが、関数から受信したデータをログに出力することがわかります。
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h