Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁面將引導您建立以事件為基礎的推送架構,方法是根據 Pub/Sub 主題的變更觸發 Cloud Composer DAG。本教學課程中的範例會示範如何處理 Pub/Sub 管理的完整週期,包括訂閱管理,這是 DAG 程序的一部分。當您需要觸發 DAG,但不想設定額外存取權時,這項功能適用於部分常見用途。
舉例來說,如果基於安全考量,您不想直接存取 Cloud Composer 環境,可以透過 Pub/Sub 傳送訊息做為解決方案。您可以設定 Cloud Run 函式,建立 Pub/Sub 訊息並發布至 Pub/Sub 主題。接著,您可以建立 DAG,擷取 Pub/Sub 訊息,然後處理這些訊息。
在本範例中,您會建立 Cloud Run 函式,並部署兩個 DAG。第一個 DAG 會提取 Pub/Sub 訊息,並根據 Pub/Sub 訊息內容觸發第二個 DAG。
本教學課程假設您熟悉 Python 和 Google Cloud 控制台。
目標
費用
本教學課程使用下列 Google Cloud的計費元件:
完成本教學課程後,您可以刪除自己建立的資源,避免系統繼續計費。詳情請參閱「清除」一節。
事前準備
本教學課程需要 Google Cloud 專案。請按以下方式設定專案:
- 在 Google Cloud 控制台中,選取或建立專案: 
- 請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。 
- 請確認 Google Cloud 專案使用者具備下列角色,可建立必要資源: - 服務帳戶使用者 (roles/iam.serviceAccountUser)
- Pub/Sub 編輯者 (roles/pubsub.editor)
- 環境與 Storage 物件管理員
(roles/composer.environmentAndStorageObjectAdmin)
- Cloud Run functions 管理員 (roles/cloudfunctions.admin)
- 記錄檢視器 (roles/logging.viewer)
 
- 服務帳戶使用者 (
- 請確認執行 Cloud Run 函式的服務帳戶在專案中擁有足夠的 Pub/Sub 存取權限。根據預設,Cloud Run 函式會使用 App Engine 預設服務帳戶。這個服務帳戶具有「編輯者」角色,權限足以完成本教學課程。 
為專案啟用 API
主控台
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
Roles required to enable APIs
          To enable APIs, you need the Service Usage Admin IAM
          role (roles/serviceusage.serviceUsageAdmin), which
          contains the serviceusage.services.enable permission. Learn how to grant
          roles.
        
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
Roles required to enable APIs
      To enable APIs, you need the Service Usage Admin IAM
      role (roles/serviceusage.serviceUsageAdmin), which contains the
      serviceusage.services.enable permission. Learn how to grant
      roles.
    
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com 
Terraform
在專案中啟用 Cloud Composer API,方法是在 Terraform 指令碼中加入下列資源定義:
resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}
將 <PROJECT_ID> 替換為專案的專案 ID。例如:example-project。
建立 Cloud Composer 環境
這個程序會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在您的 Google Cloud 專案中執行作業。
建立 Pub/Sub 主題
這個範例會在訊息推送至 Pub/Sub 主題時觸發 DAG。建立要在本範例中使用的 Pub/Sub 主題:
主控台
- 前往 Google Cloud 控制台的「Pub/Sub Topics」(Pub/Sub 主題) 頁面。 
- 按一下 [Create Topic] (建立主題)。 
- 在「主題 ID」欄位中,輸入 - dag-topic-trigger做為主題的 ID。
- 其餘選項則保留預設狀態。 
- 按一下 [Create Topic] (建立主題)。 
gcloud
如要建立主題,請在 Google Cloud CLI 中執行 gcloud pubsub topics create 指令:
gcloud pubsub topics create dag-topic-trigger
Terraform
在 Terraform 指令碼中新增下列資源定義:
resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}
將 <PROJECT_ID> 替換為專案的專案 ID。例如:example-project。
上傳 DAG
將 DAG 上傳至環境:
程式碼範例包含兩個 DAG:trigger_dag 和 target_dag。
trigger_dag DAG 會訂閱 Pub/Sub 主題、提取 Pub/Sub 訊息,並觸發 Pub/Sub 訊息資料 DAG ID 中指定的另一個 DAG。在本例中,trigger_dag 會觸發 target_dag DAG,將訊息輸出至工作記錄。
trigger_dag DAG 包含下列工作:
- subscribe_task:訂閱 Pub/Sub 主題。
- pull_messages_operator:使用- PubSubPullOperator讀取 Pub/Sub 訊息資料。
- trigger_target_dag:根據從 Pub/Sub 主題擷取的訊息資料,觸發另一個 DAG (本例中為- target_dag)。
target_dag DAG 只包含一項工作:output_to_logs。這項工作會延遲一秒,然後將訊息列印到工作記錄。
部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息
在本節中,您將部署 Cloud Run 函式,在 Pub/Sub 主題上發布訊息。
建立 Cloud Run 函式並指定其設定
主控台
- 前往 Google Cloud 控制台的「Cloud Run functions」頁面。 
- 按一下「建立函式」。 
- 在「Environment」(環境) 欄位中,選取「1st gen」(第 1 代)。 
- 在「Function name」(函式名稱) 欄位中,輸入函式名稱: - pubsub-publisher。
- 在「觸發條件類型」欄位中,選取「HTTP」。 
- 在「Authentication」(驗證) 部分中,選取「Allow unauthenticated invocations」(允許未經驗證的叫用)。這個選項會授予未經驗證的使用者叫用 HTTP 函式的權限。 
- 按一下「儲存」。 
- 點選「下一步」,前往「程式碼」步驟。 
Terraform
建議您在這個步驟中使用 Google Cloud 控制台,因為無法直接透過 Terraform 管理函式的原始碼。
這個範例說明如何建立 Cloud Storage bucket、將檔案儲存在這個 bucket 中,然後使用 bucket 中的檔案做為 Cloud Run 函式的來源,從本機 ZIP 封存檔上傳 Cloud Run 函式。如果您採用這種做法,即使建立新的封存檔案,Terraform 也不會自動更新函式的原始碼。如要重新上傳函式程式碼,可以變更封存檔的檔案名稱。
- 下載 pubsub_publisher.py和requirements.txt檔案。
- 在 pubsub_publisher.py檔案中,將<PROJECT_ID>替換為專案的專案 ID。例如:example-project。
- 建立名為 pubsub_function.zip的 ZIP 封存檔,並加入pbusub_publisner.py和requirements.txt檔案。
- 將 ZIP 封存檔儲存至 Terraform 指令碼所在的目錄。
- 將下列資源定義新增至 Terraform 指令碼,並將 <PROJECT_ID>替換為專案的專案 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"
  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}
