访问 Airflow REST API

Cloud Composer 1 | Cloud Composer 2

Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。

如需查看将 Airflow REST API 与 Cloud Functions 搭配使用的示例,请参阅使用 Cloud Functions 触发 DAG

Airflow REST API 版本

Cloud Composer 1 中提供了以下 Airflow REST API 版本:

  • Airflow 2 使用稳定的 REST API。Airflow 已弃用实验性 REST API。

  • 如果您通过 Airflow 配置替换来启用实验性 REST API,则仍然可以在 Airflow 2 中使用该 API,如下文所述。

准备工作

启用 Cloud Composer API。

启用 API

启用稳定的 Airflow REST API

Airflow 2

Airflow 2 中默认已启用稳定的 REST API。

Cloud Composer 使用自己的 API 身份验证后端,该后端与 Identity-Aware Proxy 集成。

授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。

您可以通过替换以下 Airflow 配置选项来启用或停用稳定的 REST API 或更改默认用户角色:

部分 Notes
api (Airflow 2.2.5 及更早版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.composer.api.backend.composer_auth 如需停用稳定版 REST API,请更改为 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

Airflow 1

Airflow 1 不提供稳定的 REST API。您可以改用实验性 REST API。

启用实验性 Airflow REST API

Airflow 2

默认情况下,API 身份验证功能在实验性 API 中处于停用状态。Airflow 网络服务器会拒绝您发出的所有请求。

如需启用 API 身份验证功能和 Airflow 2 实验性 API,请替换以下 Airflow 配置选项:

部分 Notes
api (Airflow 2.2.5 及更早版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.api.auth.backend.default 默认值为 airflow.composer.api.backend.composer_auth
api enable_experimental_api True 默认值为 False

Airflow 1

默认情况下,Airflow 1.10.11 及更高版本中会停用 API 身份验证功能。 Airflow 网络服务器会拒绝您发出的所有请求。您需要使用请求来触发 DAG,因此请启用此功能。

如需在 Airflow 1 中启用 API 身份验证功能,请替换以下 Airflow 配置选项:

部分 Notes
api auth_backend airflow.api.auth.backend.default 默认值为 airflow.api.auth.backend.deny_all

将此配置选项设置为 airflow.api.auth.backend.default 后,Airflow Web 服务器会接受所有未经身份验证的 API 请求。虽然 Airflow Web 服务器本身不需要进行身份验证,但它仍受 Identity-Aware Proxy 的保护,后者提供自己的身份验证层。

允许使用 Web 服务器访问权限控制对 Airflow REST API 进行 API 调用

调用方方法可以使用 IPv4 或 IPv6 地址,具体取决于用于调用 Airflow REST API 的方法。请务必使用 Web 服务器访问权限控制来取消屏蔽流向 Airflow REST API 的 IP 流量。

如果您不确定对 Airflow REST API 的调用将从哪个 IP 地址发送,请使用默认配置选项 All IP addresses have access (default)

调用 Airflow REST API

获取 IAM 代理的 client_id

要向 Airflow REST API 端点发出请求,函数需要获得保护 Airflow 网络服务器的 IAM 代理的客户端 ID。

Cloud Composer 不直接提供此信息,而是向 Airflow 网络服务器发出未经身份验证的请求,并从重定向网址捕获此客户端 ID。

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL 替换为 Airflow 网页界面的网址。

在输出中,搜索 client_id 后面的字符串。例如:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

将以下代码保存在名为 get_client_id.py 的文件中。填写 project_idlocationcomposer_environment 的值,然后在 Cloud Shell 或本地环境中运行代码。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

使用 client_id 调用 Airflow REST API

进行以下替换:

  • client_id 变量的值替换为上一步中获得的 client_id 值。
  • webserver_id 变量的值替换为您的租户项目 ID,它是 Airflow 网页界面网址中 .appspot.com 前面的部分。您已在上一步中获得 Airflow 网页界面网址。
  • 指定您使用的 Airflow REST API 版本:

    • 如果您使用稳定的 Airflow REST API,请将 USE_EXPERIMENTAL_API 变量设置为 False
    • 如果您使用实验性 Airflow REST API,则无需进行任何更改。USE_EXPERIMENTAL_API 变量已设置为 True

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text

# END COPIED IAP CODE

使用服务账号访问 Airflow REST API

Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。您无法以常规方式为此类服务帐号创建 Airflow 用户。如果此类服务帐号没有 Airflow 用户,则访问 Airflow REST API 会导致 HTTP 错误 401 和 403。

要解决此问题,您可以为服务账号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。

  1. 如需获取服务账号的 NUMERIC_USER_ID,请运行以下命令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    您需要在其中:

    • SA_NAME 替换为服务账号名称。
    • PROJECT_ID 替换为项目 ID
  2. 为该服务账号创建具有 Op 角色的 Airflow 用户:

    Airflow 界面

    1. 转到 Airflow 界面

    2. 依次前往管理 > 用户,然后点击创建。您的 Airflow 用户必须具有 Admin 角色才能打开此页面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 作为用户名。将 NUMERIC_USER_ID 替换为上一步中获得的用户 ID。

    4. 将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。

    5. 指定用户的角色。例如 Op

    6. 确保已选中 Is Active? 复选框。

    7. 指定用户的名字和姓氏。您可以使用任何字符串。

    8. 点击保存

    gcloud

    在 Airflow 2 中,运行以下 Airflow CLI 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    您需要在其中:

    • ENVIRONMENT_NAME 替换为环境的名称。
    • LOCATION 替换为环境所在的区域。
    • NUMERIC_USER_ID 替换为在上一步中获取的用户 ID。
    • UNIQUE_ID 替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
  3. 为服务帐号创建 Airflow 用户后,以该服务帐号身份通过身份验证的调用方将被识别为预注册用户,并登录到 Airflow。

扩缩 Airflow REST API 组件

Airflow REST API 和 Airflow 界面端点在组件(即 Airflow Web 服务器)内运行。如果您大量使用 REST API,请考虑增加 CPU 和内存参数,以根据预期负载调整 Airflow Webserver 资源。

后续步骤