使用 Cloud Functions 和 Pub/Sub 消息触发 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面将指导您创建基于事件的推送架构,方法是 触发 Cloud Composer DAG 以响应 Pub/Sub 主题更改。本教程中的示例演示了如何处理整个周期 Pub/Sub 管理(包括订阅管理) DAG 流程的一个部分它适用于一些常见的使用场景 需要触发 DAG,但不想设置额外的访问权限。

例如,通过 Pub/Sub 发送的消息可用作 如果您不想让用户能直接访问 Cloud Composer 安全环境您可以配置 用于创建 Pub/Sub 消息和 发布到 Pub/Sub 主题上然后,您可以创建一个 拉取 Pub/Sub 消息,然后处理这些消息。

在此具体示例中,您将创建一个 Cloud Function 两个 DAG第一个 DAG 拉取 Pub/Sub 消息并触发 根据 Pub/Sub 消息内容生成第二个 DAG。

本教程假定您熟悉 Python 和 Google Cloud 控制台。

目标

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除 您创建的资源如需了解详情,请参阅清理

准备工作

在本教程中,您需要 project。 按以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    转到项目选择器

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 如需创建必要的资源,请确保您的 Google Cloud 项目用户具有以下角色

    • Service Account User (roles/iam.serviceAccountUser)
    • Pub/Sub Editor (roles/pubsub.editor)
    • 环境和存储对象管理员 (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Functions Admin (roles/cloudfunctions.admin)
    • 日志查看器 (roles/logging.viewer)
  4. 确保 运行您的 Cloud Functions 函数的服务账号 在项目中具有访问 Pub/Sub 的足够权限。修改者 默认情况下,Cloud Functions 函数使用 App Engine 默认服务账号。 此服务账号具有 Editor 角色,该角色有足够的权限 本教程的权限。

为您的项目启用 API

控制台

启用 Cloud Composer, Cloud Functions, and Pub/Sub API。

启用 API

gcloud

Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

添加以下代码,在项目中启用 Cloud Composer API 将资源定义添加到 Terraform 脚本中:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> 替换为项目 ID 项目名称例如 example-project

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在此过程中 您授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色分配给 Composer Service Agent 。Cloud Composer 使用此账号执行操作 Google Cloud 项目中的资源。

创建 Pub/Sub 主题

此示例会触发 DAG 以响应推送到 Pub/Sub 主题。创建要在此中使用的 Pub/Sub 主题 示例:

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    转到“Pub/Sub 主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入 dag-topic-trigger 作为 主题。

  4. 将其他选项保留为默认值。

  5. 点击创建主题

gcloud

如需创建主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create dag-topic-trigger

Terraform

将以下资源定义添加到您的 Terraform 脚本中:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> 替换为项目 ID 项目名称例如 example-project

上传 DAG

将 DAG 上传到您的环境:

  1. 将以下 DAG 文件保存在本地计算机上。
  2. <PROJECT_ID> 替换为项目 ID 项目名称例如 example-project
  3. 将修改后的 DAG 文件上传到您的环境。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

示例代码包含两个 DAG:trigger_dagtarget_dag

trigger_dag DAG 订阅一个 Pub/Sub 主题, Pub/Sub 消息,并触发 DAG ID 中指定的另一个 DAG Pub/Sub 消息数据的不同部分在此示例中,trigger_dag 触发器 target_dag DAG,用于向任务日志输出消息。

trigger_dag DAG 包含以下任务:

  • subscribe_task:订阅 Pub/Sub 主题。
  • pull_messages_operator:读取 Pub/Sub 消息数据 和PubSubPullOperator
  • trigger_target_dag:触发另一个 DAG(在此示例中为 target_dag) 根据从 Pub/Sub 拉取的消息中的数据, 主题。

target_dag DAG 仅包含一项任务:output_to_logs。此任务 以 1 秒延迟将消息输出到任务日志。

部署一个 Cloud Function,用于在 Pub/Sub 主题上发布消息

在本部分中,您将部署一个 Cloud Function,用于在 Pub/Sub 主题上发布消息。

创建 Cloud Function 并指定其配置

控制台

  1. 在 Google Cloud 控制台中,转到 Cloud Functions 页面。

    转到 Cloud Functions

  2. 点击创建函数

  3. 环境字段中,选择第 1 代

  4. 函数名称字段中,输入函数的名称: pubsub-publisher.

  5. 触发器类型字段中,选择 HTTP

  6. 身份验证部分中,选择 允许未通过身份验证的调用。此选项 未经身份验证的用户 能够调用 HTTP 函数。

  7. 点击保存。

  8. 点击下一步以继续执行代码步骤。

Terraform

在这一步中,请考虑使用 Google Cloud 控制台,因为 通过 Terraform 管理函数源代码的简单方法。

此示例演示了如何上传 Cloud Function。 从本地 ZIP 归档文件中获取。 将文件存储在此存储桶中,然后将存储桶中的文件用作 Cloud Function 的源如果使用这种方法 Terraform 不会自动更新函数的源代码, 即使您创建新的归档文件如需重新上传函数代码,您需要 可以更改归档文件的文件名

  1. 下载 pubsub_publisher.pyrequirements.txt 文件。
  2. pubsub_publisher.py 文件中,将 <PROJECT_ID> 替换为 您的项目的项目 ID。例如 example-project
  3. 创建一个名为 pubsub_function.zip 且包含以下内容的 zip 归档文件: pbusub_publisner.pyrequirements.txt 文件。
  4. 将 zip 归档文件保存到存储 Terraform 脚本的目录。
  5. 将以下资源定义添加到您的 Terraform 脚本中,并 将 <PROJECT_ID> 替换为您的项目 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

指定 Cloud Functions 函数代码参数

控制台

  1. 代码步骤的运行时字段中,选择语言 运行时。在此示例中,请选择 Python 3.10

  2. 入口点字段中,输入 pubsub_publisher。代码如下 将在您的 Cloud Function 运行时执行。 此标志必须是函数名称或完全限定类名, 是否存在

Terraform

跳过此步骤。Cloud Function 参数已在 google_cloudfunctions_function 资源。

上传您的 Cloud Function 代码

控制台

源代码字段中,选择相应的选项 提供函数源代码。 在本教程中,使用 Cloud Functions 函数添加函数代码 内嵌编辑器。作为替代方案,您可以上传 ZIP 文件,或使用 Cloud Source Repositories

  1. 将以下代码示例放入 main.py 文件中。
  2. <PROJECT_ID> 替换为项目 ID 项目名称例如 example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

跳过此步骤。Cloud Function 参数已在 google_cloudfunctions_function 资源。

指定 Cloud Function 依赖项

控制台

requirements.txt 元数据文件中指定函数依赖项:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

部署函数时,Cloud Functions 会下载并安装 在 requirements.txt 文件中声明的依赖项,每个软件包占一行。 此文件必须与包含以下内容的 main.py 文件位于同一目录中: 函数代码。有关详情,请参阅 要求文件pip

Terraform

跳过此步骤。Cloud Function 依赖项在 pubsub_function.zip 归档文件中的 requirements.txt 文件。

部署您的 Cloud Function

控制台

点击部署。部署成功完成后,该函数会显示 此存储分区中 Cloud Functions 页面上有一个绿色的对勾标记, Google Cloud 控制台。

请确保运行 Cloud Function 的服务账号 在项目中拥有足够的权限 Pub/Sub。

Terraform

  1. 初始化 Terraform:

    terraform init
    
  2. 查看配置并验证 Terraform 的资源 创建或更新符合您预期的:

    terraform plan
    
  3. 如需检查配置是否有效,请运行以下命令 命令:

    terraform validate
    
  4. 运行以下命令以应用 Terraform 配置 在提示符后输入 yes:

    terraform apply
    

等待 Terraform 显示“应用完成!”消息。

在 Google Cloud 控制台中,前往界面中的资源 确认 Terraform 已经创建或更新它们。

测试 Cloud Functions 函数

检查您的函数是否会在 Pub/Sub 主题上发布消息 并确保示例 DAG 按预期运行:

  1. 检查 DAG 是否处于活跃状态:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    3. 转到 DAG 标签页。

    4. 检查名为 trigger_dag target_dag。两个 DAG 都必须处于 Active 状态。

  2. 推送测试 Pub/Sub 消息。您可以在 Cloud Shell:

    1. 在 Google Cloud 控制台中,转到函数页面。

      转到 Cloud Functions

    2. 点击函数的名称 pubsub-publisher

    3. 转到测试标签页。

    4. 配置触发事件部分中,输入以下内容 JSON 键值对:{"message": "target_dag"}。请勿修改键值对 因为此消息稍后会触发测试 DAG。

    5. 测试命令部分中,点击在 Cloud Shell 中测试

    6. Cloud Shell 终端中,等待命令显示 。按 Enter 运行此命令。

    7. 如果出现授权 Cloud Shell 消息, 点击授权

    8. 检查消息内容是否与 Pub/Sub 消息。在此示例中,输出消息必须以 Message b'target_dag' with message_length 10 published to作为 。

  3. 检查是否触发了 target_dag

    1. 请至少等待一分钟,以便新 DAG 运行 trigger_dag

    2. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    4. 转到 DAG 标签页。

    5. 点击 trigger_dag 以转到 DAG 详情页面。在运行 标签页,系统会显示 trigger_dag DAG 的 DAG 运行列表。

      此 DAG 每分钟运行一次,处理所有 Pub/Sub 函数发送的消息。如果没有发送任何消息, trigger_target 任务在 DAG 运行日志中被标记为 Skipped。如果 DAG 被触发,则 trigger_target 任务被标记为 Success.

    6. 查看近期的几次 DAG 运行以找到 三项任务(subscribe_taskpull_messages_operatortrigger_target)处于“Success”状态。

    7. 返回 DAG 标签页,并检查 Successful running(成功运行) target_dag DAG 列中列出了一次成功运行。

摘要

在本教程中,您学习了如何使用 Cloud Functions 来发布 Pub/Sub 消息,并部署一个可订阅 Pub/Sub 主题,拉取 Pub/Sub 消息,以及触发器 消息数据的 DAG ID 中指定的另一个 DAG。

你还可以通过其他方式 创建和管理 Pub/Sub 订阅触发 DAG 不超出本教程的讨论范围例如,您可以 使用 Cloud Functions 函数触发 Airflow DAG 当指定事件发生时触发。 查看我们的教程,尝试 Google Cloud 的各项功能。

清理

为避免系统因资源向您的 Google Cloud 账号收取费用 您可以删除包含这些资源的项目 或者保留项目而删除各个资源。

删除项目

    删除 Google Cloud 项目:

    gcloud projects delete PROJECT_ID

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。您 删除环境的存储桶
  2. 删除 Pub/Sub 主题 (dag-topic-trigger)。
  3. 删除 Cloud Function。

    1. 在 Google Cloud 控制台中,转到 Cloud Functions。

      转到 Cloud Functions

    2. 点击要删除的函数对应的复选框。 pubsub-publisher.

    3. 点击删除,然后按照说明操作。

Terraform

  1. 确保您的 Terraform 脚本不包含 项目仍需要的资源例如,您 可能希望让一些 API 保持启用状态, (如果您已将此类定义添加到 Terraform 脚本)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动将其删除您可以从 Google Cloud 控制台或 Google Cloud CLI。

后续步骤