Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页将引导您通过触发 Cloud Composer DAG 来响应 Pub/Sub 主题更改,从而创建基于事件的推送架构。本教程中的示例演示了如何在 DAG 流程中处理 Pub/Sub 管理的整个周期,包括订阅管理。当您需要触发 DAG,但不想设置额外的访问权限时,此方法适用于某些常见用例。
例如,如果您出于安全考虑不想提供对 Cloud Composer 环境的直接访问权限,则可以使用通过 Pub/Sub 发送的消息作为解决方案。您可以配置 Cloud Run 函数,以创建 Pub/Sub 消息并将其发布到 Pub/Sub 主题。然后,您可以创建一个 DAG,用于拉取 Pub/Sub 消息,然后处理这些消息。
在本示例中,您将创建一个 Cloud Run 函数并部署两个 DAG。第一个 DAG 会拉取 Pub/Sub 消息,并根据 Pub/Sub 消息内容触发第二个 DAG。
本教程假定您熟悉 Python 和 Google Cloud 控制台。
目标
费用
本教程使用 Google Cloud的以下可计费组件:
完成本教程后,您可以通过删除您创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
在本教程中,您需要一个 Google Cloud 项目。按如下方式配置项目:
在 Google Cloud 控制台中,选择或创建项目:
确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能。
确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub Editor (
roles/pubsub.editor
) - Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run functions Admin (
roles/cloudfunctions.admin
) - Logs Viewer (
roles/logging.viewer
)
- Service Account User (
确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。默认情况下,Cloud Run 函数使用 App Engine 默认服务账号。此服务账号具有 Editor 角色,该角色具有本教程所需的足够权限。
为您的项目启用 API
控制台
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
将以下资源定义添加到 Terraform 脚本中,以便在项目中启用 Cloud Composer API:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
将 <PROJECT_ID>
替换为项目的项目 ID。例如 example-project
。
创建 Cloud Composer 环境
在此过程中,您需要向 Composer Service Agent 账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。
创建 Pub/Sub 主题
此示例会针对推送到 Pub/Sub 主题的消息触发 DAG。创建一个 Pub/Sub 主题以在此示例中使用:
控制台
在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。
点击创建主题。
在主题 ID 字段中,输入
dag-topic-trigger
作为主题的 ID。将其他选项保留为默认值。
点击创建主题。
gcloud
如需创建主题,请在 Google Cloud CLI 中运行 gcloud pubsub topics create 命令:
gcloud pubsub topics create dag-topic-trigger
Terraform
将以下资源定义添加到 Terraform 脚本中:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
将 <PROJECT_ID>
替换为项目的项目 ID。例如 example-project
。
上传 DAG
将 DAG 上传到您的环境:
示例代码包含两个 DAG:trigger_dag
和 target_dag
。
trigger_dag
DAG 会订阅 Pub/Sub 主题、拉取 Pub/Sub 消息,并触发 Pub/Sub 消息数据的 DAG ID 中指定的另一个 DAG。在此示例中,trigger_dag
会触发 target_dag
DAG,后者会将消息输出到任务日志。
trigger_dag
DAG 包含以下任务:
subscribe_task
:订阅 Pub/Sub 主题。pull_messages_operator
:使用PubSubPullOperator
读取 Pub/Sub 消息数据。trigger_target_dag
:根据从 Pub/Sub 主题中拉取的消息中的数据触发另一个 DAG(在本例中为target_dag
)。
target_dag
DAG 仅包含一个任务:output_to_logs
。此任务会延迟 1 秒钟,然后将消息输出到任务日志。
部署用于在 Pub/Sub 主题中发布消息的 Cloud Run 函数
在本部分中,您将部署一个 Cloud Run 函数,用于在 Pub/Sub 主题中发布消息。
创建 Cloud Run 函数并指定其配置
控制台
在 Google Cloud 控制台中,前往 Cloud Run 函数页面。
点击创建函数。
在环境字段中,选择第 1 代。
在函数名称字段中,输入函数的名称:
pubsub-publisher
。在触发器类型字段中,选择 HTTP。
在身份验证部分中,选择允许未通过身份验证的调用。此选项可向未经身份验证的用户授予调用 HTTP 函数的权限。
点击“保存”。
点击下一步以继续执行代码步骤。
Terraform
请考虑使用 Google Cloud 控制台执行此步骤,因为没有简单的方法可以通过 Terraform 管理函数的源代码。
此示例演示了如何通过创建 Cloud Storage 存储分区、将文件存储在此存储分区中,然后将存储分区中的文件用作 Cloud Run 函数的来源,从本地 zip 归档文件上传 Cloud Run 函数。如果您使用此方法,Terraform 不会自动更新函数的源代码,即使您创建了新的归档文件也是如此。如需重新上传函数代码,您可以更改归档文件的文件名。
- 下载
pubsub_publisher.py
和requirements.txt
文件。 - 在
pubsub_publisher.py
文件中,将<PROJECT_ID>
替换为项目的项目 ID。例如example-project
。 - 使用
pbusub_publisner.py
和requirements.txt
文件创建一个名为pubsub_function.zip
的 ZIP 归档文件。 - 将 ZIP 归档文件保存到存储 Terraform 脚本的目录。
- 将以下资源定义添加到 Terraform 脚本中,并将
<PROJECT_ID>
替换为项目的项目 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
指定 Cloud Run 函数代码参数
控制台
在代码步骤的运行时字段中,选择函数要使用的语言运行时。在此示例中,选择 Python 3.10。
在入口点字段中,输入
pubsub_publisher
。这是在您的 Cloud Run 函数运行时执行的代码。此标志的值必须是源代码中存在的函数名称或完全限定类名称。
Terraform
跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function
资源中定义。
上传 Cloud Run 函数代码
控制台
在源代码字段中,选择提供函数源代码的适当选项。在本教程中,您将使用 Cloud Run Functions 内嵌编辑器添加函数代码。或者,您也可以上传 ZIP 文件或使用 Cloud Source Repositories。
- 将以下代码示例放入 main.py 文件中。
- 将
<PROJECT_ID>
替换为项目的项目 ID。例如example-project
。
Terraform
跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function
资源中定义。
指定 Cloud Run 函数依赖项
控制台
在 requirements.txt 元数据文件中指定函数依赖项:
部署函数时,Cloud Run functions 会下载并安装 requirements.txt 文件中声明的依赖项,每个软件包对应一行内容。此文件必须与包含函数代码的 main.py 文件位于同一目录中。如需了解详情,请参阅 pip
文档中的要求文件。
Terraform
跳过此步骤。Cloud Run 函数依赖项在 pubsub_function.zip
归档文件的 requirements.txt
文件中定义。
部署 Cloud Run 函数
控制台
点击部署。部署成功完成后,Google Cloud 控制台的 Cloud Run 函数页面中会显示带有绿色对勾标记的函数。
确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。
Terraform
初始化 Terraform:
terraform init
查看配置并验证 Terraform 将创建或更新的资源是否符合您的预期:
terraform plan
如需检查配置是否有效,请运行以下命令:
terraform validate
通过运行以下命令并在提示符处输入“yes”,应用 Terraform 配置:
terraform apply
等待 Terraform 显示“应用完成!”消息。
在 Google Cloud 控制台中,在界面中找到资源,以确保 Terraform 已创建或更新它们。
测试 Cloud Run 函数
如需检查您的函数是否向 Pub/Sub 主题发布消息,以及示例 DAG 是否按预期运行,请执行以下操作:
检查 DAG 是否处于活动状态:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往 DAG 标签页。
检查名为
trigger_dag
和target_dag
的 DAG 的 State 列中的值。这两个 DAG 都必须处于Active
状态。
推送测试 Pub/Sub 消息。您可以在 Cloud Shell 中执行此操作:
在 Google Cloud 控制台中,前往 Functions 页面。
点击函数名称
pubsub-publisher
。前往测试标签页。
在配置触发事件部分,输入以下 JSON 键值对:
{"message": "target_dag"}
。请勿修改键值对,因为此消息稍后会触发测试 DAG。在测试命令部分,点击在 Cloud Shell 中测试。
在 Cloud Shell 终端中,等待系统自动显示命令。按
Enter
运行此命令。如果系统显示为 Cloud Shell 提供授权消息,请点击授权。
检查消息内容是否与 Pub/Sub 消息相符。在此示例中,输出消息必须以
Message b'target_dag' with message_length 10 published to
开头,作为函数的响应。
检查是否触发了
target_dag
:等待至少一分钟,以便
trigger_dag
的新的 DAG 运行完成。在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往 DAG 标签页。
点击
trigger_dag
以前往 DAG 详情页面。运行标签页上会显示trigger_dag
DAG 的 DAG 运行列表。此 DAG 每分钟运行一次,并处理从函数发送的所有 Pub/Sub 消息。如果未发送任何消息,则 DAG 运行日志中会将
trigger_target
任务标记为Skipped
。如果触发了 DAG,则trigger_target
任务会被标记为Success
。查看最近的几个 DAG 运行作业,找到一个所有三个任务(
subscribe_task
、pull_messages_operator
和trigger_target
)均处于Success
状态的 DAG 运行作业。返回 DAG 标签页,检查
target_dag
DAG 的成功运行列是否列出了一次成功运行。
摘要
在本教程中,您学习了如何使用 Cloud Run 函数在 Pub/Sub 主题上发布消息,以及如何部署一个 DAG,用于订阅 Pub/Sub 主题、拉取 Pub/Sub 消息,并触发消息数据的 DAG ID 中指定的另一个 DAG。
除了本教程中介绍的方法外,还有其他方法可用于创建和管理 Pub/Sub 订阅以及触发 DAG。例如,您可以在发生指定事件时使用 Cloud Run 函数触发 Airflow DAG。查看我们的教程,亲自试用其他Google Cloud 功能。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
删除各个资源
如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。
控制台
- 删除 Cloud Composer 环境。在此过程中,您还会删除环境的存储分区。
- 删除 Pub/Sub 主题
dag-topic-trigger
。 删除 Cloud Run 函数。
在 Google Cloud 控制台中,前往 Cloud Run functions。
点击要删除的函数
pubsub-publisher
对应的复选框。点击删除,然后按照说明操作。
Terraform
- 确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能希望保留某些已启用的 API 并继续分配 IAM 权限(如果您已向 Terraform 脚本添加了此类定义)。
- 运行
terraform destroy
。 - 手动删除环境的存储分区。Cloud Composer 不会自动删除该存储分区。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。
后续步骤
- 测试 DAG
- 测试 HTTP 函数
- 部署 Cloud Run 函数
- 试用其他 Google Cloud 功能。查看我们的教程。