存取 Airflow REST API

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 具有 REST API 介面,可用來執行各種工作,例如取得 DAG 執行和工作相關資訊、更新 DAG、取得 Airflow 設定、新增及刪除連線,以及列出使用者。

如需搭配使用 Airflow REST API 與 Cloud Run 函式的範例,請參閱「使用 Cloud Run 函式觸發 DAG」。

Airflow REST API 版本

設定穩定版 Airflow REST API

在 Airflow 2 中,穩定版 REST API 預設為啟用。 Cloud Composer 使用自己的 API 驗證後端

授權作業會以 Airflow 提供的標準方式進行。新使用者透過 API 授權時,帳戶預設會取得 Op 角色。

您可以覆寫下列 Airflow 設定選項,啟用或停用穩定版 REST API,或變更預設使用者角色:

區段 附註
api (Airflow 2.2.5 和更早版本) auth_backend
(Airflow 2.3.0 和更新版本) auth_backends
airflow.composer.api.backend.composer_auth 如要停用穩定版 REST API,請改用 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

使用網路伺服器存取權控管,允許對 Airflow REST API 進行 API 呼叫

視呼叫 Airflow REST API 的方法而定,呼叫端方法可以使用 IPv4 或 IPv6 位址。請記得使用網路伺服器存取權控管,解除封鎖 Airflow REST API 的 IP 流量。

如果您不確定呼叫 Airflow REST API 時會使用哪些 IP 位址,請使用預設設定選項 All IP addresses have access (default)

呼叫 Airflow REST API

本節提供 Python 指令碼範例,可用於透過穩定的 Airflow REST API 觸發 DAG。

將下列範例的內容放入名為 composer2_airflow_rest_api.py 的檔案,然後設定下列變數:

  • dag_id:DAG 的名稱,如 DAG 來源檔案中所定義。
  • dag_config:DAG 執行作業的設定。
  • web_server_url:Airflow 網路伺服器網址。 格式為 https://<web-server-id>.composer.googleusercontent.com

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

使用服務帳戶存取 Airflow REST API

在 2.3.0 之前的 Airflow 版本中,Airflow 資料庫將電子郵件欄位的長度限制為 64 個字元。服務帳戶的電子郵件地址有時會超過 64 個字元。您無法以一般方式為這類服務帳戶建立 Airflow 使用者。如果這類服務帳戶沒有 Airflow 使用者,存取 Airflow REST API 會導致 HTTP 錯誤 401 和 403。

解決方法是預先為服務帳戶註冊 Airflow 使用者。如要這麼做,請使用 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱,並使用任何不重複的字串做為電子郵件地址。

  1. 如要取得服務帳戶的 NUMERIC_USER_ID,請執行下列指令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    取代:

    • SA_NAME 改為服務帳戶名稱。
    • PROJECT_ID 替換為專案 ID
  2. 為服務帳戶建立具有 Op 角色的 Airflow 使用者:

    Airflow UI

    1. 前往 Airflow UI

    2. 依序前往「安全性」>「列出使用者」,然後按一下「新增記錄」。您的 Airflow 使用者必須具備 Admin 角色,才能開啟這個頁面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱。將 NUMERIC_USER_ID 替換為上一個步驟中取得的使用者 ID。

    4. 將電子郵件指定為專屬 ID。你可以使用任何不重複的字串。

    5. 指定使用者的角色。例如 Op

    6. 確認已選取「是否啟用?」核取方塊。

    7. 指定使用者的名字和姓氏。您可以使用任何字串。

    8. 按一下 [儲存]

    gcloud

    執行下列 Airflow CLI 指令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    取代:

    • ENVIRONMENT_NAME 替換為環境的名稱。
    • LOCATION 改成環境所在的地區。
    • NUMERIC_USER_ID 替換為上一個步驟中取得的使用者 ID。
    • UNIQUE_ID,並提供 Airflow 使用者的 ID。你可以使用任何不重複的字串。
  3. 為服務帳戶建立 Airflow 使用者後,經過服務帳戶驗證的呼叫端會被視為預先註冊的使用者,並登入 Airflow。

擴充 Airflow REST API 元件

Airflow REST API 和 Airflow UI 端點會在 Airflow 網路伺服器中執行。如果大量使用 REST API,請根據預期負載,考慮增加 Airflow 網頁伺服器可用的 CPU 和記憶體。

後續步驟