Cloud Scheduler でパイプライン実行をスケジュールする

Cloud Scheduler を使用し、HTTP トリガーでイベント ドリブンの Cloud Functions の関数を使用することで、プリコンパイル済みパイプラインの実行をスケジュールできます。

簡単なパイプラインを構築してコンパイルする

Kubeflow Pipelines SDK を使用して、スケジュールされたパイプラインを構築し、JSON ファイルにコンパイルします。

サンプルの hello-world-scheduled-pipeline を次に示します。

import json
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import component

# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a JSON file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.json')

コンパイル済みのパイプライン JSON を Cloud Storage バケットにアップロードする

  1. Google Cloud Console で、Cloud Storage ブラウザを開きます。

    Cloud Storage ブラウザ

  2. プロジェクトを構成したときに作成した Cloud Storage バケットをクリックします。

  3. 既存のフォルダか新しいフォルダを使用して、コンパイル済みのパイプライン JSON(この例では hello_world_scheduled_pipeline.json)を、選択したフォルダにアップロードします。

  4. アップロードした JSON ファイルをクリックして、詳細を表示します。後で使用できるように、gsutil URI をコピーします。

HTTP トリガーを使用して Cloud Functions の関数を作成する

  1. コンソールで、[Cloud Functions] ページにアクセスします。

    [Cloud Functions] ページに移動

  2. [関数の作成] ボタンをクリックします。

  3. [基本] セクションで、関数に名前を付けます(例: hello-world-scheduled-pipeline-function)。

  4. [トリガー] セクションで、トリガータイプとして [HTTP] を選択します。

    関数の作成の構成でトリガータイプとして HTTP を選択した画像

  5. このトリガー用に生成された URL をメモし、次のセクションで使用できるように保存します。

  6. 他のフィールドはすべてデフォルトのまま [保存] をクリックし、トリガー セクションの構成を保存します。

  7. 他のフィールドはすべてデフォルトのまま [次へ] をクリックし、[コード] セクションに進みます。

  8. [ランタイム] で Python 3.7 を選択します。

  9. [エントリ ポイント] に、「process_request」と入力します(サンプルコードのエントリ ポイント関数名)。

  10. [ソースコード] で [インライン エディタ] を選択します(選択されていない場合)。

  11. main.py ファイルに、次のコードを追加します。

    import json
    from google.cloud import aiplatform
    
    PROJECT_ID = 'your-project-id'         # <---CHANGE THIS
    REGION = 'your-region'                 # <---CHANGE THIS
    PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'   # <---CHANGE THIS
    
    def process_request(request):
       """Processes the incoming HTTP request.
    
       Args:
         request (flask.Request): HTTP request object.
    
       Returns:
         The response text or any set of values that can be turned into a Response
         object using `make_response
         <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
       """
    
       # decode http request payload and translate into JSON object
       request_str = request.data.decode('utf-8')
       request_json = json.loads(request_str)
    
       pipeline_spec_uri = request_json['pipeline_spec_uri']
       parameter_values = request_json['parameter_values']
    
       aiplatform.init(
           project=PROJECT_ID,
           location=REGION,
       )
    
       job = aiplatform.PipelineJob(
           display_name=f'hello-world-cloud-function-pipeline',
           template_path=pipeline_spec_uri,
           pipeline_root=PIPELINE_ROOT,
           enable_caching=False,
           parameter_values=parameter_values
       )
    
       job.submit()
       return "Job submitted"
    

    次のように置き換えます。

    • PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
    • REGION: このパイプラインが実行されるリージョン。
    • PIPELINE_ROOT: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定する。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
  12. requirements.txt ファイルで、その内容を次のパッケージ要件に置き換えます。

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  13. [デプロイ] をクリックして Cloud Functions の関数をデプロイします。

Cloud Scheduler ジョブを作成する

  1. コンソールで、[Cloud Scheduler] ページにアクセスします。

    [Cloud Scheduler] ページに移動

  2. [ジョブを作成] ボタンをクリックします。

  3. ジョブに名前(my-hello-world-schedule など)を付け、必要に応じて説明を追加します。

  4. ジョブの [頻度] を、unix-cron 形式で指定します。次に例を示します。

    0 9 * * 1

    この cron スケジュールの例では、毎週月曜日の午前 9 時に実行されます。詳細については、cron ジョブ スケジュールの構成をご覧ください。

  5. タイムゾーンを選択します。

  6. [続行] をクリックして実行内容を構成します。

  7. [ターゲット タイプ] メニューから [HTTP] を選択します。

  8. [URL] フィールドに、前のセクションで保存したトリガー URL を入力します。

  9. HTTP メソッドとして [POST] を選択します(選択されていない場合)。

  10. [本文] ボックスに、コードで定義された形式で JSON 文字列を入力します。前のセクションの例を使用すると、次のようになります。

     {
       "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
       "parameter_values": {
         "greet_name": "<any-greet-string>"
       }
     }
    
    

    ジョブの作成の実行内容を構成するセクションにあるボックスの画像

  11. [Auth ヘッダー] のプルダウンで、[OIDC トークンを追加] を選択します。

  12. [サービス アカウント] プルダウンで、Cloud Functions の関数を呼び出す権限があるサービス アカウントを選択します。このチュートリアルでは、Compute Engine のデフォルトのサービス アカウントを使用できます。

  13. 他のフィールドはすべてデフォルトのまま [作成] をクリックします。

ジョブを手動で実行する(省略可)

必要に応じて、作成したジョブを実行して機能を検証できます。

  1. [Cloud Scheduler] コンソール ページを開きます。

  2. ジョブ名の横にある [今すぐ実行] ボタンをクリックします。

    プロジェクトに作成された最初のジョブを初めて呼び出すときは、必要な構成が行われるため、実行されるまで数分かかることがあります。

  3. ジョブのステータスは [結果] 列に表示されます。