使用 Cloud Functions 触发 DAG

本页面介绍如何使用 Cloud Functions 函数实现基于事件的 DAG 触发器。

虽然 Airflow 设计为定期运行 DAG,但您可以通过事件来触发 DAG。方法之一是使用 Cloud Functions 在发生指定事件时触发 Cloud Composer DAG。例如,您可以创建一个函数,当 Cloud Storage 存储分区中的对象发生更改时或者当消息推送到 Pub/Sub 主题时触发 DAG。

在本指南的示例中,每当 Cloud Storage 存储分区发生更改时,系统都会运行一个 DAG。对存储分区中的任何对象所做的任何更改都会触发一个函数。此函数向您的 Cloud Composer 环境的 Airflow REST API 发出请求。Airflow 会处理此请求并运行一个 DAG。DAG 会输出更改的相关信息。

准备工作

为您的项目启用 API

启用 Cloud Composer and Cloud Functions API。

启用 API

启用 Airflow Web 服务器 REST API

默认情况下,Airflow 1.10.11 及更高版本中会停用 API 身份验证功能。 Airflow Web 服务器会拒绝您发出的所有请求。您需要使用请求来触发 DAG,因此请启用此功能。

要启用 API 身份验证功能,请替换以下 Airflow 配置选项

部分 备注
api auth_backend airflow.api.auth.backend.default 默认值为 airflow.api.auth.backend.deny_all

获取 Airflow Web 服务器网址

您的函数会向 Airflow Web 服务器端点发出请求,因此请获取 Airflow Web 服务器的网址。

控制台

如需获取 Airflow Web 服务器网址,请执行以下操作:

  1. 打开环境页面。

    打开“环境”页面

  2. 点击您的环境的名称。
  3. 环境配置下,查看 Airflow 网页界面项。

gcloud

要获取 Airflow Web 服务器网址,请运行以下命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的 Compute Engine 区域。

获取 IAM 代理的 client_id

要向 Airflow REST API 端点发出请求,函数需要获得保护 Airflow Web 服务器的 IAM 代理的客户端 ID。

Cloud Composer 不直接提供此信息,而是向 Airflow Web 服务器发出未经身份验证的请求,并从重定向网址捕获此客户端 ID。

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep "location:"

AIRFLOW_URL 替换为 Airflow 网页界面的网址。

在输出中,搜索以 apps.googleusercontent.com 结尾的 client_id 字符串。例如:

location: https://accounts.google.com/o/oauth2/v2/auth?
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com&response_type= ...

创建 Cloud Storage 存储分区

由于此示例会在 Cloud Storage 存储分区中发生更改时触发 DAG,因此请创建一个新存储分区以用于此示例。

通过 Cloud Functions 函数触发 DAG

将 DAG 上传到您的环境

将 DAG 上传到您的环境。以下示例 DAG 输出收到的 DAG 运行配置。您将通过本指南稍后创建的函数触发此 DAG。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='composer_sample_trigger_response_dag',
    start_date=days_ago(1),
    schedule_interval=None) as dag:

    # Print the received dag_run configuration.
    # The DAG run configuration contains information about the
    # Cloud Storage object change.
    t1 = BashOperator(
        task_id='print_gcs_info',
        bash_command='echo Triggered from GCF: {{ dag_run.conf }}',
        dag=dag)

    t1

部署触发 DAG 的 Cloud Functions 函数

使用以下配置参数和内容部署 Python Cloud Functions 函数

指定 Cloud Functions 函数配置参数

  • 触发器。在本示例中,请选择在存储分区中创建新对象或现有对象被覆盖时触发的触发器。

    • 触发器类型。Cloud Storage。

    • 事件类型完成创建/创建

    • 存储分区。选择必须触发此函数的存储分区。

    • 失败时重试。在本示例中,我们建议停用此选项。如果您在生产环境中使用自己的函数,请启用此选项以处理暂时性错误

  • 运行时服务帐号。根据您的偏好使用下列选项之一:

    • 选择 Compute Engine 默认服务帐号。此帐号使用默认 IAM 权限,可以运行访问 Cloud Composer 环境的函数。

    • 创建自定义服务帐号,使其具有 Composer User 角色,并将其指定为此函数的运行时服务帐号。此选项遵循最小权限原则。

  • 运行时和入口点。为此示例添加代码时,请选择 Python 3.7 运行时,并将 trigger_dag 指定为入口点。

添加要求

requirements.txt 文件中指定依赖项:

requests_toolbelt==0.9.1
google-auth==1.31.0

添加函数代码

将以下代码放入 main.py 文件并进行以下替换:

  • client_id 变量的值替换为上一步中获取的 client_id 值。
  • webserver_id 变量的值替换为上一步中获取的 Airflow 网页界面网址。

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json={"conf": data, "replace_microseconds": 'false'})

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

测试函数

要检查函数和 DAG 是否按预期工作,请执行以下操作:

  1. 等待函数部署完毕。
  2. 向您的 Cloud Storage 存储分区上传一个文件。除此之外,您也可以手动触发函数,方法是在 Google Cloud Console 中为该函数选择测试函数操作。
  3. 查看 Airflow 网页界面中的 DAG 页面。DAG 应具有一个处于活动状态或已完成的 DAG 运行。
  4. 在 Airflow 网页界面中,检查此运行的任务日志。您应该会看到 print_gcs_info 任务将从函数接收的数据输出到日志中:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0

后续步骤