Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页面介绍了如何使用 Cloud Run 函数 响应事件的 Cloud Composer DAG。
Apache Airflow 旨在定期运行 DAG,但您也可以 来触发 DAG 以响应事件。一种方法是使用 要触发的 Cloud Run 函数 发生指定事件时的 Cloud Composer DAG。
在本指南的示例中,每当 Cloud Storage 存储分区发生更改时,系统都会运行一个 DAG。对存储桶中的任何对象所做的任何更改都会触发一个函数。此函数向您的 Cloud Composer 环境的 Airflow REST API 发出请求。Airflow 会处理此请求并运行一个 DAG。DAG 会输出更改的相关信息。
准备工作
检查环境的网络配置
此解决方案不适用于专用 IP 和 VPC Service Controls 配置,因为在这些配置中无法配置 Cloud Run 函数与 Airflow Web 服务器之间的连接。
在 Cloud Composer 2 中,您可以使用另一种方法:使用 Cloud Run 函数和 Pub/Sub 消息触发 DAG
为您的项目启用 API
控制台
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
启用 Airflow REST API
根据您的 Airflow 版本,执行以下操作:
- 对于 Airflow 2,稳定的 REST API 默认处于启用状态。如果您的环境停用了稳定的 API,则启用稳定的 REST API。
- 对于 Airflow 1,启用实验性 REST API。
使用 Web 服务器访问控制允许对 Airflow REST API 进行 API 调用
Cloud Run 函数可以使用 IPv4 连接到 Airflow REST API 或 IPv6 地址。
如果您不确定调用 IP 地址范围,请在网站服务器访问控制中使用默认配置选项 All IP addresses have access (default)
,以免意外屏蔽您的 Cloud Run 函数。
创建 Cloud Storage 存储桶
此示例会在 Cloud Storage 存储桶中发生更改时触发 DAG。请创建一个新存储桶以用于此示例。
获取 Airflow 网络服务器网址
此示例向 Airflow 网络服务器端点发出 REST API 请求。您需要在 Cloud Functions 函数代码中使用 .appspot.com
之前的 Airflow 网页界面网址部分。
控制台
在 Google Cloud 控制台中,前往环境页面。
点击您的环境的名称。
在环境详情页面上,前往环境配置标签页。
Airflow 网页界面项中列出了 Airflow 网络服务器的网址。
gcloud
运行以下命令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
获取 IAM 代理的 client_id
如需向 Airflow REST API 端点发出请求,请执行以下操作: 该函数需要 Identity and Access Management 代理的客户端 ID 用于保护 Airflow Web 服务器
Cloud Composer 不直接提供此信息,而是向 Airflow 网络服务器发出未经身份验证的请求,并从重定向网址捕获此客户端 ID。
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
将 AIRFLOW_URL
替换为 Airflow 网页界面的网址。
在输出中,搜索 client_id
后面的字符串。例如:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
将以下代码保存在名为 get_client_id.py
的文件中。填写 project_id
、location
和 composer_environment
的值,然后在 Cloud Shell 或本地环境中运行代码。
将 DAG 上传到您的环境
将 DAG 上传到您的环境。以下示例 DAG 输出收到的 DAG 运行配置。您将通过本指南稍后创建的函数触发此 DAG。
部署触发 DAG 的 Cloud Functions 函数
您可以使用 Google Cloud 支持的首选语言部署 Cloud Functions 函数, Cloud Run 函数或 Cloud Run。本教程演示了以 Python 和 Java 实现的 Cloud Functions 函数。
指定 Cloud Functions 函数配置参数
触发器。在本示例中,请选择在存储桶中创建新对象或现有对象被覆盖时触发的触发器。
运行时服务账号,在运行时、构建、连接和安全设置部分中。根据您的偏好使用下列选项之一:
选择 Compute Engine 默认服务账号。此账号使用默认 IAM 权限,可以运行访问 Cloud Composer 环境的函数。
创建自定义服务账号,使其具有 Composer User 角色,并将其指定为此函数的运行时服务账号。此选项遵循最小权限原则。
运行时和入口点,在代码步骤中。为此示例添加代码时,请选择 Python 3.7 或更高版本的运行时,并将
trigger_dag
指定为入口点。
添加要求
在 requirements.txt
文件中指定依赖项:
将以下代码放入 main.py
文件并进行以下替换:
将
client_id
变量的值替换为您之前获得的client_id
值。将
webserver_id
变量的值替换为您的租户项目 ID,它是 Airflow 网页界面网址中.appspot.com
前面的部分。您之前已获得 Airflow 网页界面网址。指定您使用的 Airflow REST API 版本:
- 如果您使用稳定的 Airflow REST API,请将
USE_EXPERIMENTAL_API
变量设置为False
。 - 如果您使用实验性 Airflow REST API,则无需进行任何更改。
USE_EXPERIMENTAL_API
变量已设置为True
。
- 如果您使用稳定的 Airflow REST API,请将
测试函数
要检查函数和 DAG 是否按预期工作,请执行以下操作:
- 等待您的函数完成部署。
- 向您的 Cloud Storage 存储分区上传一个文件。作为替代方案, 可以手动触发函数,方法是选择测试函数 操作。
- 查看 Airflow 网页界面中的 DAG 页面。DAG 应具有一个处于活动状态或已完成的 DAG 运行。
- 在 Airflow 界面中,检查此运行的任务日志。您应该会看到
print_gcs_info
任务将从函数接收的数据输出到日志中:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h