Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、Functions を使用してイベントに応答して Cloud Composer DAG をトリガーする方法について説明します。
Apache Airflow では定期的なスケジュールで DAG が実行されるように設計されていますが、イベントに応答して DAG をトリガーすることもできます。これを行う方法の一つとして、Cloud Run Functions を使用して、指定されたイベントの発生時に Cloud Composer DAG をトリガーする方法があります。
このガイドの例では、Cloud Storage バケットで変更が生じるたびに DAG を実行します。バケット内のオブジェクトが変更されると、関数がトリガーされます。この関数は、Cloud Composer 環境の Airflow REST API にリクエストを行います。Airflow はこのリクエストを処理して DAG を実行します。DAG は変更に関する情報を出力します。
始める前に
環境のネットワーク構成を確認する
このソリューションは、プライベート IP と VPC Service Controls の構成では機能しません。これらの構成では、Cloud Run Functions から Airflow ウェブサーバーへの接続を構成できないためです。
Cloud Composer 2 では、別のアプローチ(Cloud Run Functions と Pub/Sub メッセージを使用して DAG をトリガーする)を使用できます。
プロジェクトでAPI を有効にする
コンソール
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Airflow REST API を有効にする
Airflow 2 の場合、安定版の REST API はデフォルトですでに有効になっています。環境で安定した API が無効になっている場合は、安定版の REST API を有効にします。
Webserver Access Control を使用して Airflow REST API への API 呼び出しを許可する
Cloud Run Functions は、IPv4 または IPv6 アドレスを使用して Airflow REST API にアクセスできます。
呼び出し元の IP 範囲がわからない場合は、ウェブサーバーのアクセス制御のデフォルトの構成オプション All IP addresses have access (default)
を使用して、Cloud Run Functions が誤ってブロックされないようにします。
Cloud Storage バケットを作成する
この例では Cloud Storage バケットの変更に応答して DAG をトリガーするため、この例で使用する新しいバケットを作成します。
Airflow ウェブサーバーの URL を取得する
この例では、Airflow ウェブサーバー エンドポイントに REST API リクエストを送信します。Cloud Functions 関数のコードで Airflow ウェブサーバーの URL を使用します。
コンソール
Google Cloud Console で [環境] ページに移動します。
環境の名前をクリックします。
[環境の詳細] ページで [環境の構成] タブに移動します。
[Airflow ウェブ UI] 項目に Airflow ウェブサーバーの URL が表示されます。
gcloud
次のコマンドを実行します。
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUr
i)'
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
DAG をお使いの環境にアップロードする
DAG をお使いの環境にアップロードする次の DAG の例では、受信した DAG 実行構成を出力します。このガイドの後半で作成する関数から、この DAG をトリガーします。
DAG をトリガーする Cloud Functions をデプロイする
Cloud Run Functions または Cloud Run でサポートされている言語を使用して、Cloud Functions の関数をデプロイできます。このチュートリアルでは、Python と Java で実装された Cloud Functions の関数について説明します。
Cloud Function 構成パラメータを指定する
トリガー。この例では、バケットに新しいオブジェクトが作成されたとき、または既存のオブジェクトが上書きされたときに動作するトリガーを選択します。
トリガーのタイプ。Cloud Storage。
イベントのタイプ。ファイナライズ / 作成。
バケット。この関数をトリガーする必要があるバケットを選択します。
失敗時に再試行するこの例では、このオプションを無効にすることをおすすめします。本番環境で独自の関数を使用する場合は、このオプションを有効にして一時的なエラーを処理します。
[ランタイム、ビルド、接続、セキュリティ設定] セクションの [ランタイム サービス アカウント]。希望に応じて、次のいずれかのオプションを使用します。
Compute Engine のデフォルトのサービス アカウントを選択します。デフォルトの IAM 権限では、このアカウントが Cloud Composer 環境にアクセスする関数を実行できます。
Composer ユーザーのロールを持つカスタム サービス アカウントを作成し、この関数のランタイム サービス アカウントとして指定します。このオプションは、最小権限の原則に従います。
[コード] ステップの [ランタイムとエントリ ポイント]。
(Python)この例のコードを追加する際は、Python 3.7 以降のランタイムを選択し、エントリ ポイントとして
trigger_dag_gcf
を指定します。(Java)この例のコードを追加する際は、Java 17 ランタイムを選択し、エントリ ポイントとして
com.example.Example
を指定します。
要件を追加する
Python
requirements.txt
ファイルで依存関係を指定します。
Java
Google Cloud Functions UI で生成された pom.xml
の dependencies
セクションに、次の依存関係を追加します。
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-docs</artifactId>
<version>v1-rev20210707-1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.14.0</version>
</dependency>
Python
Airflow REST API を使用して DAG をトリガーするコードを追加します。composer2_airflow_rest_api.py
という名前のファイルを作成し、Airflow REST API 呼び出しを行うコードをこのファイルに配置します。
変数を変更しないでください。Cloud Functions は、main.py
ファイルからこのファイルをインポートします。
main.py
ファイルに次のコードを入力します。web_server_url
変数の値を、前の手順で取得した Airflow ウェブサーバーのアドレスに置き換えます。
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer
_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
Java
Example.java
ファイルに次のコードを入力します。webServerUrl
変数の値を、前の手順で取得した Airflow ウェブサーバーのアドレスに置き換えます。
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.example;
import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
/**
* Cloud Function that triggers an Airflow DAG in response to an event (in
* this case a Cloud Storage event).
*/
public class Example implement<s Backgr>oundFunctionGcsEvent {
private static final Logger logger = Logger.getLogger(Example.class.getName());
// TODO(developer): replace with your values
// Replace webServerUrl with the Airflow web server address. To obtain this
// URL, run the following command for your environment:
// gcloud composer environments describe example-environment \
// --location=your-composer-region \
// --format="value(config.airflowUri)"
@Override
public void accept(GcsEvent event, Context context) throws Exception {
String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
String dagName = "composer_sample_trigger_response_dag";
String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);
logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
dagName, event.name));
logger.info(String.format("Triggering DAG via the following URL: %s", url));
GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
HttpRequestFactory reques<tFactory =
< new NetH>>ttpTransport().crea<teRequestFa<ctory(credenti>>alsAdapter)<;
MapStri>ng, MapString, Stri<ng json = new >HashMapString, MapString, String();
MapString, String conf = new HashMapString, String();
conf.put("bucket", event.bucket);
conf.put("name", event.name);
conf.put("generation", event.generation);
conf.put("operation", context.eventType());
json.put("conf", conf);
HttpContent content = new JsonHttpContent(new GsonFactory(), json);
HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
request.getHeaders().setContentType("application/json");
HttpResponse response;
try {
response = request.execute();
int statusCode = response.getStatusCode();
logger.info("Response code: " + statusCode);
logger.info(response.parseAsString());
} catch (HttpResponseException e) {
// https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
logger.info("Received HTTP exception");
logger.info(e.getLocalizedMessage());
logger.info("- 400 error: wrong arguments passed to Airflow API");
logger.info("- 401 error: check if service account has Composer User role");
logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
logger.info("- 404 error: check Web Server URL");
} catch (Exception e) {
logger.info("Received exception");
logger.info(e.getLocalizedMessage());
}
}
/** Details of the storage event. */
public static class GcsEvent {
/** Bucket name. */
String bucket;
/** Object name. */
String name;
/** Object version. */
String generation;
}
}
関数をテストする
関数と DAG が意図したとおりに機能することを確認するには、
- 関数がデプロイされるまで待ちます。
- Cloud Storage バケットにファイルをアップロードします。別の方法として、Google Cloud コンソールで [関数をテストする] アクションを選択して、関数を手動でトリガーすることもできます。
- Airflow ウェブ インターフェースの DAG ページを確認します。DAG には、有効な、またはすでに完了した DAG 実行が 1 つ必要です。
- Airflow UI で、この実行のタスクログを確認します。
print_gcs_info
タスクが、関数から受信したデータをログに出力することがわかります。
Python
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Java
[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0