Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面將引導您建立以事件為基礎的推送架構,方法是根據 Pub/Sub 主題的變更觸發 Cloud Composer DAG。本教學課程中的範例會示範如何處理 Pub/Sub 管理的完整週期,包括訂閱管理,這是 DAG 程序的一部分。當您需要觸發 DAG,但不想設定額外存取權時,這項功能適用於部分常見用途。
舉例來說,如果基於安全考量,您不想直接存取 Cloud Composer 環境,可以透過 Pub/Sub 傳送訊息做為解決方案。您可以設定 Cloud Run 函式,建立 Pub/Sub 訊息並發布至 Pub/Sub 主題。接著,您可以建立 DAG,從 Pub/Sub 提取訊息,然後處理這些訊息。
在這個特定範例中,您會建立 Cloud Run 函式並部署兩個 DAG。第一個 DAG 會提取 Pub/Sub 訊息,並根據 Pub/Sub 訊息內容觸發第二個 DAG。
本教學課程假設您熟悉 Python 和 Google Cloud 控制台。
目標
費用
本教學課程使用下列 Google Cloud的計費元件:
完成本教學課程後,您可以刪除自己建立的資源,避免系統繼續計費。詳情請參閱「清除」一節。
事前準備
本教學課程需要 Google Cloud 專案。請按以下方式設定專案:
在 Google Cloud 控制台中,選取或建立專案:
請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
請確認 Google Cloud 專案使用者具有下列角色,可建立必要資源:
- 服務帳戶使用者 (
roles/iam.serviceAccountUser
) - Pub/Sub 編輯者 (
roles/pubsub.editor
) - 環境與 Storage 物件管理員
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run functions 管理員 (
roles/cloudfunctions.admin
) - 記錄檢視器 (
roles/logging.viewer
)
- 服務帳戶使用者 (
請確認執行 Cloud Run 函式的服務帳戶在專案中擁有足夠的 Pub/Sub 存取權限。根據預設,Cloud Run 函式會使用 App Engine 預設服務帳戶。這個服務帳戶具有「編輯者」角色,權限足以完成本教學課程。
為專案啟用 API
主控台
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
在專案中啟用 Cloud Composer API,方法是在 Terraform 指令碼中加入下列資源定義:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
將 <PROJECT_ID>
替換為專案的專案 ID。例如:example-project
。
建立 Cloud Composer 環境
這個程序會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext
) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在專案中執行作業。 Google Cloud
建立 Pub/Sub 主題
這個範例會在訊息推送至 Pub/Sub 主題時觸發 DAG。建立要在本範例中使用的 Pub/Sub 主題:
主控台
前往 Google Cloud 控制台的「Pub/Sub Topics」(Pub/Sub 主題) 頁面。
按一下 [Create Topic] (建立主題)。
在「主題 ID」欄位中,輸入
dag-topic-trigger
做為主題的 ID。其他選項均保留預設值。
按一下 [Create Topic] (建立主題)。
gcloud
如要建立主題,請在 Google Cloud CLI 中執行 gcloud pubsub topics create 指令:
gcloud pubsub topics create dag-topic-trigger
Terraform
在 Terraform 指令碼中新增下列資源定義:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
將 <PROJECT_ID>
替換為專案的專案 ID。例如:example-project
。
上傳 DAG
將 DAG 上傳至環境:
程式碼範例包含兩個 DAG:trigger_dag
和 target_dag
。
trigger_dag
DAG 會訂閱 Pub/Sub 主題、提取 Pub/Sub 訊息,並觸發 Pub/Sub 訊息資料 DAG ID 中指定的另一個 DAG。在本例中,trigger_dag
會觸發 target_dag
DAG,將訊息輸出至工作記錄。
trigger_dag
DAG 包含下列工作:
subscribe_task
:訂閱 Pub/Sub 主題。pull_messages_operator
:使用PubSubPullOperator
讀取 Pub/Sub 訊息資料。trigger_target_dag
:根據從 Pub/Sub 主題擷取的訊息資料,觸發另一個 DAG (本例中為target_dag
)。
target_dag
DAG 只包含一項工作:output_to_logs
。這項工作會延遲一秒,然後將訊息列印到工作記錄。
部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息
在本節中,您將部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息。
建立 Cloud Run 函式並指定其設定
主控台
前往 Google Cloud 控制台的「Cloud Run functions」頁面。
按一下「建立函式」。
在「Environment」(環境) 欄位中,選取「1st gen」(第 1 代)。
在「Function name」(函式名稱) 欄位中,輸入函式名稱:
pubsub-publisher
。在「觸發條件類型」欄位中,選取「HTTP」。
在「Authentication」(驗證) 部分中,選取「Allow unauthenticated invocations」(允許未經驗證的叫用)。這個選項會授予未經驗證的使用者叫用 HTTP 函式的權限。
按一下「儲存」。
點選「下一步」,前往「程式碼」步驟。
Terraform
建議您使用 Google Cloud 控制台執行這個步驟,因為無法直接透過 Terraform 管理函式的原始碼。
這個範例說明如何建立 Cloud Storage bucket、將檔案儲存在這個 bucket 中,然後使用 bucket 中的檔案做為 Cloud Run 函式的來源,從本機 ZIP 封存檔上傳 Cloud Run 函式。如果您採用這種做法,即使建立新的封存檔案,Terraform 也不會自動更新函式的原始碼。如要重新上傳函式程式碼,可以變更封存檔的檔案名稱。
- 下載
pubsub_publisher.py
和requirements.txt
檔案。 - 在
pubsub_publisher.py
檔案中,將<PROJECT_ID>
替換為專案的專案 ID。例如:example-project
。 - 建立名為
pubsub_function.zip
的 ZIP 封存檔,並加入pbusub_publisner.py
和requirements.txt
檔案。 - 將 ZIP 封存檔儲存至 Terraform 指令碼所在的目錄。
- 將下列資源定義新增至 Terraform 指令碼,並將
<PROJECT_ID>
替換為專案的專案 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
指定 Cloud Run 函式程式碼參數
主控台
在「程式碼」步驟的「執行階段」欄位中,選取函式使用的語言執行階段。在本範例中,請選取「Python 3.10」。
在「Entry point」(進入點) 欄位中輸入
pubsub_publisher
。這是 Cloud Run 函式執行時執行的程式碼。這個旗標的值必須是來源程式碼中存在的函式名稱或完整類別名稱。
Terraform
略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function
資源中定義。
上傳 Cloud Run 函式程式碼
主控台
在「原始碼」欄位中,選取提供函式原始碼的適當選項。 在本教學課程中,您將使用 Cloud Run functions Inline Editor 新增函式程式碼。您也可以上傳 ZIP 檔案,或使用 Cloud Source Repositories。
- 將下列程式碼範例放入 main.py 檔案。
- 將
<PROJECT_ID>
替換為專案的專案 ID。例如:example-project
。
Terraform
略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function
資源中定義。
指定 Cloud Run 函式依附元件
主控台
在 requirements.txt 中繼資料檔案中指定函式依附元件:
部署函式時,Cloud Run 函式會下載並安裝 requirements.txt 檔案中宣告的依附元件,每個套件一行。這個檔案所在的目錄必須與包含函式程式碼的 main.py 檔案相同。詳情請參閱 Requirements Files pip
說明文件。
Terraform
略過這個步驟。Cloud Run 函式依附元件定義於 pubsub_function.zip
封存檔的 requirements.txt
檔案中。
部署 Cloud Run 函式
主控台
按一下「Deploy」(部署)。部署作業成功完成後,Google Cloud 控制台的「Cloud Run 函式」頁面會顯示函式,並附上綠色勾號。
請確認執行 Cloud Run 函式的服務帳戶在專案中具備足夠權限,可存取 Pub/Sub。
Terraform
初始化 Terraform:
terraform init
檢查設定,確認 Terraform 即將建立或更新的資源符合預期:
terraform plan
如要檢查設定是否有效,請執行下列指令:
terraform validate
執行下列指令,並在提示中輸入 yes,即可套用 Terraform 設定:
terraform apply
等待 Terraform 顯示「Apply complete!」訊息。
在 Google Cloud 控制台中,前往 UI 中的資源,確認 Terraform 已建立或更新這些資源。
測試 Cloud Run 函式
如要確認函式是否會將訊息發布至 Pub/Sub 主題,以及範例 DAG 是否正常運作,請按照下列步驟操作:
確認 DAG 是否處於啟用狀態:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「DAG」分頁。
檢查名為
trigger_dag
和target_dag
的 DAG 的「State」欄中的值。兩個 DAG 都必須處於Active
狀態。
推送測試 Pub/Sub 訊息。您可以在 Cloud Shell 中執行這項操作:
前往 Google Cloud 控制台的「Functions」頁面。
按一下函式名稱
pubsub-publisher
。前往「測試」分頁。
在「設定觸發事件」專區中,輸入下列 JSON 鍵/值:
{"message": "target_dag"}
。請勿修改鍵值組,因為這則訊息稍後會觸發測試 DAG。在「測試指令」部分,按一下「在 Cloud Shell 中測試」。
在 Cloud Shell 終端機中,等待系統自動顯示指令。按下
Enter
執行這項指令。如果出現「Authorize Cloud Shell」(授權 Cloud Shell) 訊息,請按一下「Authorize」(授權)。
確認訊息內容對應至 Pub/Sub 訊息。在本例中,輸出訊息必須以
Message b'target_dag' with message_length 10 published to
開頭,做為函式的回應。
確認是否已觸發
target_dag
:等待至少一分鐘,讓
trigger_dag
的新 DAG 執行作業完成。前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「DAG」分頁。
按一下
trigger_dag
前往「DAG details」(DAG 詳細資料) 頁面。「Runs」(執行) 分頁會顯示trigger_dag
DAG 的 DAG 執行作業清單。這個 DAG 每分鐘執行一次,並處理函式傳送的所有 Pub/Sub 訊息。如果沒有傳送任何訊息,DAG 執行記錄中會將
trigger_target
工作標示為Skipped
。如果 DAG 已觸發,則trigger_target
工作會標示為Success
。查看最近幾次 DAG 執行作業,找出所有三個工作 (
subscribe_task
、pull_messages_operator
和trigger_target
) 狀態皆為Success
的 DAG 執行作業。返回「DAGs」分頁,檢查
target_dag
DAG 的「Successful runs」資料欄是否列出一次成功的執行作業。
摘要
在本教學課程中,您已瞭解如何使用 Cloud Run 函式在 Pub/Sub 主題上發布訊息,以及如何部署訂閱 Pub/Sub 主題的 DAG、提取 Pub/Sub 訊息,並觸發訊息資料 DAG ID 中指定的另一個 DAG。
此外,還有其他建立及管理 Pub/Sub 訂閱項目和觸發 DAG 的方法,但這些方法不在本教學課程的討論範圍內。舉例來說,您可以使用 Cloud Run 函式在發生特定事件時觸發 Airflow DAG。歡迎參考我們的教學課程,親自試用其他Google Cloud 功能。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
刪除個別資源
如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。
主控台
- 刪除 Cloud Composer 環境。您也可以在執行這項程序時刪除環境的 bucket。
- 刪除 Pub/Sub 主題
dag-topic-trigger
。 刪除 Cloud Run 函式。
前往 Google Cloud 控制台的 Cloud Run 函式。
按一下要刪除的函式旁的核取方塊,
pubsub-publisher
。按一下「刪除」,然後按照操作說明執行。
Terraform
- 請確認 Terraform 指令碼未包含專案仍需的資源項目。舉例來說,您可能想保留部分已啟用的 API,並繼續指派 IAM 權限 (如果您已在 Terraform 腳本中新增這類定義)。
- 執行
terraform destroy
。 - 手動刪除環境的值區。Cloud Composer 不會自動刪除。您可以透過 Google Cloud 控制台或 Google Cloud CLI 執行這項操作。
後續步驟
- 測試 DAG
- 測試 HTTP 函式
- 部署 Cloud Run 函式
- 歡迎自行試用其他 Google Cloud 功能,請參考我們的教學課程。