指定 Cloud Run 函式程式碼參數
主控台
- 在「程式碼」步驟的「執行階段」欄位中,選取函式使用的語言執行階段。在本範例中,請選取「Python 3.10」。 
- 在「Entry point」(進入點) 欄位中輸入 - pubsub_publisher。這是 Cloud Run 函式執行時執行的程式碼。這個旗標的值必須是來源程式碼中存在的函式名稱或完整類別名稱。
Terraform
略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function 資源中定義。
上傳 Cloud Run 函式程式碼
主控台
在「原始碼」欄位中,選取提供函式原始碼的適當選項。 在本教學課程中,您將使用 Cloud Run functions Inline Editor 新增函式程式碼。您也可以上傳 ZIP 檔案,或使用 Cloud Source Repositories。
- 將下列程式碼範例放入 main.py 檔案。
- 將 <PROJECT_ID>替換為專案的專案 ID。例如:example-project。
Terraform
略過這個步驟。Cloud Run 函式參數已在 google_cloudfunctions_function 資源中定義。
指定 Cloud Run 函式依附元件
主控台
在 requirements.txt 中繼資料檔案中指定函式依附元件:
部署函式時,Cloud Run 函式會下載並安裝 requirements.txt 檔案中宣告的依附元件,每個套件一行。這個檔案所在的目錄必須與包含函式程式碼的 main.py 檔案相同。詳情請參閱 Requirements Files pip 說明文件。
Terraform
略過這個步驟。Cloud Run 函式依附元件定義於 pubsub_function.zip 封存檔的 requirements.txt 檔案中。
部署 Cloud Run 函式
主控台
按一下「Deploy」(部署)。部署作業成功完成後,Google Cloud 控制台的「Cloud Run 函式」頁面會顯示函式,並附上綠色勾號。
請確認執行 Cloud Run 函式的服務帳戶在專案中具備足夠權限,可存取 Pub/Sub。
Terraform
- 初始化 Terraform: - terraform init
- 檢查設定,確認 Terraform 即將建立或更新的資源符合預期: - terraform plan
- 如要檢查設定是否有效,請執行下列指令: - terraform validate
- 執行下列指令,並在提示中輸入 yes,即可套用 Terraform 設定: - terraform apply
等待 Terraform 顯示「Apply complete!」訊息。
在 Google Cloud 控制台中,前往 UI 中的資源,確認 Terraform 已建立或更新這些資源。
測試 Cloud Run 函式
如要確認函式是否在 Pub/Sub 主題上發布訊息,以及範例 DAG 是否正常運作,請按照下列步驟操作:
- 確認 DAG 是否處於啟用狀態: - 前往 Google Cloud 控制台的「Environments」頁面。 
- 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。 
- 前往「DAG」分頁。 
- 檢查名為 - trigger_dag和- target_dag的 DAG 的「State」欄中的值。兩個 DAG 都必須處於- Active狀態。
 
