次のコードサンプルは、Cloud Pub/Sub トリガーとイベント ドリブンの Cloud Functions の関数を使用して、パイプラインの作成、デプロイ、トリガーを行います。
簡単なパイプラインを作成してコンパイルする
Kubeflow Pipelines SDK を使用して、スケジュールされたパイプラインを構築し、YAML ファイルにコンパイルします。
サンプルの hello-world-scheduled-pipeline
を次に示します。
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
コンパイル済みのパイプライン YAML を Cloud Storage バケットにアップロードする
Google Cloud コンソールで Cloud Storage ブラウザを開きます。
プロジェクトを構成したときに作成した Cloud Storage バケットをクリックします。
既存のフォルダか新しいフォルダを使用して、コンパイル済みのパイプライン YAML(この例では
hello_world_scheduled_pipeline.yaml
)を、選択したフォルダにアップロードします。アップロードした YAML ファイルをクリックして、詳細を表示します。後で使用できるように、gsutil URI をコピーします。
Pub/Sub トリガーを使用して Cloud Functions の関数を作成する
コンソールで Cloud Run 関数ページにアクセスします。
[関数の作成] ボタンをクリックします。
[基本] セクションで、関数に名前を付けます(例:
my-scheduled-pipeline-function
)。[トリガー] セクションで、トリガータイプとして [Cloud Pub/Sub] を選択します。
[Cloud Pub/Sub トピックを選択してください] プルダウンで、[トピックを作成] をクリックします。
[トピックの作成] ボックスで、新しいトピックに名前(例:
my-scheduled-pipeline-topic
)を入力して、[トピックを作成] を選択します。他のフィールドはすべてデフォルトのまま [保存] をクリックし、トリガー セクションの構成を保存します。
他のフィールドはすべてデフォルトのまま [次へ] をクリックし、[コード] セクションに進みます。
[ランタイム] で Python 3.7 を選択します。
[エントリポイント] に「subscribe」(サンプルコードのエントリ ポイントの関数名)と入力します。
[ソースコード] で [インライン エディタ] を選択します(選択されていない場合)。
main.py
ファイルに、次のコードを追加します。import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
次のように置き換えます。
- PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
- REGION: このパイプラインが実行されるリージョン。
- PIPELINE_ROOT: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定する。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
requirements.txt
ファイルで、その内容を次のパッケージ要件に置き換えます。google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
[デプロイ] をクリックして Cloud Functions の関数をデプロイします。