Cloud Pub/Sub を使用してパイプライン実行をトリガーする

次のコードサンプルは、Cloud Pub/Sub トリガーイベント ドリブンの Cloud Functions の関数を使用して、パイプラインの作成、デプロイ、トリガーを行います。

簡単なパイプラインを作成してコンパイルする

Kubeflow Pipelines SDK を使用して、スケジュールされたパイプラインを構築し、YAML ファイルにコンパイルします。

サンプルの hello-world-scheduled-pipeline を次に示します。

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

コンパイル済みのパイプライン YAML を Cloud Storage バケットにアップロードする

  1. Google Cloud コンソールで Cloud Storage ブラウザを開きます。

    Cloud Storage ブラウザ

  2. プロジェクトを構成したときに作成した Cloud Storage バケットをクリックします。

  3. 既存のフォルダか新しいフォルダを使用して、コンパイル済みのパイプライン YAML(この例では hello_world_scheduled_pipeline.yaml)を、選択したフォルダにアップロードします。

  4. アップロードした YAML ファイルをクリックして、詳細を表示します。後で使用できるように、gsutil URI をコピーします。

Pub/Sub トリガーを使用して Cloud Functions の関数を作成する

  1. コンソールで、[Cloud Functions] ページにアクセスします。

    [Cloud Functions] ページに移動

  2. [関数の作成] ボタンをクリックします。

  3. [基本] セクションで、関数に名前を付けます(例: my-scheduled-pipeline-function)。

  4. [トリガー] セクションで、トリガータイプとして [Cloud Pub/Sub] を選択します。

    関数の作成の構成でトリガータイプとして pubsub を選択した画像

  5. [Cloud Pub/Sub トピックを選択してください] プルダウンで、[トピックを作成] をクリックします。

  6. [トピックの作成] ボックスで、新しいトピックに名前(例: my-scheduled-pipeline-topic)を入力して、[トピックを作成] を選択します。

  7. 他のフィールドはすべてデフォルトのまま [保存] をクリックし、トリガー セクションの構成を保存します。

  8. 他のフィールドはすべてデフォルトのまま [次へ] をクリックし、[コード] セクションに進みます。

  9. [ランタイム] で Python 3.7 を選択します。

  10. [エントリポイント] に「subscribe」(サンプルコードのエントリ ポイントの関数名)と入力します。

  11. [ソースコード] で [インライン エディタ] を選択します(選択されていない場合)。

  12. main.py ファイルに、次のコードを追加します。

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    次のように置き換えます。

    • PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
    • REGION: このパイプラインが実行されるリージョン。
    • PIPELINE_ROOT: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定する。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
  13. requirements.txt ファイルで、その内容を次のパッケージ要件に置き換えます。

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. [デプロイ] をクリックして Cloud Functions の関数をデプロイします。