透過 Pub/Sub 觸發管道執行作業

本頁面說明如何使用Cloud Pub/Sub 觸發程序,透過事件驅動型 Cloud Function 撰寫、部署及觸發管道執行作業。步驟如下:

  1. 使用 Kubeflow Pipelines (KFP) SDK 定義機器學習管道,並編譯成 YAML 檔案。

  2. 將編譯的管道定義上傳至 Cloud Storage 值區。

  3. 使用 Cloud Run 函式建立、設定及部署函式,並由新的或現有的 Pub/Sub 主題觸發。

定義及編譯管道

使用 Kubeflow Pipelines SDK 建構排程管道,並編譯成 YAML 檔案。

範例 hello-world-scheduled-pipeline

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

將已編譯的 pipeline YAML 上傳至 Cloud Storage 值區

  1. 在 Google Cloud 控制台中開啟 Cloud Storage 瀏覽器。

    Cloud Storage 瀏覽器

  2. 按一下您在設定專案時建立的 Cloud Storage bucket。

  3. 使用現有或新的資料夾,將編譯後的管道 YAML (在本範例中為 hello_world_scheduled_pipeline.yaml) 上傳至所選資料夾。

  4. 按一下上傳的 YAML 檔案,即可存取詳細資料。複製 gsutil URI,以供稍後使用。

建立具有 Pub/Sub 觸發條件的 Cloud Run 函式

  1. 前往控制台的「Cloud Run functions」頁面。

    前往 Cloud Run 函式頁面

  2. 按一下 [Create function] (建立函式) 按鈕。

  3. 在「Basics」(基本) 專區中,為函式命名 (例如 my-scheduled-pipeline-function)。

  4. 在「Trigger」(觸發條件) 部分中,選取「Cloud Pub/Sub」做為觸發條件類型。

    建立函式設定,選擇 Pub/Sub 做為觸發條件類型圖片

  5. 在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 清單中,按一下「Create a topic」(建立主題)

  6. 在「建立主題」方塊中,為新主題命名 (例如 my-scheduled-pipeline-topic),然後選取「建立主題」

  7. 其餘欄位保留預設值,然後點選「儲存」,儲存「觸發條件」部分的設定。

  8. 其他欄位則一概保留預設值,然後按一下「下一步」,前往「程式碼」部分。

  9. 選取「Runtime」(執行階段) 底下的 [Python 3.7]

  10. 在「Entry point」(進入點) 中輸入「subscribe」(範例程式碼進入點函式名稱)。

  11. 在「Source code」(原始碼) 下,選取「Inline Editor」(內嵌編輯器) (如果尚未選取)。

  12. main.py 檔案中,新增下列程式碼:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    更改下列內容:

    • PROJECT_ID:這個管道執行的 Google Cloud 專案。
    • REGION:這個管道執行的區域。
    • PIPELINE_ROOT:指定管道服務帳戶可存取的 Cloud Storage URI。管道執行的構件會儲存在管道根目錄中。
  13. requirements.txt 檔案中,將內容替換為下列套件 需求:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. 按一下「deploy」(部署) 即可部署函式。

後續步驟