访问 Airflow REST API

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。

如需查看将 Airflow REST API 与 Cloud Run 函数搭配使用的示例,请参阅使用 Cloud Functions 触发 DAG

Airflow REST API 版本

Cloud Composer 2 中提供了以下 Airflow REST API 版本:

  • Airflow 2 使用稳定的 REST API。Airflow 已弃用实验性 REST API。

  • 如果您通过 Airflow 配置替换来启用实验性 REST API,则仍然可以在 Airflow 2 中使用该 API,如下文所述。

准备工作

Enable the Cloud Composer API.

Enable the API

启用稳定的 Airflow REST API

Airflow 2 中默认已启用稳定的 REST API。

Cloud Composer 使用自己的 API 身份验证后端

授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。

您可以通过替换以下 Airflow 配置选项来启用或停用稳定的 REST API 或更改默认用户角色:

部分 备注
api (Airflow 2.2.5 及更低版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.composer.api.backend.composer_auth 如需停用稳定的 REST API,请更改为 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

启用实验性 Airflow REST API

默认情况下,API 身份验证功能在实验性 API 中处于停用状态。Airflow 网络服务器会拒绝您发出的所有请求。

如需启用 API 身份验证功能和 Airflow 2 实验性 API,请替换以下 Airflow 配置选项:

部分 备注
api (Airflow 2.2.5 及更低版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.api.auth.backend.default 默认值为 airflow.composer.api.backend.composer_auth
api enable_experimental_api True 默认值为 False

将此配置选项设置为 airflow.api.auth.backend.default 后,Airflow Web 服务器会接受所有 API 请求,无需进行身份验证。虽然 Airflow Web 服务器本身不需要进行身份验证,但它仍受 Identity-Aware Proxy 的保护,后者提供自己的身份验证层。

使用 Web 服务器访问控制允许对 Airflow REST API 进行 API 调用

调用方方法(根据用于调用 Airflow REST API 的方法) 可以使用 IPv4 或 IPv6 地址。请务必使用网站服务器访问控制取消屏蔽对 Airflow REST API 的 IP 流量。

如果您不确定将从哪些 IP 地址发送对 Airflow REST API 的调用,请使用默认配置选项 All IP addresses have access (default)

调用 Airflow REST API

本部分提供了一个示例 Python 脚本,该脚本可用于使用稳定的 Airflow REST API 来触发 DAG。

将以下示例的内容放入名为 composer2_airflow_rest_api.py 的文件中,然后在变量值中提供 Airflow 界面网址、DAG 的名称和 DAG 运行配置。

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

例如,以下配置不正确

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

使用服务账号访问 Airflow REST API

Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。以常规方式无法为此类服务账号创建 Airflow 用户。如果此类服务账号没有 Airflow 用户,则访问 Airflow REST API 会导致 HTTP 错误 401 和 403。

要解决此问题,您可以为服务账号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。

  1. 如需获取服务账号的 NUMERIC_USER_ID,请运行以下命令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    替换:

    • SA_NAME 替换为服务账号名称。
    • PROJECT_ID 替换为项目 ID
  2. 为该服务账号创建具有 Op 角色的 Airflow 用户:

    Airflow 界面

    1. 转到 Airflow 界面

    2. 依次前往安全 > 列出用户,然后点击添加新记录。您的 Airflow 用户必须具有 Admin 角色,才能打开此页面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 作为用户名。将 NUMERIC_USER_ID 替换为上一步中获得的用户 ID。

    4. 将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。

    5. 指定用户的角色。例如 Op

    6. 确保已选中是否有效?复选框。

    7. 指定用户的名字和姓氏。您可以使用任何字符串。

    8. 点击保存

    gcloud

    运行以下 Airflow CLI 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    替换:

    • ENVIRONMENT_NAME 替换为环境的名称。
    • LOCATION 替换为环境所在的区域。
    • NUMERIC_USER_ID 替换为在上一步中获取的用户 ID。
    • UNIQUE_ID 替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
  3. 为服务账号创建 Airflow 用户后,调用者 被认定为预注册用户,因此必须经过身份验证; 并登录了 Airflow。

扩缩 Airflow REST API 组件

Airflow REST API 和 Airflow 界面端点在组件(即 Airflow Web 服务器)中运行。如果您大量使用 REST API,可以考虑 增加 CPU 和内存参数以调整 Airflow Web 服务器资源 预期负载

后续步骤