访问 Airflow REST API

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。

如需查看将 Airflow REST API 与 Cloud Run 函数搭配使用的示例,请参阅使用 Cloud Functions 触发 DAG

Airflow REST API 版本

Cloud Composer 2 中提供了以下 Airflow REST API 版本:

  • Airflow 2 使用稳定的 REST API。Airflow 已弃用实验性 REST API。

  • 如果您通过 Airflow 配置替换来启用实验性 REST API,则仍然可以在 Airflow 2 中使用该 API,如下文所述。

准备工作

Enable the Cloud Composer API.

Enable the API

启用稳定的 Airflow REST API

Airflow 2 中默认已启用稳定的 REST API。

Cloud Composer 使用自己的 API 身份验证后端

授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。

您可以通过替换以下 Airflow 配置选项来启用或停用稳定的 REST API 或更改默认用户角色:

部分 备注
api (Airflow 2.2.5 及更低版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.composer.api.backend.composer_auth 如需停用稳定的 REST API,请更改为 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

启用实验性 Airflow REST API

默认情况下,API 身份验证功能在实验性 API 中处于停用状态。Airflow 网络服务器会拒绝您发出的所有请求。

如需启用 API 身份验证功能和 Airflow 2 实验性 API,请替换以下 Airflow 配置选项:

部分 备注
api (Airflow 2.2.5 及更低版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.api.auth.backend.default 默认值为 airflow.composer.api.backend.composer_auth
api enable_experimental_api True 默认值为 False

将此配置选项设置为 airflow.api.auth.backend.default,Airflow Web 服务器接受所有 API 请求。虽然 Airflow Web 服务器本身不需要进行身份验证,但它仍受 Identity-Aware Proxy 的保护,后者提供自己的身份验证层。

允许使用 Web 服务器访问权限控制机制对 Airflow REST API 进行 API 调用

调用 Airflow REST API 时所用的方法不同,调用方方法可以使用 IPv4 或 IPv6 地址。记得取消屏蔽 使用 Web 服务器访问权限控制将 IP 流量传输到 Airflow REST API。

如果您不确定将从哪些 IP 地址发送对 Airflow REST API 的调用,请使用默认配置选项 All IP addresses have access (default)

调用 Airflow REST API

本部分提供了一个示例 Python 脚本,该脚本可用于使用稳定的 Airflow REST API 来触发 DAG。

将以下示例的内容放入名为 composer2_airflow_rest_api.py 的文件中,然后在变量值中提供 Airflow 界面网址、DAG 的名称和 DAG 运行配置。

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

例如,以下配置不正确

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

使用服务账号访问 Airflow REST API

Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。以常规方式无法为此类服务账号创建 Airflow 用户。如果此类服务没有 Airflow 用户 访问 Airflow REST API 会导致 HTTP 错误 401 和 403。

要解决此问题,您可以为服务账号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。

  1. 如需获取服务账号的 NUMERIC_USER_ID,请运行以下命令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    替换:

    • SA_NAME 替换为服务账号名称。
    • PROJECT_ID 替换为项目 ID
  2. 为该服务账号创建具有 Op 角色的 Airflow 用户:

    Airflow 界面

    1. 前往 Airflow 界面

    2. 依次前往安全 > 列出用户,然后点击添加新记录。您的 Airflow 用户必须具有 Admin 角色,才能打开此页面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 作为用户名。将 NUMERIC_USER_ID 替换为上一步中获得的用户 ID。

    4. 将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。

    5. 指定用户的角色。例如 Op

    6. 确保已选中是否有效?复选框。

    7. 指定用户的名字和姓氏。您可以使用任何字符串。

    8. 点击保存

    gcloud

    运行以下 Airflow CLI 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    替换:

    • ENVIRONMENT_NAME 替换为环境的名称。
    • LOCATION 替换为环境所在的区域。
    • NUMERIC_USER_ID 替换为在上一步中获取的用户 ID。
    • UNIQUE_ID 替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
  3. 为服务账号创建 Airflow 用户后,以该服务账号身份进行身份验证的调用方会被识别为预注册用户,并登录到 Airflow。

扩缩 Airflow REST API 组件

Airflow REST API 和 Airflow 界面端点在组件(即 Airflow Web 服务器)中运行。如果您大量使用 REST API,不妨考虑增加 CPU 和内存参数,以便根据预期负载调整 Airflow Web 服务器资源。

后续步骤