Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页面将指导您通过触发 Cloud Composer DAG 来响应 Pub/Sub 主题更改,从而创建基于事件的推送架构。本教程中的示例演示了如何在 DAG 流程中处理 Pub/Sub 管理(包括订阅管理)的整个周期。当您需要触发 DAG,但不想设置额外的访问权限时,它适用于一些常见用例。
例如,如果出于安全考虑,您不想让用户直接访问 Cloud Composer 环境,那么可以将通过 Pub/Sub 发送的消息作为一种解决方案。您可以配置一个用于创建 Pub/Sub 消息并将其发布到 Pub/Sub 主题的 Cloud Function。然后,您可以创建一个 DAG 来拉取 Pub/Sub 消息,然后处理这些消息。
在此具体示例中,您将创建一个 Cloud Function 并部署两个 DAG。第一个 DAG 会拉取 Pub/Sub 消息,并根据 Pub/Sub 消息内容触发第二个 DAG。
本教程假定您熟悉 Python 和 Google Cloud 控制台。
目标
费用
本教程使用 Google Cloud 的以下收费组件:
完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理。
准备工作
在本教程中,您需要一个 Google Cloud 项目。按以下方式配置项目:
在 Google Cloud 控制台中,选择或创建项目:
确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能。
如需创建必要的资源,请确保您的 Google Cloud 项目用户具有以下角色:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub Editor (
roles/pubsub.editor
) - Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Functions Admin (
roles/cloudfunctions.admin
) - 日志查看器 (
roles/logging.viewer
)
- Service Account User (
确保运行 Cloud Functions 函数的服务帐号在您的项目中具有足够的权限来访问 Pub/Sub。默认情况下,Cloud Functions 使用 App Engine 默认服务帐号。此服务帐号具有 Editor 角色,该角色对本教程具有足够的权限。
为您的项目启用 API
控制台
启用 Cloud Composer, Cloud Functions, and Pub/Sub API。
gcloud
Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
通过将以下资源定义添加到 Terraform 脚本,在项目中启用 Cloud Composer API:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
将 <PROJECT_ID>
替换为您的项目的项目 ID。例如 example-project
。
创建 Cloud Composer 环境
在此过程中,您需要向 Composer Service Agent 帐号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。
创建 Pub/Sub 主题
此示例会触发 DAG 以响应推送到 Pub/Sub 主题的消息。创建要在此示例中使用的 Pub/Sub 主题:
控制台
在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。
点击创建主题。
在主题 ID 字段中,输入
dag-topic-trigger
作为主题的 ID。将其他选项保留为默认值。
点击创建主题。
gcloud
如需创建主题,请在 Google Cloud CLI 中运行 gcloud pubsub topics create 命令:
gcloud pubsub topics create dag-topic-trigger
Terraform
将以下资源定义添加到您的 Terraform 脚本中:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
将 <PROJECT_ID>
替换为您的项目的项目 ID。例如 example-project
。
上传 DAG
将 DAG 上传到您的环境:
示例代码包含两个 DAG:trigger_dag
和 target_dag
。
trigger_dag
DAG 订阅一个 Pub/Sub 主题,拉取 Pub/Sub 消息,然后触发 Pub/Sub 消息数据的 DAG ID 中指定的另一个 DAG。在此示例中,trigger_dag
会触发 target_dag
DAG,该 DAG 会将消息输出到任务日志。
trigger_dag
DAG 包含以下任务:
subscribe_task
:订阅 Pub/Sub 主题。pull_messages_operator
:使用PubSubPullOperator
读取 Pub/Sub 消息数据。trigger_target_dag
:根据从 Pub/Sub 主题拉取的消息中的数据,触发另一个 DAG(在本示例中为target_dag
)。
target_dag
DAG 仅包含一项任务:output_to_logs
。此任务会将消息输出到任务日志,延迟 1 秒。
部署一个 Cloud Function,用于在 Pub/Sub 主题上发布消息
在本部分中,您将部署一个 Cloud Function,用于在 Pub/Sub 主题上发布消息。
创建 Cloud Function 并指定其配置
控制台
在 Google Cloud 控制台中,转到 Cloud Functions 页面。
点击创建函数。
在环境字段中,选择第 1 代。
在函数名称字段中,输入函数的名称:
pubsub-publisher
。在触发器类型字段中,选择 HTTP。
在身份验证部分中,选择允许未通过身份验证的调用。此选项会为未经身份验证的用户授予调用 HTTP 函数的权限。
点击保存。
点击下一步以继续执行代码步骤。
Terraform
在此步骤中,请考虑使用 Google Cloud 控制台,因为无法从 Terraform 直接管理函数的源代码。
本示例演示了如何通过以下方法通过本地 zip 归档文件上传 Cloud Function:创建一个 Cloud Storage 存储桶,将该文件存储在此存储桶中,然后将该存储桶中的文件用作 Cloud Function 函数的来源。如果您使用此方法,即使您创建新的归档文件,Terraform 也不会自动更新函数的源代码。如需重新上传函数代码,您可以更改归档文件的文件名。
- 下载
pubsub_publisher.py
和requirements.txt
文件。 - 在
pubsub_publisher.py
文件中,将<PROJECT_ID>
替换为您的项目 ID。例如example-project
。 - 使用
pbusub_publisner.py
和requirements.txt
文件创建一个名为pubsub_function.zip
的 zip 归档文件。 - 将 zip 归档文件保存到存储 Terraform 脚本的目录。
- 将以下资源定义添加到您的 Terraform 脚本中,并将
<PROJECT_ID>
替换为您的项目 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
指定 Cloud Functions 函数代码参数
控制台
在代码步骤的运行时字段中,选择您的函数使用的语言运行时。在此示例中,请选择 Python 3.10。
在入口点字段中,输入
pubsub_publisher
。这是您的 Cloud Function 运行时执行的代码。此标志的值必须是源代码中存在的函数名称或完全限定类名。
Terraform
跳过此步骤。google_cloudfunctions_function
资源中已定义 Cloud Function 参数。
上传您的 Cloud Function 代码
控制台
在源代码字段中,选择有关如何提供函数源代码的相应选项。在本教程中,使用 Cloud Functions 内嵌编辑器添加函数代码。作为替代方案,您可以上传 ZIP 文件或使用 Cloud Source Repositories。
- 将以下代码示例放入 main.py 文件中。
- 将
<PROJECT_ID>
替换为您的项目的项目 ID。例如example-project
。
Terraform
跳过此步骤。google_cloudfunctions_function
资源中已定义 Cloud Function 参数。
指定 Cloud Function 依赖项
控制台
在 requirements.txt 元数据文件中指定函数依赖项:
部署您的函数时,Cloud Functions 将下载并安装 requirements.txt 文件中声明的依赖项,每个软件包占一行。此文件必须与包含函数代码的 main.py 文件位于同一目录中。如需了解详情,请参阅 pip
文档中的要求文件。
Terraform
跳过此步骤。Cloud Function 依赖项在 pubsub_function.zip
归档的 requirements.txt
文件中定义。
部署您的 Cloud Function
控制台
点击部署。部署成功完成后,该函数会在 Google Cloud 控制台的 Cloud Functions 页面上显示一个绿色对勾标记。
请确保运行 Cloud Function 的服务帐号在您的项目中具有足够的权限来访问 Pub/Sub。
Terraform
初始化 Terraform:
terraform init
查看配置并验证 Terraform 将要创建或更新的资源是否符合您的预期:
terraform plan
如需检查配置是否有效,请运行以下命令:
terraform validate
运行以下命令并在提示符处输入 yes,以应用 Terraform 配置:
terraform apply
等待 Terraform 显示“应用完成!”消息。
在 Google Cloud 控制台中,导航到界面中的资源,确保 Terraform 已创建或更新这些资源。
测试 Cloud Functions 函数
如需检查您的函数是否针对 Pub/Sub 主题发布消息以及示例 DAG 是否按预期运行,请执行以下操作:
检查 DAG 是否处于活跃状态:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到 DAG 标签页。
检查名为
trigger_dag
和target_dag
的 DAG 的 State 列中的值。两个 DAG 都必须处于Active
状态。
推送测试 Pub/Sub 消息。您可以在 Cloud Shell 中执行此操作:
在 Google Cloud 控制台中,转到函数页面。
点击函数的名称
pubsub-publisher
。转到测试标签页。
在配置触发事件部分中,输入以下 JSON 键值对:
{"message": "target_dag"}
。请勿修改键值对,因为此消息稍后会触发测试 DAG。在测试命令部分中,点击在 Cloud Shell 中测试。
在 Cloud Shell 终端中,等待命令自动显示。按
Enter
运行此命令。如果系统显示授权 Cloud Shell 消息,请点击授权。
检查消息内容是否与 Pub/Sub 消息对应。在此示例中,输出消息必须以
Message b'target_dag' with message_length 10 published to
开头,作为函数的响应。
检查是否触发了
target_dag
:等待至少一分钟,以便
trigger_dag
的新 DAG 运行完成。在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到 DAG 标签页。
点击
trigger_dag
以转到 DAG 详情页面。Runs(运行)标签页会显示trigger_dag
DAG 的 DAG 运行列表。此 DAG 每分钟运行一次,并处理从该函数发送的所有 Pub/Sub 消息。如果未发送任何消息,则在 DAG 运行日志中,
trigger_target
任务会标记为Skipped
。如果触发了 DAG,则trigger_target
任务会被标记为Success
。查看近期的几次 DAG 运行,找到所有三项任务(
subscribe_task
、pull_messages_operator
和trigger_target
)都处于Success
状态的 DAG 运行。返回 DAGs 标签页,然后检查
target_dag
DAG 的 成功运行列是否列出了一次成功的运行。
摘要
在本教程中,您学习了如何使用 Cloud Functions 在 Pub/Sub 主题上发布消息,并部署一个 DAG 来订阅 Pub/Sub 主题、拉取 Pub/Sub 消息,并触发消息数据的 DAG ID 中指定的另一个 DAG。
还有一些其他方法可以创建和管理 Pub/Sub 订阅以及触发 DAG,这些内容不在本教程的讨论范围内。例如,您可以在发生指定事件时使用 Cloud Functions 触发 Airflow DAG。查看我们的教程,亲自试用其他 Google Cloud 功能。
清理
为避免系统因本教程中使用的资源向您的 Google Cloud 帐号收取费用,请删除包含这些资源的项目,或者保留项目并删除各个资源。
删除项目
删除 Google Cloud 项目:
gcloud projects delete PROJECT_ID
删除各个资源
如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。
控制台
- 删除 Cloud Composer 环境。在此过程中,您还将删除环境的存储桶。
- 删除 Pub/Sub 主题 (
dag-topic-trigger
)。 删除 Cloud Function。
在 Google Cloud 控制台中,转到 Cloud Functions。
点击要删除的函数
pubsub-publisher
对应的复选框。点击删除,然后按照说明操作。
Terraform
- 请确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能想使某些 API 保持启用状态,同时仍分配 IAM 权限(如果您向 Terraform 脚本添加了此类定义)。
- 运行
terraform destroy
。 - 手动删除环境的存储桶。Cloud Composer 不会自动将其删除。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。
后续步骤
- 测试 DAG
- 测试 HTTP 函数
- 部署 Cloud Functions 函数
- 试用其他 Google Cloud 功能。查阅我们的教程。