Trigger DAGs with Cloud Functions

This page describes how to use Cloud Functions to implement event-based DAG triggers.

Although Airflow is designed to run DAGs on a regular schedule, you can also trigger DAGs in response to events. One way to do this is to use Cloud Functions to trigger Cloud Composer DAGs when a specified event occurs. For example, you can create a function that triggers a DAG when an object changes in a Cloud Storage bucket, or when a message is pushed to a Pub/Sub topic.

In the example in this guide, a DAG runs every time a change occurs in a Cloud Storage bucket. Any change to any object in the bucket triggers a function. This function makes a request to the Airflow REST API of your Cloud Composer environment. Airflow processes the request and runs a DAG. The DAG outputs information about the change.

Before you begin

Enable APIs for your project

Enable the Cloud Composer and Cloud Functions APIs.

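If you prefer the command line, a minimal sketch with gcloud (this assumes the Google Cloud CLI is installed and a default project is configured):

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com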

Enable the Airflow REST API

The steps for enabling the Airflow REST API depend on your Airflow version.
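As a hedged sketch only: in Cloud Composer, both the experimental and the stable Airflow REST APIs are typically enabled by overriding the api-auth_backend Airflow configuration option. Verify the exact option and value for your Airflow version in the Cloud Composer documentation:

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-airflow-configs=api-auth_backend=airflow.api.auth.backend.default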

Get the Airflow web server URL

Your function makes requests to the Airflow web server endpoint, so get the URL of the Airflow web server.

Console

To get the Airflow web server URL:

  1. Open the Environments page.
  2. Click the name of your environment.
  3. Under Environment configuration, look at the Airflow web UI item.

gcloud

To get the Airflow web server URL, run the following command:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Replace the following:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the Compute Engine region where the environment is located.

Get the client_id of the IAM proxy

To make a request to the Airflow REST API endpoint, the function needs the client ID of the IAM proxy that protects the Airflow web server.

Cloud Composer does not provide this information directly. Instead, make an unauthenticated request to the Airflow web server and capture the client ID from the redirect URL:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Replace AIRFLOW_URL with the URL of the Airflow web interface.

In the output, search for the string that follows client_id. For example:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Save the following code in a file called get_client_id.py. Fill in the values for project_id, location, and composer_environment, then run it in Cloud Shell or your local environment:

python get_client_id.py

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# Fill in these values for your environment.
project_id = 'YOUR_PROJECT_ID'
location = 'us-central1'
composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = ("This script is intended to be used with Composer 1 environments. "
                     "In Composer 2, the Airflow Webserver is not in the tenant project, "
                     "so there is no tenant client ID. "
                     "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details.")
    raise RuntimeError(version_error)
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Create a Cloud Storage bucket

Because this example triggers a DAG in response to changes in a Cloud Storage bucket, create a new bucket to use with this example.
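For example, a minimal sketch with gsutil (the bucket name is a placeholder and must be globally unique):

gsutil mb gs://YOUR-BUCKET-NAME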

Trigger the DAG using Cloud Functions

Upload the DAG to your environment

Upload the DAG to your environment. The following example DAG outputs the DAG run configuration it receives. You trigger this DAG from a function that you create later in this guide.

import datetime

import airflow
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        start_date=datetime.datetime(2021, 1, 1),
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
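
One way to upload this DAG is with gcloud. This assumes you saved it locally as trigger_response_dag.py (a filename chosen for this example):

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source trigger_response_dag.py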

Deploy a Cloud Function that triggers the DAG

Deploy a Python Cloud Function with the following configuration parameters and contents. An equivalent gcloud deployment command is sketched after the parameter list below.

Specify Cloud Function configuration parameters

  • Trigger. For this example, select a trigger that fires when a new object is created in a bucket, or when an existing object is overwritten.

    • Trigger type. Cloud Storage.

    • Event type. Finalize/Create.

    • Bucket. Select the bucket that must trigger this function.

    • Retry on failure. We recommend disabling this option for this example. If you use your own function in a production environment, enable this option to handle transient errors.

  • Runtime service account. Use one of the following options, depending on your preference:

    • Select the Compute Engine default service account. With the default IAM permissions, this account can run functions that access Cloud Composer environments.

    • Create a custom service account that has the Composer User role and specify it as the runtime service account for this function. This option follows the principle of least privilege.

  • Runtime and entry point. When adding code for this example, select the Python 3.7 (or later) runtime and specify trigger_dag as the entry point.
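
If you prefer to deploy from the command line, the console settings above correspond roughly to the following gcloud command. This is a sketch, assuming first-generation Cloud Functions, a working directory that contains main.py and requirements.txt, and a placeholder bucket name:

gcloud functions deploy trigger_dag \
    --runtime python39 \
    --entry-point trigger_dag \
    --trigger-resource YOUR-BUCKET-NAME \
    --trigger-event google.storage.object.finalize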

Add requirements

Specify the dependencies in the requirements.txt file:

requests_toolbelt==0.9.1
google-auth==2.3.3

Add the function code

Put the following code in the main.py file and make the following substitutions:

  • Replace the value of the client_id variable with the client_id value that you obtained in the previous step.
  • Replace the value of the webserver_id variable with your tenant project ID, which is the part of the Airflow web interface URL before .appspot.com. You obtained the Airflow web interface URL in a previous step.
  • Specify the Airflow REST API version that you use:

    • If you use the stable Airflow REST API, set the USE_EXPERIMENTAL_API variable to False.
    • If you use the experimental Airflow REST API, no changes are needed. The USE_EXPERIMENTAL_API variable is already set to True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

Test your function

To check that your function and DAG work as intended:

  1. Wait until your function deploys.
  2. Upload a file to your Cloud Storage bucket. Alternatively, you can trigger the function manually by selecting the Test the function action for it in the Google Cloud Console (a gcloud sketch follows the sample log below).
  3. Check the DAGs page in the Airflow web interface. The DAG should have one active or already completed DAG run.
  4. In the Airflow web interface, check the task logs for this run. You should see that the print_gcs_info task outputs the data received from the function to the logs:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0
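
To trigger the function manually from the command line instead of the console, the following sketch may help. It assumes that gcloud functions call delivers the --data payload to the background function as its event data; the field values here are placeholders:

gcloud functions call trigger_dag \
    --data '{"name": "manual-test.txt", "bucket": "YOUR-BUCKET-NAME"}'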

What's next