Cloud Scheduler を使用し、HTTP トリガーでイベント ドリブンの Cloud Functions の関数を使用することで、プリコンパイル済みパイプラインの実行をスケジュールできます。
簡単なパイプラインを構築してコンパイルする
Kubeflow Pipelines SDK を使用して、スケジュールされたパイプラインを構築し、JSON ファイルにコンパイルします。
サンプルの hello-world-scheduled-pipeline
を次に示します。
import json
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import component
# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a JSON file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.json')
コンパイル済みのパイプライン JSON を Cloud Storage バケットにアップロードする
Google Cloud Console で、Cloud Storage ブラウザを開きます。
プロジェクトを構成したときに作成した Cloud Storage バケットをクリックします。
既存のフォルダか新しいフォルダを使用して、コンパイル済みのパイプライン JSON(この例では
hello_world_scheduled_pipeline.json
)を、選択したフォルダにアップロードします。アップロードした JSON ファイルをクリックして、詳細を表示します。後で使用できるように、gsutil URI をコピーします。
HTTP トリガーを使用して Cloud Functions の関数を作成する
コンソールで、[Cloud Functions] ページにアクセスします。
[関数の作成] ボタンをクリックします。
[基本] セクションで、関数に名前を付けます(例:
hello-world-scheduled-pipeline-function
)。[トリガー] セクションで、トリガータイプとして [HTTP] を選択します。
このトリガー用に生成された URL をメモし、次のセクションで使用できるように保存します。
他のフィールドはすべてデフォルトのまま [保存] をクリックし、トリガー セクションの構成を保存します。
他のフィールドはすべてデフォルトのまま [次へ] をクリックし、[コード] セクションに進みます。
[ランタイム] で Python 3.7 を選択します。
[エントリ ポイント] に、「process_request」と入力します(サンプルコードのエントリ ポイント関数名)。
[ソースコード] で [インライン エディタ] を選択します(選択されていない場合)。
main.py
ファイルに、次のコードを追加します。import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def process_request(request): """Processes the incoming HTTP request. Args: request (flask.Request): HTTP request object. Returns: The response text or any set of values that can be turned into a Response object using `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`. """ # decode http request payload and translate into JSON object request_str = request.data.decode('utf-8') request_json = json.loads(request_str) pipeline_spec_uri = request_json['pipeline_spec_uri'] parameter_values = request_json['parameter_values'] aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name=f'hello-world-cloud-function-pipeline', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) job.submit() return "Job submitted"
次のように置き換えます。
- PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
- REGION: このパイプラインが実行されるリージョン。
- PIPELINE_ROOT: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定する。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
requirements.txt
ファイルで、その内容を次のパッケージ要件に置き換えます。google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
[デプロイ] をクリックして Cloud Functions の関数をデプロイします。
Cloud Scheduler ジョブを作成する
コンソールで、[Cloud Scheduler] ページにアクセスします。
[ジョブを作成] ボタンをクリックします。
ジョブに名前(
my-hello-world-schedule
など)を付け、必要に応じて説明を追加します。ジョブの [頻度] を、unix-cron 形式で指定します。次に例を示します。
0 9 * * 1
この cron スケジュールの例では、毎週月曜日の午前 9 時に実行されます。詳細については、cron ジョブ スケジュールの構成をご覧ください。
タイムゾーンを選択します。
[続行] をクリックして実行内容を構成します。
[ターゲット タイプ] メニューから [HTTP] を選択します。
[URL] フィールドに、前のセクションで保存したトリガー URL を入力します。
HTTP メソッドとして [POST] を選択します(選択されていない場合)。
[本文] ボックスに、コードで定義された形式で JSON 文字列を入力します。前のセクションの例を使用すると、次のようになります。
{ "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } }
[Auth ヘッダー] のプルダウンで、[OIDC トークンを追加] を選択します。
[サービス アカウント] プルダウンで、Cloud Functions の関数を呼び出す権限があるサービス アカウントを選択します。このチュートリアルでは、Compute Engine のデフォルトのサービス アカウントを使用できます。
他のフィールドはすべてデフォルトのまま [作成] をクリックします。
ジョブを手動で実行する(省略可)
必要に応じて、作成したジョブを実行して機能を検証できます。
[Cloud Scheduler] コンソール ページを開きます。
ジョブ名の横にある [今すぐ実行] ボタンをクリックします。
プロジェクトに作成された最初のジョブを初めて呼び出すときは、必要な構成が行われるため、実行されるまで数分かかることがあります。
ジョブのステータスは [結果] 列に表示されます。