使用 Cloud Functions 和 Pub/Sub 訊息觸發 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面將引導您建立以事件為基礎的推送架構,方法是根據 Pub/Sub 主題的變更觸發 Cloud Composer DAG。本教學課程中的範例會示範如何處理 Pub/Sub 管理的完整週期,包括訂閱管理,這是 DAG 程序的一部分。當您需要觸發 DAG,但不想設定額外存取權時,這項功能適用於部分常見用途。

舉例來說,如果基於安全考量,您不想直接存取 Cloud Composer 環境,可以透過 Pub/Sub 傳送訊息做為解決方案。您可以設定 Cloud Run 函式,建立 Pub/Sub 訊息並發布至 Pub/Sub 主題。接著,您可以建立 DAG,從 Pub/Sub 提取訊息,然後處理這些訊息。

在這個特定範例中,您會建立 Cloud Run 函式並部署兩個 DAG。第一個 DAG 會提取 Pub/Sub 訊息,並根據 Pub/Sub 訊息內容觸發第二個 DAG。

本教學課程假設您熟悉 Python 和 Google Cloud 控制台。

目標

費用

本教學課程使用下列 Google Cloud的計費元件:

完成本教學課程後,您可以刪除自己建立的資源,避免系統繼續計費。詳情請參閱「清除」一節。

事前準備

