Cloud Functions と Airflow REST API を使用して Cloud Composer DAG をトリガーする

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Functions を使用してイベントに応答して Cloud Composer DAG をトリガーする方法について説明します。

Apache Airflow では定期的なスケジュールで DAG が実行されるように設計されていますが、イベントに応答して DAG をトリガーすることもできます。これを行う方法の一つとして、Cloud Functions を使用して、指定されたイベントの発生時に Cloud Composer DAG をトリガーする方法があります。

このガイドの例では、Cloud Storage バケットで変更が生じるたびに DAG を実行します。バケット内のオブジェクトが変更されると、関数がトリガーされます。この関数は、Cloud Composer 環境の Airflow REST API にリクエストを行います。Airflow はこのリクエストを処理して DAG を実行します。DAG は変更に関する情報を出力します。

始める前に

環境のネットワーク構成を確認する

このソリューションは、プライベート IP 構成と VPC Service Controls 構成では機能しません。これらの構成では、Cloud Functions から Airflow ウェブサーバーへの接続を構成できないためです。

Cloud Composer 2 では、別のアプローチを使用できます。Cloud Functions と Pub/Sub メッセージを使用して DAG をトリガーする

プロジェクトでAPI を有効にする

コンソール

Cloud Composer and Cloud Functions API を有効にします。

API を有効にする

gcloud

Cloud Composer and Cloud Functions API を有効にします。

gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com

Airflow REST API を有効にする

Airflow のバージョンに応じて、以下のように操作します。

Webserver Access Control を使用して Airflow REST API への API 呼び出しを許可する

Cloud Functions は、IPv4 または IPv6 アドレスを使用して Airflow REST API にアクセスできます。

呼び出し元の IP 範囲がわからない場合は、ウェブサーバーのアクセス制御のデフォルトの構成オプション All IP addresses have access (default) を使用して、Cloud Functions が誤ってブロックされないようにします。

Cloud Storage バケットを作成する

この例では Cloud Storage バケットの変更に応答して DAG をトリガーするため、この例で使用する新しいバケットを作成します。

Airflow ウェブサーバーの URL を取得する

この例では、Airflow ウェブサーバー エンドポイントに REST API リクエストを行います。 Cloud Functions の関数のコードで、Airflow ウェブ インターフェース URL の一部(.appspot.com の前)を使用します。

Console

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境の名前をクリックします。

  3. [環境の詳細] ページで [環境の構成] タブに移動します。

  4. [Airflow ウェブ UI] 項目に Airflow ウェブサーバーの URL が表示されます。

gcloud

次のコマンドを実行します。

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

IAM プロキシの client_id を取得する

Airflow REST API エンドポイントにリクエストを送信するには、Airflow ウェブサーバーを保護する Identity and Access Management プロキシのクライアント ID が関数に必要です。

Cloud Composer は、この情報を直接提供しません。代わりに、認証されていないリクエストを Airflow ウェブサーバーに送信し、リダイレクト URL からクライアント ID を取得します。

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL を Airflow ウェブ インターフェースの URL に置き換えます。

出力で、client_id に続く文字列を検索します。次に例を示します。

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

次の内容を get_client_id.py という名前のコードに保存します。 project_idlocationcomposer_environment の値を入力してから、Cloud Shell またはローカル環境でコードを実行します。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

DAG をお使いの環境にアップロードする

DAG をお使いの環境にアップロードする次の DAG の例では、受信した DAG 実行構成を出力します。このガイドの後半で作成する関数から、この DAG をトリガーします。

import datetime

import airflow
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

DAG をトリガーする Cloud Functions をデプロイする

Cloud Functions や Cloud Run でサポートされている任意の言語を使用して、Cloud Functions の関数をデプロイできます。このチュートリアルでは、PythonJava で実装された Cloud Functions の関数について説明します。

Cloud Function 構成パラメータを指定する

  • トリガー。この例では、バケットに新しいオブジェクトが作成されたとき、または既存のオブジェクトが上書きされたときに動作するトリガーを選択します。

    • トリガーのタイプ。Cloud Storage。

    • イベントのタイプファイナライズ / 作成

    • バケット。この関数をトリガーする必要があるバケットを選択します。

    • 失敗時に再試行するこの例では、このオプションを無効にすることをおすすめします。本番環境で独自の関数を使用する場合は、このオプションを有効にして一時的なエラーを処理します。

  • [ランタイム、ビルド、接続、セキュリティ設定] セクションの [ランタイム サービス アカウント]。希望に応じて、次のいずれかのオプションを使用します。

    • Compute Engine のデフォルトのサービス アカウントを選択します。デフォルトの IAM 権限では、このアカウントが Cloud Composer 環境にアクセスする関数を実行できます。

    • Composer ユーザーのロールを持つカスタム サービス アカウントを作成し、この関数のランタイム サービス アカウントとして指定します。このオプションは、最小権限の原則に従います。

  • [コード] ステップの [ランタイムとエントリ ポイント]。この例のコードを追加する際は、Python 3.7 以降のランタイムを選択し、エントリ ポイントとして trigger_dag を指定します。

要件を追加する

requirements.txt ファイルで依存関係を指定します。

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

main.py ファイルに次のコードを追加し、次のように置き換えます。

  • client_id 変数の値を、前の手順で取得した client_id の値に置き換えます。

  • webserver_id 変数の値をテナント プロジェクト ID に置き換えます。これは、.appspot.com より前の Airflow ウェブ インターフェース URL の一部です。前に Airflow ウェブ インターフェース URL を取得しました。

  • 使用する Airflow REST API のバージョンを指定します。

    • 安定版の Airflow REST API を使用する場合は、USE_EXPERIMENTAL_API 変数を False に設定します。
    • 試験運用版の Airflow REST API を使用している場合は、変更する必要はありません。USE_EXPERIMENTAL_API 変数はすでに True に設定されています。


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text

# END COPIED IAP CODE

関数をテストする

関数と DAG が意図したとおりに機能することを確認するには、

  1. 関数がデプロイされるまで待ちます。
  2. Cloud Storage バケットにファイルをアップロードします。別の方法として、Google Cloud コンソールで [関数をテストする] アクションを選択して、関数を手動でトリガーすることもできます。
  3. Airflow ウェブ インターフェースの DAG ページを確認します。DAG には、有効な、またはすでに完了した DAG 実行が 1 つ必要です。
  4. Airflow UI で、この実行のタスクログを確認します。print_gcs_info タスクが、関数から受信したデータをログに出力することがわかります。
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

次のステップ