Airflow REST API にアクセスする

Cloud Composer 1 | Cloud Composer 2

Apache Airflow には REST API インターフェースがあり、DAG 実行とタスクに関する情報の取得、DAG の更新、Airflow 構成の取得、接続の追加と削除、ユーザーの一覧表示などのタスクを実行に使用できます。

Cloud Functions で Airflow REST API を使用する例については、Cloud Functions を使用した DAG のトリガーをご覧ください。

Airflow REST API のバージョン

Cloud Composer 2 では次の Airflow REST API バージョンを使用できます。

  • Airflow 2 は、安定版の REST API を使用します。試験運用版の REST API の Airflow によるサポートは終了しました。

  • Airflow 構成のオーバーライド経由で有効にすると、試験運用版の REST API を引き続き Airflow 2 で使用できます。詳細については、以下をご覧ください。

始める前に

Cloud Composer API を有効にします。

API を有効にする

安定版の Airflow REST API を有効にする

安定版の REST API は、Airflow 2 でデフォルトですでに有効になっています。

Cloud Composer は独自の API 認証バックエンドを使用します。

認可は、Airflow が提供する標準の方法で機能します。新しいユーザーが API を使用して認可すると、デフォルトで、ユーザーのアカウントに Op ロールが付与されます。

以下の Airflow 構成オプションをオーバーライドすることで、安定版の REST API を有効または無効にできます。また、デフォルトのユーザーロールを変更することもできます。

セクション キー
api auth_backend airflow.composer.api.backend.composer_auth 安定版の REST API を無効にするには、airflow.api.auth.backend.deny_all に変更します。
api composer_auth_user_registration_role Op その他のロールも指定できます。

試験運用版の Airflow REST API を有効にする

試験運用版 API では、API 認証機能はデフォルトで無効になっています。Airflow ウェブサーバーは、すべてのリクエストを拒否します。

API 認証機能と Airflow 2 試験運用版 API を有効にするには、次の Airflow 構成オプションをオーバーライドします。

セクション キー
api auth_backend airflow.api.auth.backend.default デフォルトは airflow.composer.api.backend.composer_auth です。
api enable_experimental_api True デフォルト値は False です。

api-auth_backend 構成オプションを airflow.api.auth.backend.default に設定すると、Airflow ウェブサーバーは認証を行うことなくすべての API リクエストを受け入れます。Airflow ウェブサーバー自体は認証を必要としませんが、独自の認証レイヤを備える Identity-Aware Proxy によって保護されます。

Airflow REST API を呼び出す

このセクションでは、安定版の Airflow REST API で DAG をトリガーするために使用できる Python スクリプトの例を示します。

次の例の内容を composer2_airflow_rest_api.py という名前のファイルに入力し、パラメータで Airflow UI URL、DAG の名前、DAG 実行構成を指定します。

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

def make_composer2_web_server_request(url: str, method: str = "GET", **kwargs: Any) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)

def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

if __name__ == "__main__":

    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

サービス アカウントを使用して Airflow REST API にアクセスする

Airflow データベースでは、メール フィールドの長さが 64 文字に制限されています。サービス アカウントのメールアドレスが 64 文字を超える場合があります。通常の方法で、このようなサービス アカウントの Airflow ユーザーを作成することはできません。

回避策として、Airflow ユーザーをサービス アカウントに事前登録できます。これを行うには、accounts.google.com:NUMERIC_USER_ID をユーザー名として使用し、一意の文字列をメールアドレスとして使用します。

  1. サービス アカウントの NUMERIC_USER_ID を取得するには、次のコマンドを実行します。

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    次のように置き換えます。

    • SA_NAME は、サービス アカウント名に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
  2. サービス アカウントの Op ロールを持つ Airflow ユーザーを作成します。

    Airflow UI

    1. Airflow UI に移動します。Airflow ユーザーには Admin のロールが必要です。

    2. [セキュリティ] > [ユーザーの一覧表示] の順に移動し、[新しいレコードを追加する] をクリックします。

    3. ユーザー名として accounts.google.com:NUMERIC_USER_ID を指定します。NUMERIC_USER_ID は、前のステップで取得したユーザー ID に置き換えます。

    4. メールアドレスとして一意の識別子を指定します。任意の一意の文字列を使用できます。

    5. ユーザーのロールを指定します。例:Op

    6. ユーザーの姓と名を指定します。任意の文字列を使用できます。

    7. [保存] をクリックします。

    gcloud

    次の Airflow CLI コマンドを実行します。

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    次のように置き換えます。

    • ENVIRONMENT_NAME を環境の名前にする。
    • LOCATION は、環境が配置されているリージョン。
    • NUMERIC_USER_ID は、前のステップで取得したユーザー ID に置き換えます。
    • UNIQUE_ID は Airflow ユーザーの ID に置き換えます。任意の一意の文字列を使用できます。
  3. サービス アカウントに Airflow ユーザーを作成すると、サービス アカウントとして認証された呼び出し元が事前登録ユーザーとして認識され、Airflow にログインします。

次のステップ