本教學課程需要 Google Cloud 專案。請按以下方式設定專案:

  1. 在 Google Cloud 控制台中,選取或建立專案

    前往專案選取器

  2. 請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能

  3. 請確認 Google Cloud 專案使用者具有下列角色,可建立必要資源:

    • 服務帳戶使用者 (roles/iam.serviceAccountUser)
    • Pub/Sub 編輯者 (roles/pubsub.editor)
    • 環境與 Storage 物件管理員 (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Run functions 管理員 (roles/cloudfunctions.admin)
    • 記錄檢視器 (roles/logging.viewer)
  4. 請確認執行 Cloud Run 函式的服務帳戶在專案中擁有足夠的 Pub/Sub 存取權限。根據預設,Cloud Run 函式會使用 App Engine 預設服務帳戶。這個服務帳戶具有「編輯者」角色,權限足以完成本教學課程。

為專案啟用 API

主控台

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

在專案中啟用 Cloud Composer API,方法是在 Terraform 指令碼中加入下列資源定義:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> 替換為專案的專案 ID。例如:example-project

建立 Cloud Composer 環境

建立 Cloud Composer 2 環境

這個程序會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在專案中執行作業。 Google Cloud

建立 Pub/Sub 主題

這個範例會在訊息推送至 Pub/Sub 主題時觸發 DAG。建立要在本範例中使用的 Pub/Sub 主題:

主控台

  1. 前往 Google Cloud 控制台的「Pub/Sub Topics」(Pub/Sub 主題) 頁面。

    前往 Pub/Sub 主題

  2. 按一下 [Create Topic] (建立主題)

  3. 在「主題 ID」欄位中,輸入 dag-topic-trigger 做為主題的 ID。

  4. 其他選項均保留預設值。

  5. 按一下 [Create Topic] (建立主題)

gcloud

如要建立主題,請在 Google Cloud CLI 中執行 gcloud pubsub topics create 指令:

gcloud pubsub topics create dag-topic-trigger

Terraform

在 Terraform 指令碼中新增下列資源定義:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> 替換為專案的專案 ID。例如:example-project

上傳 DAG

將 DAG 上傳至環境:

  1. 將下列 DAG 檔案儲存到本機電腦。
  2. <PROJECT_ID> 替換為專案的專案 ID。例如:example-project
  3. 上傳編輯後的 DAG 檔案至環境。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

程式碼範例包含兩個 DAG:trigger_dagtarget_dag

trigger_dag DAG 會訂閱 Pub/Sub 主題、提取 Pub/Sub 訊息,並觸發 Pub/Sub 訊息資料 DAG ID 中指定的另一個 DAG。在本例中,trigger_dag 會觸發 target_dag DAG,將訊息輸出至工作記錄。

trigger_dag DAG 包含下列工作:

  • subscribe_task:訂閱 Pub/Sub 主題。
  • pull_messages_operator:使用 PubSubPullOperator 讀取 Pub/Sub 訊息資料。
  • trigger_target_dag:根據從 Pub/Sub 主題擷取的訊息資料,觸發另一個 DAG (本例中為 target_dag)。

target_dag DAG 只包含一項工作:output_to_logs。這項工作會延遲一秒,然後將訊息列印到工作記錄。

部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息

在本節中,您將部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息。

建立 Cloud Run 函式並指定其設定

主控台

  1. 前往 Google Cloud 控制台的「Cloud Run functions」頁面。

    前往 Cloud Run functions

  2. 按一下「建立函式」

  3. 在「Environment」(環境) 欄位中,選取「1st gen」(第 1 代)

  4. 在「Function name」(函式名稱) 欄位中,輸入函式名稱:pubsub-publisher

  5. 在「觸發條件類型」欄位中,選取「HTTP」

  6. 在「Authentication」(驗證) 部分中,選取「Allow unauthenticated invocations」(允許未經驗證的叫用)。這個選項會授予未經驗證的使用者叫用 HTTP 函式的權限。

  7. 按一下「儲存」。

  8. 點選「下一步」,前往「程式碼」步驟。

Terraform

建議您使用 Google Cloud 控制台執行這個步驟,因為無法直接透過 Terraform 管理函式的原始碼。

這個範例說明如何建立 Cloud Storage bucket、將檔案儲存在這個 bucket 中,然後使用 bucket 中的檔案做為 Cloud Run 函式的來源,從本機 ZIP 封存檔上傳 Cloud Run 函式。如果您採用這種做法,即使建立新的封存檔案,Terraform 也不會自動更新函式的原始碼。如要重新上傳函式程式碼,可以變更封存檔的檔案名稱。

  1. 下載 pubsub_publisher.pyrequirements.txt 檔案。
  2. pubsub_publisher.py 檔案中,將 <PROJECT_ID> 替換為專案的專案 ID。例如:example-project
  3. 建立名為 pubsub_function.zip 的 ZIP 封存檔,並加入 pbusub_publisner.pyrequirements.txt 檔案。
  4. 將 ZIP 封存檔儲存至 Terraform 指令碼所在的目錄。
  5. 將下列資源定義新增至 Terraform 指令碼,並將 <PROJECT_ID> 替換為專案的專案 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

指定 Cloud Run 函式程式碼參數

主控台

  1. 在「程式碼」步驟的「執行階段」欄位中,選取函式使用的語言執行階段。在本範例中,請選取「Python 3.10」

  2. 在「Entry point」(進入點) 欄位中輸入 pubsub_publisher。這是 Cloud Run 函式執行時執行的程式碼。這個旗標的值必須是來源程式碼中存在的函式名稱或完整類別名稱。

Terraform

略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function 資源中定義。

上傳 Cloud Run 函式程式碼

主控台

在「原始碼」欄位中,選取提供函式原始碼的適當選項。 在本教學課程中,您將使用 Cloud Run functions Inline Editor 新增函式程式碼。您也可以上傳 ZIP 檔案,或使用 Cloud Source Repositories。

  1. 將下列程式碼範例放入 main.py 檔案。
  2. <PROJECT_ID> 替換為專案的專案 ID。例如:example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function 資源中定義。

指定 Cloud Run 函式依附元件

主控台

requirements.txt 中繼資料檔案中指定函式依附元件:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

部署函式時,Cloud Run 函式會下載並安裝 requirements.txt 檔案中宣告的依附元件,每個套件一行。這個檔案所在的目錄必須與包含函式程式碼的 main.py 檔案相同。詳情請參閱 Requirements Files pip 說明文件。

Terraform

略過這個步驟。Cloud Run 函式依附元件定義於 pubsub_function.zip 封存檔的 requirements.txt 檔案中。

部署 Cloud Run 函式

主控台

按一下「Deploy」(部署)。部署作業成功完成後,Google Cloud 控制台的「Cloud Run 函式」頁面會顯示函式,並附上綠色勾號。

請確認執行 Cloud Run 函式的服務帳戶在專案中具備足夠權限,可存取 Pub/Sub。

Terraform

  1. 初始化 Terraform:

    terraform init
    
  2. 檢查設定,確認 Terraform 即將建立或更新的資源符合預期:

    terraform plan
    
  3. 如要檢查設定是否有效,請執行下列指令:

    terraform validate
    
  4. 執行下列指令,並在提示中輸入 yes,即可套用 Terraform 設定:

    terraform apply
    

等待 Terraform 顯示「Apply complete!」訊息。

在 Google Cloud 控制台中,前往 UI 中的資源,確認 Terraform 已建立或更新這些資源。

測試 Cloud Run 函式

如要確認函式是否會將訊息發布至 Pub/Sub 主題,以及範例 DAG 是否正常運作,請按照下列步驟操作:

  1. 確認 DAG 是否處於啟用狀態:

    1. 前往 Google Cloud 控制台的「Environments」頁面。

      前往「環境」

    2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

    3. 前往「DAG」分頁。

    4. 檢查名為 trigger_dagtarget_dag 的 DAG 的「State」欄中的值。兩個 DAG 都必須處於 Active 狀態。

  2. 推送測試 Pub/Sub 訊息。您可以在 Cloud Shell 中執行這項操作:

    1. 前往 Google Cloud 控制台的「Functions」頁面。

      前往 Cloud Run functions

    2. 按一下函式名稱 pubsub-publisher

    3. 前往「測試」分頁。

    4. 在「設定觸發事件」專區中,輸入下列 JSON 鍵/值:{"message": "target_dag"}。請勿修改鍵值組,因為這則訊息稍後會觸發測試 DAG。

    5. 在「測試指令」部分,按一下「在 Cloud Shell 中測試」

    6. Cloud Shell 終端機中,等待系統自動顯示指令。按下 Enter 執行這項指令。

    7. 如果出現「Authorize Cloud Shell」(授權 Cloud Shell) 訊息,請按一下「Authorize」(授權)

    8. 確認訊息內容對應至 Pub/Sub 訊息。在本例中,輸出訊息必須以 Message b'target_dag' with message_length 10 published to 開頭,做為函式的回應。

  3. 確認是否已觸發 target_dag

    1. 等待至少一分鐘,讓 trigger_dag 的新 DAG 執行作業完成。

    2. 前往 Google Cloud 控制台的「Environments」頁面。

      前往「環境」

    3. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

    4. 前往「DAG」分頁。

    5. 按一下 trigger_dag 前往「DAG details」(DAG 詳細資料) 頁面。「Runs」(執行) 分頁會顯示 trigger_dag DAG 的 DAG 執行作業清單。

      這個 DAG 每分鐘執行一次,並處理函式傳送的所有 Pub/Sub 訊息。如果沒有傳送任何訊息,DAG 執行記錄中會將 trigger_target 工作標示為 Skipped。如果 DAG 已觸發,則 trigger_target 工作會標示為 Success

    6. 查看最近幾次 DAG 執行作業,找出所有三個工作 (subscribe_taskpull_messages_operatortrigger_target) 狀態皆為 Success 的 DAG 執行作業。

    7. 返回「DAGs」分頁,檢查 target_dag DAG 的「Successful runs」資料欄是否列出一次成功的執行作業。

摘要

在本教學課程中,您已瞭解如何使用 Cloud Run 函式在 Pub/Sub 主題上發布訊息,以及如何部署訂閱 Pub/Sub 主題的 DAG、提取 Pub/Sub 訊息,並觸發訊息資料 DAG ID 中指定的另一個 DAG。

此外,還有其他建立及管理 Pub/Sub 訂閱項目觸發 DAG 的方法,但這些方法不在本教學課程的討論範圍內。舉例來說,您可以使用 Cloud Run 函式在發生特定事件時觸發 Airflow DAG歡迎參考我們的教學課程,親自試用其他Google Cloud 功能。

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

刪除個別資源

如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。

主控台

  1. 刪除 Cloud Composer 環境。您也可以在執行這項程序時刪除環境的 bucket。
  2. 刪除 Pub/Sub 主題 dag-topic-trigger
  3. 刪除 Cloud Run 函式。

    1. 前往 Google Cloud 控制台的 Cloud Run 函式。

      前往 Cloud Run functions

    2. 按一下要刪除的函式旁的核取方塊,pubsub-publisher

    3. 按一下「刪除」,然後按照操作說明執行。

Terraform

  1. 請確認 Terraform 指令碼未包含專案仍需的資源項目。舉例來說,您可能想保留部分已啟用的 API,並繼續指派 IAM 權限 (如果您已在 Terraform 腳本中新增這類定義)。
  2. 執行 terraform destroy
  3. 手動刪除環境的值區。Cloud Composer 不會自動刪除。您可以透過 Google Cloud 控制台或 Google Cloud CLI 執行這項操作。

後續步驟