- 推送測試 Pub/Sub 訊息。您可以在 Cloud Shell 中執行這項操作: - 前往 Google Cloud 控制台的「Functions」頁面。 
- 按一下函式名稱 - pubsub-publisher。
- 前往「測試」分頁。 
- 在「設定觸發事件」專區中,輸入下列 JSON 鍵/值: - {"message": "target_dag"}。請勿修改鍵值組,因為這則訊息稍後會觸發測試 DAG。
- 在「測試指令」部分,按一下「在 Cloud Shell 中測試」。 
- 在 Cloud Shell 終端機中,等待系統自動顯示指令。按下 - Enter執行這項指令。
- 如果出現「Authorize Cloud Shell」(授權 Cloud Shell) 訊息,請按一下「Authorize」(授權)。 
- 確認訊息內容對應至 Pub/Sub 訊息。在本例中,輸出訊息必須以 - Message b'target_dag' with message_length 10 published to開頭,做為函式的回應。
 
- 確認是否已觸發 - target_dag:- 等待至少一分鐘,讓 - trigger_dag的新 DAG 執行作業完成。
- 前往 Google Cloud 控制台的「Environments」頁面。 
- 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。 
- 前往「DAG」分頁。 
- 按一下 - trigger_dag前往「DAG 詳細資料」頁面。「Runs」(執行) 分頁會顯示- trigger_dagDAG 的 DAG 執行作業清單。- 這個 DAG 每分鐘都會執行一次,並處理函式傳送的所有 Pub/Sub 訊息。如果沒有傳送任何訊息,DAG 執行記錄中會將 - trigger_target工作標示為- Skipped。如果 DAG 已觸發,則- trigger_target工作會標示為- Success。
- 查看最近幾次 DAG 執行作業,找出所有三項工作 ( - subscribe_task、- pull_messages_operator和- trigger_target) 狀態皆為- Success的 DAG 執行作業。
- 返回「DAGs」分頁,檢查 - target_dagDAG 的「Successful runs」資料欄是否列出一次成功的執行作業。
 
摘要
在本教學課程中,您已瞭解如何使用 Cloud Run 函式在 Pub/Sub 主題上發布訊息,以及如何部署訂閱 Pub/Sub 主題的 DAG、提取 Pub/Sub 訊息,並觸發訊息資料 DAG ID 中指定的另一個 DAG。
此外,還有其他建立及管理 Pub/Sub 訂閱項目和觸發 DAG 的方法,但這些方法不在本教學課程的討論範圍內。舉例來說,您可以使用 Cloud Run 函式在發生特定事件時觸發 Airflow DAG。歡迎參考我們的教學課程,親自試用其他Google Cloud 功能。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
刪除個別資源
如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。
主控台
- 刪除 Cloud Composer 環境。您也可以在這項程序中刪除環境的 bucket。
- 刪除 Pub/Sub 主題:dag-topic-trigger。
- 刪除 Cloud Run 函式。 - 前往 Google Cloud 控制台的 Cloud Run 函式。 
- 按一下要刪除的函式旁的核取方塊, - pubsub-publisher。
- 按一下「刪除」,然後按照操作說明執行。 
 
Terraform
- 請確認 Terraform 指令碼未包含專案仍需的資源項目。舉例來說,您可能想保留部分已啟用的 API,並繼續指派 IAM 權限 (如果您已在 Terraform 腳本中新增這類定義)。
- 執行 terraform destroy。
- 手動刪除環境的值區。Cloud Composer 不會自動刪除。您可以透過 Google Cloud 控制台或 Google Cloud CLI 執行這項操作。
後續步驟
- 測試 DAG
- 測試 HTTP 函式
- 部署 Cloud Run 函式
- 歡迎自行試用其他 Google Cloud 功能,請參考我們的教學課程。