Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页介绍了如何使用 Cloud Run 函数触发 Cloud Composer DAG 来响应事件。
Apache Airflow 设计为定期运行 DAG,但您也可以触发 DAG 来响应事件。方法之一是使用 Cloud Run 函数在发生指定事件时触发 Cloud Composer DAG。
在本指南的示例中,每当 Cloud Storage 存储分区发生更改时,系统都会运行一个 DAG。对存储桶中的任何对象所做的任何更改都会触发一个函数。此函数向您的 Cloud Composer 环境的 Airflow REST API 发出请求。Airflow 会处理此请求并运行一个 DAG。DAG 会输出更改的相关信息。
准备工作
检查环境的网络配置
此解决方案不适用于专用 IP 和 VPC Service Controls 配置,因为在这些配置中无法配置 Cloud Run 函数与 Airflow Web 服务器之间的连接。
在 Cloud Composer 2 中,您可以使用另一种方法:使用 Cloud Run 函数和 Pub/Sub 消息触发 DAG
为您的项目启用 API
控制台
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
启用 Airflow REST API
对于 Airflow 2,稳定的 REST API 默认处于启用状态。如果您的环境停用了稳定的 API,则启用稳定的 REST API。
使用 Web 服务器访问控制允许对 Airflow REST API 进行 API 调用
Cloud Run 函数可以使用 IPv4 连接到 Airflow REST API 或 IPv6 地址。
如果您不确定发起调用的 IP 范围,请使用默认值
Web 服务器访问权限控制中的配置选项,即 All IP addresses have access (default)
防止意外阻止您的 Cloud Run 函数
创建 Cloud Storage 存储桶
此示例会在 Cloud Storage 存储桶中发生更改时触发 DAG。请创建一个新存储桶以用于此示例。
获取 Airflow 网络服务器网址
此示例向 Airflow 网络服务器端点发出 REST API 请求。您可以在 Cloud Functions 函数代码中使用 Airflow 网络服务器的网址。
控制台
在 Google Cloud 控制台中,前往环境页面。
点击您的环境的名称。
在环境详情页面上,前往环境配置标签页。
Airflow 网页界面项中列出了 Airflow 网络服务器的网址。
gcloud
运行以下命令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
将 DAG 上传到您的环境
将 DAG 上传到您的环境。以下示例 DAG 输出收到的 DAG 运行配置。您将通过本指南稍后创建的函数触发此 DAG。
部署触发 DAG 的 Cloud Functions 函数
您可以使用 Cloud Run functions 或 Cloud Run 支持的首选语言部署 Cloud Functions 函数。本教程演示了以 Python 和 Java 实现的 Cloud Functions 函数。
指定 Cloud Functions 函数配置参数
触发器。在本示例中,请选择在存储桶中创建新对象或现有对象被覆盖时触发的触发器。
运行时服务账号(位于 运行时、构建、连接和安全设置部分。使用以下任一 根据您的偏好设置,选择以下选项:
选择 Compute Engine 默认服务账号。此账号使用默认 IAM 权限,可以运行访问 Cloud Composer 环境的函数。
创建自定义服务账号,使其具有 Composer User 角色,并将其指定为此函数的运行时服务账号。此选项遵循最小权限原则。
运行时和入口点,在代码步骤中:
(Python)为此示例添加代码时,请选择 Python 3.7 或更高版本的运行时,并将
trigger_dag_gcf
指定为入口点。(Java) 为此示例添加代码时,请选择 Java 17 运行时并指定
com.example.Example
作为入口点。
添加要求
Python
在 requirements.txt
文件中指定依赖项:
Java
将以下依赖项添加到 Google Cloud Functions 界面生成的 pom.xml
中的 dependencies
部分。
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-docs</artifactId>
<version>v1-rev20210707-1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.14.0</version>
</dependency>
Python
添加使用 Airflow REST API 触发 DAG 的代码。创建一个名为 composer2_airflow_rest_api.py
的文件,并将进行 Airflow REST API 调用的代码放在此文件中。
请勿更改任何变量。Cloud Functions 函数会从 main.py
文件导入此文件。
将以下代码放入 main.py
文件中。将 web_server_url
变量的值替换为您之前获得的 Airflow 网站服务器地址。
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
Java
将以下代码放入 Example.java
文件中。将
将 webServerUrl
变量替换为您之前测试的 Airflow Web 服务器地址
。
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.example;
import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
/**
* Cloud Function that triggers an Airflow DAG in response to an event (in
* this case a Cloud Storage event).
*/
public class Example implements BackgroundFunction<GcsEvent> {
private static final Logger logger = Logger.getLogger(Example.class.getName());
// TODO(developer): replace with your values
// Replace webServerUrl with the Airflow web server address. To obtain this
// URL, run the following command for your environment:
// gcloud composer environments describe example-environment \
// --location=your-composer-region \
// --format="value(config.airflowUri)"
@Override
public void accept(GcsEvent event, Context context) throws Exception {
String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
String dagName = "composer_sample_trigger_response_dag";
String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);
logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
dagName, event.name));
logger.info(String.format("Triggering DAG via the following URL: %s", url));
GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
HttpRequestFactory requestFactory =
new NetHttpTransport().createRequestFactory(credentialsAdapter);
Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
Map<String, String> conf = new HashMap<String, String>();
conf.put("bucket", event.bucket);
conf.put("name", event.name);
conf.put("generation", event.generation);
conf.put("operation", context.eventType());
json.put("conf", conf);
HttpContent content = new JsonHttpContent(new GsonFactory(), json);
HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
request.getHeaders().setContentType("application/json");
HttpResponse response;
try {
response = request.execute();
int statusCode = response.getStatusCode();
logger.info("Response code: " + statusCode);
logger.info(response.parseAsString());
} catch (HttpResponseException e) {
// https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
logger.info("Received HTTP exception");
logger.info(e.getLocalizedMessage());
logger.info("- 400 error: wrong arguments passed to Airflow API");
logger.info("- 401 error: check if service account has Composer User role");
logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
logger.info("- 404 error: check Web Server URL");
} catch (Exception e) {
logger.info("Received exception");
logger.info(e.getLocalizedMessage());
}
}
/** Details of the storage event. */
public static class GcsEvent {
/** Bucket name. */
String bucket;
/** Object name. */
String name;
/** Object version. */
String generation;
}
}
测试函数
要检查函数和 DAG 是否按预期工作,请执行以下操作:
- 等待您的函数完成部署。
- 向您的 Cloud Storage 存储分区上传一个文件。作为替代方案, 可以手动触发函数,方法是选择测试函数 操作。
- 查看 Airflow 网页界面中的 DAG 页面。DAG 应具有一个处于活动状态或已完成的 DAG 运行。
- 在 Airflow 界面中,检查此运行的任务日志。您应该会看到
print_gcs_info
任务将从函数接收的数据输出到日志中:
Python
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Java
[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0