Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。

如需查看将 Airflow REST API 与 Cloud Run 函数搭配使用的示例,请参阅使用 Cloud Run 函数触发 DAG

Airflow REST API 版本

配置稳定的 Airflow REST API

Airflow 2 中默认启用稳定的 REST API。 Cloud Composer 使用自己的 API 身份验证后端

授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。

您可以通过替换以下 Airflow 配置选项来启用或停用稳定的 REST API 或更改默认用户角色:

部分 备注
api auth_backends airflow.composer.api.backend.composer_auth 如需停用稳定的 REST API,请更改为 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

使用 Web 服务器访问控制功能允许对 Airflow REST API 进行 API 调用

调用 Airflow REST API 所用的方法不同,调用方方法可以使用 IPv4 或 IPv6 地址。请务必使用网站服务器访问控制取消屏蔽指向 Airflow REST API 的 IP 流量。

如果您不确定将从哪些 IP 地址发送对 Airflow REST API 的调用,请使用默认配置选项 All IP addresses have access (default)

调用 Airflow REST API

本部分提供了一个示例 Python 脚本,该脚本可用于使用稳定的 Airflow REST API 来触发 DAG。

将以下示例的内容放入名为 composer2_airflow_rest_api.py 的文件中,然后设置以下变量:

  • dag_id:DAG 的名称,如 DAG 源文件中所定义。
  • dag_config:DAG 运行作业的配置。
  • web_server_url:您的 Airflow Web 服务器网址。 格式为 https://<web-server-id>.composer.googleusercontent.com

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    Make a request to Cloud Composer 2 environment's web server.
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                  If no timeout is provided, it is set to 90 by default.

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)

def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    Make a request to trigger a dag using the stable Airflow 2 REST API.

      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
    elif response.status_code != 200:
        return response.text

if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config


使用服务账号访问 Airflow REST API

Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。以常规方式无法为此类服务账号创建 Airflow 用户。如果此类服务账号没有 Airflow 用户,则访问 Airflow REST API 会导致 HTTP 错误 401 和 403。

要解决此问题,您可以为服务账号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。

  1. 如需获取服务账号的 NUMERIC_USER_ID,请运行以下命令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \


    • SA_NAME 替换为服务账号名称。
    • PROJECT_ID 替换为项目 ID
  2. 为该服务账号创建具有 Op 角色的 Airflow 用户:

    Airflow 界面

    1. 前往 Airflow 界面

    2. 依次前往安全 > 列出用户,然后点击添加新记录。您的 Airflow 用户必须具有 Admin 角色,才能打开此页面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 作为用户名。将 NUMERIC_USER_ID 替换为上一步中获得的用户 ID。

    4. 将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。

    5. 指定用户的角色。例如 Op

    6. 确保选中有效复选框。

    7. 指定用户的名字和姓氏。您可以使用任何字符串。

    8. 点击保存


    运行以下 Airflow CLI 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password


    • ENVIRONMENT_NAME 替换为环境的名称。
    • LOCATION 替换为环境所在的区域。
    • NUMERIC_USER_ID 替换为在上一步中获取的用户 ID。
    • UNIQUE_ID 替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
  3. 为服务账号创建 Airflow 用户后,以该服务账号身份进行身份验证的调用方会被识别为预注册用户,并登录到 Airflow。

扩缩 Airflow REST API 组件

Airflow REST API 和 Airflow 界面端点在 Airflow 网络服务器中运行。如果您大量使用 REST API,请根据预期负载考虑增加 Airflow Web 服务器可用的 CPU 和内存量。
