Cloud Functions で DAG をトリガーする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Functions を使用してイベントに応答して Cloud Composer DAG をトリガーする方法について説明します。

Apache Airflow では定期的なスケジュールで DAG が実行されるように設計されていますが、イベントに応答して DAG をトリガーすることもできます。これを行う方法の一つとして、Cloud Run Functions を使用して、指定されたイベントの発生時に Cloud Composer DAG をトリガーする方法があります。

このガイドの例では、Cloud Storage バケットで変更が生じるたびに DAG を実行します。バケット内のオブジェクトが変更されると、関数がトリガーされます。この関数は、Cloud Composer 環境の Airflow REST API にリクエストを行います。Airflow はこのリクエストを処理して DAG を実行します。DAG は変更に関する情報を出力します。

始める前に

環境のネットワーク構成を確認する

このソリューションは、プライベート IP と VPC Service Controls の構成では機能しません。これらの構成では、Cloud Run Functions から Airflow ウェブサーバーへの接続を構成できないためです。

Cloud Composer 2 では、別のアプローチ(Cloud Run Functions と Pub/Sub メッセージを使用して DAG をトリガーする)を使用できます。

プロジェクトでAPI を有効にする

コンソール

Enable the Cloud Composer and Cloud Run functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Airflow REST API を有効にする

Airflow 2 の場合、安定版の REST API はデフォルトですでに有効になっています。環境で安定した API が無効になっている場合は、安定版の REST API を有効にします

Webserver Access Control を使用して Airflow REST API への API 呼び出しを許可する

Cloud Run Functions は、IPv4 または IPv6 アドレスを使用して Airflow REST API にアクセスできます。

呼び出し元の IP 範囲がわからない場合は、ウェブサーバーのアクセス制御のデフォルトの構成オプション All IP addresses have access (default) を使用して、Cloud Run Functions が誤ってブロックされないようにします。

Cloud Storage バケットを作成する

この例では Cloud Storage バケットの変更に応答して DAG をトリガーするため、この例で使用する新しいバケットを作成します。

Airflow ウェブサーバーの URL を取得する

この例では、Airflow ウェブサーバー エンドポイントに REST API リクエストを送信します。Cloud Functions 関数のコードで Airflow ウェブサーバーの URL を使用します。

コンソール

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境の名前をクリックします。

  3. [環境の詳細] ページで [環境の構成] タブに移動します。

  4. [Airflow ウェブ UI] 項目に Airflow ウェブサーバーの URL が表示されます。

gcloud

次のコマンドを実行します。

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

DAG をお使いの環境にアップロードする

DAG をお使いの環境にアップロードする次の DAG の例では、受信した DAG 実行構成を出力します。このガイドの後半で作成する関数から、この DAG をトリガーします。

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

DAG をトリガーする Cloud Functions をデプロイする

Cloud Run Functions または Cloud Run でサポートされている言語を使用して、Cloud Functions の関数をデプロイできます。このチュートリアルでは、PythonJava で実装された Cloud Functions の関数について説明します。

Cloud Function 構成パラメータを指定する

  • トリガー。この例では、バケットに新しいオブジェクトが作成されたとき、または既存のオブジェクトが上書きされたときに動作するトリガーを選択します。

    • トリガーのタイプ。Cloud Storage。

    • イベントのタイプファイナライズ / 作成

    • バケット。この関数をトリガーする必要があるバケットを選択します。

    • 失敗時に再試行するこの例では、このオプションを無効にすることをおすすめします。本番環境で独自の関数を使用する場合は、このオプションを有効にして一時的なエラーを処理します。

  • [ランタイム、ビルド、接続、セキュリティ設定] セクションの [ランタイム サービス アカウント]。希望に応じて、次のいずれかのオプションを使用します。

    • Compute Engine のデフォルトのサービス アカウントを選択します。デフォルトの IAM 権限では、このアカウントが Cloud Composer 環境にアクセスする関数を実行できます。

    • Composer ユーザーのロールを持つカスタム サービス アカウントを作成し、この関数のランタイム サービス アカウントとして指定します。このオプションは、最小権限の原則に従います。

  • [コード] ステップの [ランタイムとエントリ ポイント]。

    • (Python)この例のコードを追加する際は、Python 3.7 以降のランタイムを選択し、エントリ ポイントとして trigger_dag_gcf を指定します。

    • (Java)この例のコードを追加する際は、Java 17 ランタイムを選択し、エントリ ポイントとして com.example.Example を指定します。

要件を追加する

Python

requirements.txt ファイルで依存関係を指定します。

google-auth==2.19.1
requests==2.32.2

Java

Google Cloud Functions UI で生成された pom.xmldependencies セクションに、次の依存関係を追加します。

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Airflow REST API を使用して DAG をトリガーするコードを追加します。composer2_airflow_rest_api.py という名前のファイルを作成し、Airflow REST API 呼び出しを行うコードをこのファイルに配置します。

変数は変更しないでください。Cloud Functions は、main.py ファイルからこのファイルをインポートします。

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

main.py ファイルに次のコードを入力します。web_server_url 変数の値を、前の手順で取得した Airflow ウェブサーバーのアドレスに置き換えます。

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)

Java

Example.java ファイルに次のコードを入力します。webServerUrl 変数の値を、前の手順で取得した Airflow ウェブサーバーのアドレスに置き換えます。


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

関数をテストする

関数と DAG が意図したとおりに機能することを確認するには、

  1. 関数がデプロイされるまで待ちます。
  2. Cloud Storage バケットにファイルをアップロードします。別の方法として、Google Cloud コンソールで [関数をテストする] アクションを選択して、関数を手動でトリガーすることもできます。
  3. Airflow ウェブ インターフェースの DAG ページを確認します。DAG には、有効な、またはすでに完了した DAG 実行が 1 つ必要です。
  4. Airflow UI で、この実行のタスクログを確認します。print_gcs_info タスクが、関数から受信したデータをログに出力することがわかります。

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

Java

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0

次のステップ