使用 Cloud Functions 函数和 Pub/Sub 消息触发 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页将引导您通过触发 Cloud Composer DAG 来响应 Pub/Sub 主题更改,从而创建基于事件的推送架构。本教程中的示例演示了如何在 DAG 流程中处理 Pub/Sub 管理的整个周期,包括订阅管理。当您需要触发 DAG,但不想设置额外的访问权限时,此方法适用于某些常见用例。

例如,如果您出于安全考虑不想提供对 Cloud Composer 环境的直接访问权限,则可以使用通过 Pub/Sub 发送的消息作为解决方案。您可以配置 Cloud Run 函数,以创建 Pub/Sub 消息并将其发布到 Pub/Sub 主题。然后,您可以创建一个 DAG,用于拉取 Pub/Sub 消息,然后处理这些消息。

在本示例中,您将创建一个 Cloud Run 函数并部署两个 DAG。第一个 DAG 会拉取 Pub/Sub 消息,并根据 Pub/Sub 消息内容触发第二个 DAG。

本教程假定您熟悉 Python 和 Google Cloud 控制台。

目标

费用

本教程使用 Google Cloud的以下可计费组件:

完成本教程后,您可以通过删除您创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

在本教程中,您需要一个 Google Cloud 项目。按如下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    前往“项目选择器”

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:

    • Service Account User (roles/iam.serviceAccountUser)
    • Pub/Sub Editor (roles/pubsub.editor)
    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Run functions Admin (roles/cloudfunctions.admin)
    • Logs Viewer (roles/logging.viewer)
  4. 确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。默认情况下,Cloud Run 函数使用 App Engine 默认服务账号。此服务账号具有 Editor 角色,该角色具有本教程所需的足够权限。

为您的项目启用 API

控制台

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

将以下资源定义添加到 Terraform 脚本中,以便在项目中启用 Cloud Composer API:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> 替换为项目的项目 ID。例如 example-project

创建 Cloud Composer 环境

创建 Cloud Composer 2 环境

在此过程中,您需要向 Composer Service Agent 账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。

创建 Pub/Sub 主题

此示例会针对推送到 Pub/Sub 主题的消息触发 DAG。创建一个 Pub/Sub 主题以在此示例中使用:

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    前往“Pub/Sub 主题”页面

  2. 点击创建主题

  3. 主题 ID 字段中,输入 dag-topic-trigger 作为主题的 ID。

  4. 将其他选项保留为默认值。

  5. 点击创建主题

gcloud

如需创建主题,请在 Google Cloud CLI 中运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create dag-topic-trigger

Terraform

将以下资源定义添加到 Terraform 脚本中:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> 替换为项目的项目 ID。例如 example-project

上传 DAG

将 DAG 上传到您的环境:

  1. 将以下 DAG 文件保存到您的本地计算机上。
  2. <PROJECT_ID> 替换为项目的项目 ID。例如 example-project
  3. 将修改后的 DAG 文件上传到您的环境。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

示例代码包含两个 DAG:trigger_dagtarget_dag

trigger_dag DAG 会订阅 Pub/Sub 主题、拉取 Pub/Sub 消息,并触发 Pub/Sub 消息数据的 DAG ID 中指定的另一个 DAG。在此示例中,trigger_dag 会触发 target_dag DAG,后者会将消息输出到任务日志。

trigger_dag DAG 包含以下任务:

  • subscribe_task:订阅 Pub/Sub 主题。
  • pull_messages_operator:使用 PubSubPullOperator 读取 Pub/Sub 消息数据。
  • trigger_target_dag:根据从 Pub/Sub 主题中拉取的消息中的数据触发另一个 DAG(在本例中为 target_dag)。

target_dag DAG 仅包含一个任务:output_to_logs。此任务会延迟 1 秒钟,然后将消息输出到任务日志。

部署用于在 Pub/Sub 主题中发布消息的 Cloud Run 函数

在本部分中,您将部署一个 Cloud Run 函数,用于在 Pub/Sub 主题中发布消息。

创建 Cloud Run 函数并指定其配置

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Run 函数页面。

    前往 Cloud Run 函数

  2. 点击创建函数

  3. 环境字段中,选择第 1 代

  4. 函数名称字段中,输入函数的名称:pubsub-publisher

  5. 触发器类型字段中,选择 HTTP

  6. 身份验证部分中,选择允许未通过身份验证的调用。此选项可向未经身份验证的用户授予调用 HTTP 函数的权限。

  7. 点击“保存”。

  8. 点击下一步以继续执行代码步骤。

Terraform

请考虑使用 Google Cloud 控制台执行此步骤,因为没有简单的方法可以通过 Terraform 管理函数的源代码。

此示例演示了如何通过创建 Cloud Storage 存储分区、将文件存储在此存储分区中,然后将存储分区中的文件用作 Cloud Run 函数的来源,从本地 zip 归档文件上传 Cloud Run 函数。如果您使用此方法,Terraform 不会自动更新函数的源代码,即使您创建了新的归档文件也是如此。如需重新上传函数代码,您可以更改归档文件的文件名。

  1. 下载 pubsub_publisher.pyrequirements.txt 文件。
  2. pubsub_publisher.py 文件中,将 <PROJECT_ID> 替换为项目的项目 ID。例如 example-project
  3. 使用 pbusub_publisner.pyrequirements.txt 文件创建一个名为 pubsub_function.zip 的 ZIP 归档文件。
  4. 将 ZIP 归档文件保存到存储 Terraform 脚本的目录。
  5. 将以下资源定义添加到 Terraform 脚本中,并将 <PROJECT_ID> 替换为项目的项目 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

