Airflow REST API にアクセスする

Cloud Composer 1 | Cloud Composer 2

Apache Airflow には REST API インターフェースがあり、DAG 実行とタスクに関する情報の取得、DAG の更新、Airflow 構成の取得、接続の追加と削除、ユーザーの一覧表示などのタスクを実行に使用できます。

Cloud Functions で Airflow REST API を使用する例については、Cloud Functions を使用した DAG のトリガーをご覧ください。

Airflow REST API のバージョン

Cloud Composer 1 では次の Airflow REST API バージョンを使用できます。

  • Airflow 2 は、安定版の REST API を使用します。試験運用版の REST API の Airflow によるサポートは終了しました。

  • Airflow 構成のオーバーライド経由で有効にすると、試験運用版の REST API を引き続き Airflow 2 で使用できます。詳細については、以下をご覧ください。

始める前に

Cloud Composer API を有効にします。

API を有効にする

安定版の Airflow REST API を有効にする

Airflow 2

安定版の REST API は、Airflow 2 でデフォルトですでに有効になっています。

Cloud Composer では、Identity-Aware Proxy と統合された独自の API 認証バックエンドが使用されます。

認可は、Airflow が提供する標準の方法で機能します。新しいユーザーが API を使用して認可すると、デフォルトで、ユーザーのアカウントに Op ロールが付与されます。

以下の Airflow 構成オプションをオーバーライドすることで、安定版の REST API を有効または無効にできます。また、デフォルトのユーザーロールを変更することもできます。

セクション キー
api auth_backend airflow.composer.api.backend.composer_auth 安定版の REST API を無効にするには、airflow.api.auth.backend.deny_all に変更します。
api composer_auth_user_registration_role Op その他のロールも指定できます。

Airflow 1

安定した REST API は、Airflow 1 では使用できません。代わりに、試験運用版の REST API を使用できます。

試験運用版の Airflow REST API を有効にする

Airflow 2

試験運用版 API では、API 認証機能はデフォルトで無効になっています。Airflow ウェブサーバーは、すべてのリクエストを拒否します。

API 認証機能と Airflow 2 試験運用版 API を有効にするには、次の Airflow 構成オプションをオーバーライドします。

セクション キー
api auth_backend airflow.api.auth.backend.default デフォルトは airflow.composer.api.backend.composer_auth です。
api enable_experimental_api True デフォルト値は False です。

Airflow 1

Airflow 1.10.11 以降のバージョンでは API 認証機能がデフォルトで無効になっています。 Airflow ウェブサーバーは、すべてのリクエストを拒否します。リクエストを使用して DAG をトリガーするため、この機能を有効にします。

Airflow 1 で API 認証機能を有効にするには、次の Airflow 構成オプションをオーバーライドします。

セクション キー
api auth_backend airflow.api.auth.backend.default デフォルト値は airflow.api.auth.backend.deny_all です。

api-auth_backend 構成オプションを airflow.api.auth.backend.default に設定すると、Airflow ウェブサーバーは認証を行うことなくすべての API リクエストを受け入れます。Airflow ウェブサーバー自体は認証を必要としませんが、独自の認証レイヤを備える Identity-Aware Proxy によって保護されます。

Airflow REST API を呼び出す

IAM プロキシの client_id を取得する

Airflow REST API エンドポイントにリクエストを送信するには、Airflow ウェブサーバーを保護する IAM プロキシのクライアント ID が関数に必要です。

Cloud Composer は、この情報を直接提供しません。代わりに、認証されていないリクエストを Airflow ウェブサーバーに送信し、リダイレクト URL からクライアント ID を取得します。

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL を Airflow ウェブ インターフェースの URL に置き換えます。

出力で、client_id に続く文字列を検索します。次に例を示します。

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

次のコードを get_client_id.py というファイルに保存します。project_idlocationcomposer_environment に値を入力してから、Cloud Shell またはローカル環境でコードを実行します。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = ("This script is intended to be used with Composer 1 environments. "
                     "In Composer 2, the Airflow Webserver is not in the tenant project, "
                     "so there is no tenant client ID. "
                     "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details.")
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

client_id を使用して Airflow REST API を呼び出す

次の項目を置き換えます。

  • client_id 変数の値を、前の手順で取得した client_id の値に置き換えます。
  • webserver_id 変数の値をテナント プロジェクト ID に置き換えます。これは、.appspot.com より前の Airflow ウェブ インターフェース URL の一部です。前のステップで Airflow ウェブ インターフェース URL を取得しました。
  • 使用する Airflow REST API のバージョンを指定します。

    • 安定版の Airflow REST API を使用する場合は、USE_EXPERIMENTAL_API 変数を False に設定します。
    • 試験運用版の Airflow REST API を使用する場合、変更は必要ありません。USE_EXPERIMENTAL_API 変数はすでに True に設定されています。

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

サービス アカウントを使用して Airflow REST API にアクセスする

Airflow データベースでは、メール フィールドの長さが 64 文字に制限されています。サービス アカウントのメールアドレスが 64 文字を超える場合があります。通常の方法で、このようなサービス アカウントの Airflow ユーザーを作成することはできません。

回避策として、Airflow ユーザーをサービス アカウントに事前登録できます。それには、accounts.google.com:NUMERIC_USER_ID をユーザー名として使用し、一意の文字列をメールとして使用します。

  1. サービス アカウントの NUMERIC_USER_ID を取得するには、次のコマンドを実行します。

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    次のように置き換えます。

    • SA_NAME は、サービス アカウント名に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
  2. サービス アカウントの Op ロールを持つ Airflow ユーザーを作成します。

    Airflow UI

    1. Airflow UI に移動します。Airflow ユーザーには Admin のロールが必要です。

    2. [管理者] > [ユーザー] に移動し、[作成] をクリックします。

    3. ユーザー名として accounts.google.com:NUMERIC_USER_ID を指定します。NUMERIC_USER_ID は、前の手順で取得したユーザー ID に置き換えます。

    4. メールアドレスとして一意の識別子を指定します。任意の一意の文字列を使用できます。

    5. ユーザーのロールを指定します。例:Op

    6. ユーザーの姓と名を指定します。任意の文字列を使用できます。

    7. [保存] をクリックします。

    gcloud

    Airflow 2 で、Airflow CLI コマンドを実行します。

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    次のように置き換えます。

    • ENVIRONMENT_NAME を環境の名前にする。
    • LOCATION は、環境が配置されているリージョン。
    • NUMERIC_USER_ID は、前のステップで取得したユーザー ID に置き換えます。
    • UNIQUE_ID は Airflow ユーザーの識別子に置き換えます。任意の一意の文字列を使用できます。
  3. サービス アカウントに Airflow ユーザーを作成すると、サービス アカウントとして認証された呼び出し元が事前登録ユーザーとして認識され、Airflow にログインします。

次のステップ