Airflow REST API にアクセスする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow には REST API インターフェースがあり、DAG 実行とタスクに関する情報の取得、DAG の更新、Airflow 構成の取得、接続の追加と削除、ユーザーの一覧表示などのタスクを実行に使用できます。

Cloud Run functions で Airflow REST API を使用する例については、Cloud Run functions を使用した DAG のトリガーをご覧ください。

Airflow REST API のバージョン

  • Airflow 2 は、安定版の REST API を使用します。
  • 試験運用版の REST API の Airflow によるサポートは終了しました。

安定版の Airflow REST API を構成する

安定版の REST API は、Airflow 2 でデフォルトで有効になっています。 Cloud Composer は独自の API 認証バックエンドを使用します。

認可は、Airflow が提供する標準の方法で機能します。新しいユーザーが API を使用して認可すると、デフォルトで、ユーザーのアカウントに Op ロールが付与されます。

以下の Airflow 構成オプションをオーバーライドすることで、安定版の REST API を有効または無効にできます。また、デフォルトのユーザーロールを変更することもできます。

セクション キー メモ
api (Airflow 2.2.5 以前)auth_backend
(Airflow 2.3.0 以降)auth_backends
airflow.composer.api.backend.composer_auth 安定版の REST API を無効にするには、airflow.api.auth.backend.deny_all に変更します。
api composer_auth_user_registration_role Op その他のロールも指定できます。

ウェブサーバー アクセス制御を使用して Airflow REST API への API 呼び出しを許可する

Airflow REST API の呼び出しに使用される方法に応じて、呼び出し元のメソッドは IPv4 または IPv6 のアドレスを使用できます。ウェブサーバー アクセス制御を使用して、Airflow REST API への IP トラフィックのブロックを解除します。

Airflow REST API の呼び出しが送信される IP アドレスがわからない場合は、デフォルトの構成オプション All IP addresses have access (default) を使用します。

Airflow REST API を呼び出す

このセクションでは、安定版の Airflow REST API で DAG をトリガーするために使用できる Python スクリプトの例を示します。

次の例の内容を composer2_airflow_rest_api.py という名前のファイルに入力し、次の変数を設定します。

  • dag_id: DAG ソースファイルで定義されている DAG の名前。
  • dag_config: DAG 実行の構成。
  • web_server_url: Airflow ウェブサーバーの URL。形式は https://<web-server-id>.composer.googleusercontent.com です。

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

サービス アカウントを使用して Airflow REST API にアクセスする

Airflow データベースでは、メールフィールドの長さが 64 文字に制限されています。サービス アカウントのメールアドレスが 64 文字を超える場合があります。通常の方法で、このようなサービス アカウントの Airflow ユーザーを作成することはできません。このようなサービス アカウントのために Airflow ユーザーが存在しない場合、Airflow REST API にアクセスすると HTTP エラー 401 と 403 が発生します。

回避策として、サービス アカウントの Airflow ユーザーを事前登録できます。これを行うには、ユーザー名として accounts.google.com:NUMERIC_USER_ID を使用し、一意の文字列をメールアドレスとして使用します。

  1. サービス アカウントの NUMERIC_USER_ID を取得するには、次のコマンドを実行します。

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    以下のように置き換えます。

    • SA_NAME は、サービス アカウント名に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
  2. サービス アカウントの Op ロールを持つ Airflow ユーザーを作成します。

    Airflow UI

    1. Airflow UI に移動します。

    2. [セキュリティ] > [ユーザーの一覧表示] の順に移動し、[新しいレコードを追加する] をクリックします。このページを開くには、Airflow ユーザーに Admin ロールが必要です。

    3. ユーザー名として accounts.google.com:NUMERIC_USER_ID を指定します。NUMERIC_USER_ID は、前の手順で取得したユーザー ID に置き換えます。

    4. メールとして固有識別子を指定します。任意の一意の文字列を使用できます。

    5. ユーザーのロールを指定する。例: Op

    6. [有効] チェックボックスがオンになっていることを確認します。

    7. ユーザーの姓と名を指定します。任意の文字列を使用できます。

    8. [保存] をクリックします。

    gcloud

    次の Airflow CLI コマンドを実行します。

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    以下のように置き換えます。

    • ENVIRONMENT_NAME を環境の名前にする。
    • LOCATION は、環境が配置されているリージョン。
    • NUMERIC_USER_ID は、前のステップで取得したユーザー ID に置き換えます。
    • UNIQUE_ID は、Airflow ユーザーの ID に置き換えます。任意の一意の文字列を使用できます。
  3. サービス アカウントの Airflow ユーザーを作成すると、サービス アカウントとして認証された呼び出し元は事前登録済みユーザーとして認識され、Airflow にログインします。

Airflow REST API コンポーネントのスケーリング

Airflow REST API と Airflow UI エンドポイントは、Airflow ウェブサーバー内で実行されます。REST API を集中的に使用する場合は、予想される負荷に基づいて、Airflow ウェブサーバーで使用可能な CPU とメモリの量を増やすことを検討してください。

次のステップ