指定 Cloud Run 函数代码参数

控制台

  1. 代码步骤的运行时字段中,选择函数要使用的语言运行时。在此示例中,选择 Python 3.10

  2. 入口点字段中,输入 pubsub_publisher。这是在您的 Cloud Run 函数运行时执行的代码。此标志的值必须是源代码中存在的函数名称或完全限定类名称。

Terraform

跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function 资源中定义。

上传 Cloud Run 函数代码

控制台

源代码字段中,选择提供函数源代码的适当选项。在本教程中,您将使用 Cloud Run Functions 内嵌编辑器添加函数代码。或者,您也可以上传 ZIP 文件或使用 Cloud Source Repositories。

  1. 将以下代码示例放入 main.py 文件中。
  2. <PROJECT_ID> 替换为项目的项目 ID。例如 example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function 资源中定义。

指定 Cloud Run 函数依赖项

控制台

requirements.txt 元数据文件中指定函数依赖项:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

部署函数时,Cloud Run functions 会下载并安装 requirements.txt 文件中声明的依赖项,每个软件包对应一行内容。此文件必须与包含函数代码的 main.py 文件位于同一目录中。如需了解详情,请参阅 pip 文档中的要求文件

Terraform

跳过此步骤。Cloud Run 函数依赖项在 pubsub_function.zip 归档文件的 requirements.txt 文件中定义。

部署 Cloud Run 函数

控制台

点击部署。部署成功完成后,Google Cloud 控制台的 Cloud Run 函数页面中会显示带有绿色对勾标记的函数。

确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。

Terraform

  1. 初始化 Terraform:

    terraform init
    
  2. 查看配置并验证 Terraform 将创建或更新的资源是否符合您的预期:

    terraform plan
    
  3. 如需检查配置是否有效,请运行以下命令:

    terraform validate
    
  4. 通过运行以下命令并在提示符处输入“yes”,应用 Terraform 配置:

    terraform apply
    

等待 Terraform 显示“应用完成!”消息。

在 Google Cloud 控制台中,在界面中找到资源,以确保 Terraform 已创建或更新它们。

测试 Cloud Run 函数

如需检查您的函数是否向 Pub/Sub 主题发布消息,以及示例 DAG 是否按预期运行,请执行以下操作:

  1. 检查 DAG 是否处于活动状态:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    3. 前往 DAG 标签页。

    4. 检查名为 trigger_dagtarget_dag 的 DAG 的 State 列中的值。这两个 DAG 都必须处于 Active 状态。

  2. 推送测试 Pub/Sub 消息。您可以在 Cloud Shell 中执行此操作:

    1. 在 Google Cloud 控制台中,前往 Functions 页面。

      前往 Cloud Run 函数

    2. 点击函数名称 pubsub-publisher

    3. 前往测试标签页。

    4. 配置触发事件部分,输入以下 JSON 键值对:{"message": "target_dag"}。请勿修改键值对,因为此消息稍后会触发测试 DAG。

    5. 测试命令部分,点击在 Cloud Shell 中测试

    6. Cloud Shell 终端中,等待系统自动显示命令。按 Enter 运行此命令。

    7. 如果系统显示为 Cloud Shell 提供授权消息,请点击授权

    8. 检查消息内容是否与 Pub/Sub 消息相符。在此示例中,输出消息必须以 Message b'target_dag' with message_length 10 published to 开头,作为函数的响应。

  3. 检查是否触发了 target_dag

    1. 等待至少一分钟,以便 trigger_dag 的新的 DAG 运行完成。

    2. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    4. 前往 DAG 标签页。

    5. 点击 trigger_dag 以前往 DAG 详情页面。运行标签页上会显示 trigger_dag DAG 的 DAG 运行列表。

      此 DAG 每分钟运行一次,并处理从函数发送的所有 Pub/Sub 消息。如果未发送任何消息,则 DAG 运行日志中会将 trigger_target 任务标记为 Skipped。如果触发了 DAG,则 trigger_target 任务会被标记为 Success

    6. 查看最近的几个 DAG 运行作业,找到一个所有三个任务(subscribe_taskpull_messages_operatortrigger_target)均处于 Success 状态的 DAG 运行作业。

    7. 返回 DAG 标签页,检查 target_dag DAG 的成功运行列是否列出了一次成功运行。

摘要

在本教程中,您学习了如何使用 Cloud Run 函数在 Pub/Sub 主题上发布消息,以及如何部署一个 DAG,用于订阅 Pub/Sub 主题、拉取 Pub/Sub 消息,并触发消息数据的 DAG ID 中指定的另一个 DAG。

除了本教程中介绍的方法外,还有其他方法可用于创建和管理 Pub/Sub 订阅以及触发 DAG。例如,您可以在发生指定事件时使用 Cloud Run 函数触发 Airflow DAG查看我们的教程,亲自试用其他Google Cloud 功能。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还会删除环境的存储分区。
  2. 删除 Pub/Sub 主题 dag-topic-trigger
  3. 删除 Cloud Run 函数。

    1. 在 Google Cloud 控制台中,前往 Cloud Run functions。

      前往 Cloud Run 函数

    2. 点击要删除的函数 pubsub-publisher 对应的复选框。

    3. 点击删除,然后按照说明操作。

Terraform

  1. 确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能希望保留某些已启用的 API 并继续分配 IAM 权限(如果您已向 Terraform 脚本添加了此类定义)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储分区。Cloud Composer 不会自动删除该存储分区。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