使用 Cloud Functions 和 Airflow REST API 触发 Cloud Composer DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Cloud Functions 函数触发 Cloud Composer DAG 以响应事件。

Apache Airflow 旨在定期运行 DAG,但您也可以触发 DAG 来响应事件。一种方法是使用 Cloud Functions,在发生指定事件时触发 Cloud Composer DAG。

在本指南的示例中,每当 Cloud Storage 存储桶发生更改时,系统都会运行一个 DAG。对存储桶中的任何对象所做的任何更改都会触发一个函数。此函数向您的 Cloud Composer 环境的 Airflow REST API 发出请求。Airflow 会处理此请求并运行一个 DAG。DAG 会输出更改的相关信息。

准备工作

检查环境的网络配置

此解决方案不适用于专用 IP 和 VPC Service Controls 配置,因为在这些配置中,无法配置从 Cloud Functions 到 Airflow Web 服务器的连接。

在 Cloud Composer 2 中,您可以使用另一种方法:使用 Cloud Functions 和 Pub/Sub 消息触发 DAG

为您的项目启用 API

控制台

启用 Cloud Composer and Cloud Functions API。

启用 API

gcloud

Enable the Cloud Composer and Cloud Functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

启用 Airflow REST API

根据您的 Airflow 版本,执行以下操作:

允许使用 Web 服务器访问权限控制对 Airflow REST API 进行 API 调用

Cloud Functions 可以使用 IPv4 或 IPv6 地址来连接到 Airflow REST API。

如果您不确定调用的 IP 范围是什么,请在 Web 服务器访问权限控制中使用默认配置选项 All IP addresses have access (default),以免意外屏蔽您的 Cloud Functions 函数。

创建 Cloud Storage 存储桶

此示例会在 Cloud Storage 存储桶中发生更改时触发 DAG。请创建一个新存储桶以用于此示例。

获取 Airflow 网络服务器网址

此示例向 Airflow 网络服务器端点发出 REST API 请求。您需要在 Cloud Functions 函数代码中使用 .appspot.com 之前的 Airflow 网页界面网址部分。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 点击您的环境的名称。

  3. 环境详情页面上,前往环境配置标签页。

  4. Airflow 网页界面项中列出了 Airflow 网络服务器的网址。

gcloud

运行以下命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

获取 IAM 代理的 client_id

如需向 Airflow REST API 端点发出请求,该函数需要使用为 Airflow Web 服务器提供保护的 Identity and Access Management 代理的客户端 ID。

Cloud Composer 不直接提供此信息,而是向 Airflow 网络服务器发出未经身份验证的请求,并从重定向网址捕获此客户端 ID。

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL 替换为 Airflow 网页界面的网址。

在输出中,搜索 client_id 后面的字符串。例如:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

将以下代码保存在名为 get_client_id.py 的文件中。填写 project_idlocationcomposer_environment 的值,然后在 Cloud Shell 或本地环境中运行代码。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

将 DAG 上传到您的环境

将 DAG 上传到您的环境。以下示例 DAG 输出收到的 DAG 运行配置。您将通过本指南稍后创建的函数触发此 DAG。

import datetime

import airflow
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

部署触发 DAG 的 Cloud Functions 函数

您可以使用 Cloud Functions 或 Cloud Run 支持的首选语言部署 Cloud Function。本教程演示了一个使用 PythonJava 实现的 Cloud Function。

指定 Cloud Functions 函数配置参数

  • 触发器。在本示例中,请选择在存储桶中创建新对象或现有对象被覆盖时触发的触发器。

    • 触发器类型。Cloud Storage。

    • 事件类型完成创建/创建

    • 存储桶。选择必须触发此函数的存储桶。

    • 失败时重试。在本示例中,我们建议停用此选项。如果您在生产环境中使用自己的函数,请启用此选项以处理暂时性错误

  • 运行时服务帐号:在运行时、构建、连接和安全设置部分中。根据您的偏好设置,使用以下选项之一:

    • 选择 Compute Engine 默认服务账号。此账号使用默认 IAM 权限,可以运行访问 Cloud Composer 环境的函数。

    • 创建自定义服务账号,使其具有 Composer User 角色,并将其指定为此函数的运行时服务账号。此选项遵循最小权限原则。

  • 运行时和入口点,在代码步骤中。为此示例添加代码时,选择 Python 3.7 或更高版本的运行时,并将 trigger_dag 指定为入口点。

添加要求

requirements.txt 文件中指定依赖项:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

将以下代码放入 main.py 文件并进行以下替换:

  • client_id 变量的值替换为您之前获得的 client_id 值。

  • webserver_id 变量的值替换为您的租户项目 ID,它是 Airflow 网页界面网址中 .appspot.com 前面的部分。您之前已获得 Airflow 网页界面网址。

  • 指定您使用的 Airflow REST API 版本:

    • 如果您使用稳定的 Airflow REST API,请将 USE_EXPERIMENTAL_API 变量设置为 False
    • 如果您使用实验性 Airflow REST API,则无需进行任何更改。USE_EXPERIMENTAL_API 变量已设置为 True


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text

# END COPIED IAP CODE

测试函数

要检查函数和 DAG 是否按预期工作,请执行以下操作:

  1. 等待您的函数完成部署。
  2. 向您的 Cloud Storage 存储桶上传一个文件。作为替代方案,您可以在 Google Cloud 控制台中为该函数选择测试函数操作,以手动触发该函数。
  3. 查看 Airflow 网页界面中的 DAG 页面。DAG 应具有一个处于活动状态或已完成的 DAG 运行。
  4. 在 Airflow 界面中,检查此运行的任务日志。您应该会看到 print_gcs_info 任务将从函数接收的数据输出到日志中:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

后续步骤