Airflow REST API にアクセスする

Cloud Composer 1 | Cloud Composer 2

Apache Airflow には REST API インターフェースがあり、DAG 実行とタスクに関する情報の取得、DAG の更新、Airflow 構成の取得、接続の追加と削除、ユーザーの一覧表示などのタスクを実行に使用できます。

Cloud Functions で Airflow REST API を使用する例については、Cloud Functions を使用した DAG のトリガーをご覧ください。

Airflow REST API のバージョン

Cloud Composer 2 では、次の Airflow REST API のバージョンを使用できます。

  • Airflow 2 は、安定版の REST API を使用します。試験運用版の REST API の Airflow によるサポートは終了しました。

  • Airflow 構成のオーバーライド経由で有効にすると、試験運用版の REST API を引き続き Airflow 2 で使用できます。詳細については、以下をご覧ください。

始める前に

Enable the Cloud Composer API.

Enable the API

安定版の Airflow REST API を有効にする

安定版の REST API は、Airflow 2 でデフォルトですでに有効になっています。

Cloud Composer は独自の API 認証バックエンドを使用します。

認可は、Airflow が提供する標準の方法で機能します。新しいユーザーが API を使用して認可すると、デフォルトで、ユーザーのアカウントに Op ロールが付与されます。

以下の Airflow 構成オプションをオーバーライドすることで、安定版の REST API を有効または無効にできます。また、デフォルトのユーザーロールを変更することもできます。

セクション キー メモ
api (Airflow 2.2.5 以前)auth_backend
(Airflow 2.3.0 以降)auth_backends
airflow.composer.api.backend.composer_auth 安定版の REST API を無効にするには、airflow.api.auth.backend.deny_all に変更します。
api composer_auth_user_registration_role Op その他のロールも指定できます。

試験運用版の Airflow REST API を有効にする

試験運用版 API では、API 認証機能はデフォルトで無効になっています。Airflow ウェブサーバーは、すべてのリクエストを拒否します。

API 認証機能と Airflow 2 試験運用版 API を有効にするには、次の Airflow 構成オプションをオーバーライドします。

セクション キー メモ
api (Airflow 2.2.5 以前)auth_backend
(Airflow 2.3.0 以降)auth_backends
airflow.api.auth.backend.default デフォルトは airflow.composer.api.backend.composer_auth です。
api enable_experimental_api True デフォルトは False です。

この構成オプションを airflow.api.auth.backend.default に設定すると、Airflow ウェブサーバーは認証を行うことなくすべての API リクエストを受け入れます。Airflow ウェブサーバー自体は認証を必要としませんが、独自の認証レイヤを備える Identity-Aware Proxy によって保護されます。

Webserver Access Control を使用して Airflow REST API への API 呼び出しを許可する

Airflow REST API を呼び出すメソッドに応じて、呼び出し元のメソッドは IPv4 または IPv6 アドレスのいずれかを使用できます。Webserver Access Control を使用して、Airflow REST API への IP トラフィックのブロックを解除します。

Airflow REST API の呼び出しが送信される IP アドレスがわからない場合は、デフォルトの構成オプション All IP addresses have access (default) を使用します。

Airflow REST API を呼び出す

このセクションでは、安定版の Airflow REST API で DAG をトリガーするために使用できる Python スクリプトの例を示します。

次の例の内容を composer2_airflow_rest_api.py という名前のファイルに入力し、パラメータで Airflow UI URL、DAG の名前、DAG 実行構成を指定します。

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)

def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

たとえば、次の構成は正しくありません

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

サービス アカウントを使用して Airflow REST API にアクセスする

Airflow データベースでは、メール フィールドの長さが 64 文字に制限されています。サービス アカウントのメールアドレスが 64 文字を超える場合があります。通常の方法で、このようなサービス アカウントの Airflow ユーザーを作成することはできません。このようなサービス アカウントのために Airflow ユーザーが存在しない場合、Airflow REST API にアクセスすると HTTP エラー 401 と 403 が発生します。

回避策として、サービス アカウントの Airflow ユーザーを事前登録できます。そのためには、ユーザー名に accounts.google.com:NUMERIC_USER_ID を使用し、メールアドレスに一意の文字列を使用します。

  1. サービス アカウントの NUMERIC_USER_ID を取得するには、次のコマンドを実行します。

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    次のように置き換えます。

    • SA_NAME は、サービス アカウント名に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
  2. サービス アカウントの Op ロールを持つ Airflow ユーザーを作成します。

    Airflow UI

    1. Airflow UI に移動する

    2. [セキュリティ] > [ユーザーの一覧表示] の順に移動し、[新しいレコードを追加する] をクリックします。このページを開くには、Airflow ユーザーに Admin ロールが必要です。

    3. ユーザー名として accounts.google.com:NUMERIC_USER_ID を指定します。NUMERIC_USER_ID を前のステップで取得したユーザー ID に置き換えます。

    4. メールとして固有識別子を指定します。任意の一意の文字列を使用できます。

    5. ユーザーのロールを指定します。例: Op

    6. [有効] チェックボックスがオンになっていることを確認します。

    7. ユーザーの姓と名を指定します。任意の文字列を使用できます。

    8. [保存] をクリックします。

    gcloud

    次の Airflow CLI コマンドを実行します。

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    次のように置き換えます。

    • ENVIRONMENT_NAME を環境の名前にする。
    • LOCATION は、環境が配置されているリージョン。
    • NUMERIC_USER_ID は、前のステップで取得したユーザー ID に置き換えます。
    • UNIQUE_ID は、Airflow ユーザーの ID に置き換えます。任意の一意の文字列を使用できます。
  3. サービス アカウントの Airflow ユーザーを作成すると、サービス アカウントとして認証された呼び出し元が事前登録ユーザーとして認識され、Airflow にログインされます。

Airflow REST API コンポーネントのスケーリング

Airflow REST API と Airflow UI エンドポイントは、コンポーネント(Airflow ウェブサーバーなど)内で実行されます。REST API を集中使用する場合は、CPU とメモリのパラメータを増やして、Airflow ウェブサーバー リソースを予想される負荷に合わせて調整することを検討してください。

次のステップ