パイプラインのビルド

Vertex AI Pipelines を使用すると、サーバーレスで ML ワークフローをオーケストレートできます。Vertex AI Pipelines で ML ワークフローをオーケストレートする前に、ワークフローをパイプラインとして記述する必要があります。ML パイプラインは、コンテナと Google Cloud サービスに基づく、移植可能でスケーラブルな ML ワークフローです。

このガイドでは、ML パイプラインの構築を開始する方法について説明します。

使用するパイプライン SDK の選び方

Vertex AI Pipelines では、次のいずれかの SDK を使用して構築されたパイプラインが実行されます。

  • Kubeflow Pipelines SDK v1.8 以降(v2 を推奨)

  • TensorFlow Extended v0.30.0 以降

テラバイト単位の構造化データまたはテキストデータを処理する ML ワークフローで TensorFlow を使用する場合は、TFX を使用してパイプラインを構築することをおすすめします。

その他のユースケースでは、Kubeflow Pipelines SDK を使用してパイプラインを構築することをおすすめします。Kubeflow Pipelines SDK でパイプラインを構築すると、カスタム コンポーネントを構築するか、Google Cloud パイプライン コンポーネントなどのビルド済みコンポーネントを再利用して、ワークフローを実装できます。Google Cloud のパイプライン コンポーネントを使用すると、パイプラインで AutoML などの Vertex AI サービスを簡単に使用できます。

このガイドでは、Kubeflow Pipelines SDK を使用してパイプラインを構築する方法について説明します。

始める前に

パイプラインの構築と実行を行う前に、次の手順で Google Cloud プロジェクトと開発環境を設定します。

  1. Google Cloud プロジェクトで ML パイプラインを実行できるようにするために、ガイドの Google Cloud プロジェクトの構成手順を行います。

  2. Kubeflow Pipelines SDK を使用してパイプラインを構築するために、Kubeflow Pipelines SDK v1.8 以降をインストールします。

  3. パイプラインで Vertex AI Python クライアントを使用するために、Vertex AI クライアント ライブラリ v1.7 以降をインストールします。

  4. パイプラインで Vertex AI サービスを使用するために、Google Cloud パイプライン コンポーネント SDK をインストールします。

パイプラインの構築を開始する

Vertex AI Pipelines で ML ワークフローをオーケストレートするには、まずワークフローをパイプラインとして記述する必要があります。次のサンプルでは、データセットの作成、AutoML によるモデルのトレーニング、予測用のトレーニング済みモデルのデプロイを Vertex AI を使って行うにあたり、Google Cloud パイプライン コンポーネントを使用する方法を示します。

次のコードサンプルを実行する前に、認証を設定する必要があります。

認証の設定方法

認証を設定するには、サービス アカウント キーを作成して、サービス アカウント キーへのパスを示す環境変数を設定する必要があります。

  1. サービス アカウントを作成します。

    1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動

    2. [サービス アカウント名] フィールドに名前を入力します。
    3. 省略可: [サービス アカウントの説明] フィールドに説明を入力します。
    4. [作成] をクリックします。
    5. [ロールを選択] フィールドをクリックします。[すべてのロール] で、[Vertex AI] > [Vertex AI ユーザー] を選択します。
    6. [完了] をクリックして、サービス アカウントを作成します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  2. 認証に使用するサービス アカウント キーを作成します。

    1. Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  3. 新しいサービス アカウントに、パイプラインの実行に使用するサービス アカウントへのアクセスを許可します。
    1. [] をクリックしてサービス アカウントのリストに戻ります。
    2. パイプラインの実行に使用するサービス アカウントの名前をクリックします。[サービス アカウントの詳細] ページが表示されます。

      ガイドの手順に沿って Vertex AI Pipelines 用のプロジェクトを構成した場合、これは、細かな権限を持つサービス アカウントの構成で作成したサービス アカウントと同じになります。それ以外の場合、Vertex AI は Compute Engine のデフォルト サービス アカウントを使用してパイプラインを実行します。Compute Engine のデフォルトのサービス アカウントの名前は PROJECT_NUMBER-compute@developer.gserviceaccount.com です。

    3. [権限] タブをクリックします。
    4. [アクセス権を付与] をクリックします。[プリンシパルの追加] パネルが表示されます。
    5. [新しいプリンシパル] ボックスに、前の手順で作成したサービス アカウントのメールアドレスを入力します。
    6. [ロール] プルダウン リストで、[サービス アカウント] > [サービス アカウント ユーザー] を選択します。
    7. [保存] をクリックします。
  4. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。この変数は現在の shell セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

    例: Linux または macOS

    [PATH] をサービス アカウント キーが含まれる JSON ファイルのパスに置き換えます。

    export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    次に例を示します。

    export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"

    例: Windows

    [PATH] は、サービス アカウント キーが保存されている JSON ファイルのパスに置き換え、[FILE_NAME] はファイル名に置き換えます。

    PowerShell を使用する場合:

    $env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

    次に例を示します。

    $env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\[FILE_NAME].json"

    コマンド プロンプトを使用する場合:

    set GOOGLE_APPLICATION_CREDENTIALS=[PATH]

Kubeflow Pipelines DSL パッケージを使用してワークフローを定義する

kfp.dsl パッケージには、パイプラインとコンポーネントを定義して操作するために使用できるドメイン固有の言語(DSL)が含まれています。

Kubeflow Pipelines コンポーネントは、パイプライン ステップを作成するファクトリ関数です。各コンポーネントでは、コンポーネントの入力、出力、実装を記述します。たとえば、以下のコードサンプルでは、ds_op がコンポーネントです。

コンポーネントは、パイプライン ステップを作成するために使用されます。パイプラインが実行されると、その依存データが使用可能になったときにステップが実行されます。たとえば、トレーニング コンポーネントで CSV ファイルを入力として受け取り、それを使用してモデルをトレーニングできます。

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    # The first step of your workflow is a dataset generator.
    # This step takes a Google Cloud Pipeline Component, providing the necessary
    # input arguments, and uses the Python variable `ds_op` to define its
    # output. Note that here the `ds_op` only stores the definition of the
    # output but not the actual returned object from the execution. The value
    # of the object is not accessible at the dsl.pipeline level, and can only be
    # retrieved by providing it as the input to a downstream component.
    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # The second step is a model training component. It takes the dataset
    # outputted from the first step, supplies it as an input argument to the
    # component (see `dataset=ds_op.outputs["dataset"]`), and will put its
    # outputs into `training_job_run_op`.
    training_job_run_op = AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    # The third and fourth step are for deploying the model.
    create_endpoint_op = EndpointCreateOp(
        project=project_id,
        display_name = "create-endpoint",
    )

    model_deploy_op = ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
    )

次のように置き換えます。

  • PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
  • PIPELINE_ROOT_PATH: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定します。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。

    パイプライン ルートは、パイプライン関数の @kfp.dsl.pipeline アノテーションの引数として設定することも、create_run_from_job_spec を呼び出してパイプライン実行を作成する際に設定することもできます。

パイプラインを YAML ファイルにコンパイルする

パイプラインのワークフローを定義したら、パイプラインを YAML 形式にコンパイルできます。YAML ファイルには、Vertex AI Pipelines でパイプラインを実行するためのすべての情報が含まれます。

from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='image_classif_pipeline.yaml'
)

パイプライン実行を送信する

パイプラインのワークフローを YAML 形式にコンパイルしたら、Vertex AI Python クライアントでパイプラインを送信して実行できます。

import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=project_id,
    location=PROJECT_REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="automl-image-training-v2",
    template_path="image_classif_pipeline.yaml",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    }
)

job.submit()

次のように置き換えます。

  • PROJECT_REGION: このパイプラインが実行されるリージョン。

上の例の各要素の内容は次のとおりです。

  1. Kubeflow パイプラインは Python 関数として定義されます。関数にはアノテーションとして @kfp.dsl.pipeline デコレータが付けられます。これは、パイプラインの名前とルートパスを指定します。パイプラインのルートパスは、パイプラインのアーティファクトが保存される場所です。
  2. パイプラインのワークフロー ステップは、Google Cloud パイプライン コンポーネントを使用して作成されます。コンポーネントの出力を別のコンポーネントの入力として使用することで、パイプラインのワークフローをグラフとして定義します。たとえば、training_job_run_opds_opdataset 出力に依存します。
  3. パイプラインをコンパイルするには kfp.compiler.Compiler を使用します。
  4. パイプライン実行は、Vertex AI Python クライアントを使用して Vertex AI Pipelines に作成します。パイプライン名とパイプラインのルートパスは、パイプラインの実行時にオーバーライドできます。パイプライン実行は、パイプライン名を使用してグループ化できます。パイプライン名をオーバーライドすると、本番環境と試験運用版のパイプライン実行を区別しやすくなります。

パイプラインの構築について詳しくは、Kubeflow パイプラインの構築セクションと、サンプルとチュートリアルをご覧ください。

パイプラインをローカルでテストする(省略可)

パイプラインとコンポーネントを定義したら、ローカルの作成環境でコードを実行してコンポーネント コードをテストできます。パイプラインまたはコンポーネントをローカルで実行すると、Vertex AI Pipelines などのリモート環境でパイプライン実行を作成する前に、潜在的な問題を特定してデバッグできます。パイプラインとコンポーネントをローカルで実行する方法については、KFP のドキュメントローカル実行をご覧ください。

このページでは、2 つのタスクで構成されるパイプラインを定義して実行する方法について説明します。

ローカル環境を設定する

  1. Kubeflow Pipelines(KFP)SDK v2 の最新のマイナー バージョンをインストールします。

    pip install --upgrade kfp>=2,<3
    
  2. 省略可: Docker をインストールします。

  3. 次のコードサンプルを使用して、シンプルなパイプラインを定義します。

    from kfp import dsl
    
    # Define a component to add two numbers.
    @dsl.component
    def add(a: int, b: int) -> int:
        return a + b
    
    # Define a simple pipeline using the component.
    @dsl.pipeline
    def addition_pipeline(x: int, y: int, z: int) -> int:
        task1 = add(a=x, b=y)
        task2 = add(a=task1.output, b=z)
        return task2.output
    

ローカル実行を呼び出す

local.init() 関数を使用してローカル セッションを初期化します。local.init() を使用すると、パイプラインとコンポーネントを呼び出すときに KFP SDK がローカルで実行されます。

local.init() を使用する場合は、ランナータイプを指定する必要があります。ランナータイプは、KFP が各タスクをどのように実行するかを示します。

次のサンプルを使用して、コンテナ内の各タスクを実行する DockerRunner ランナータイプを指定します。KFP でサポートされているローカル ランナーの詳細については、KFP ドキュメントのローカル ランナーをご覧ください。

from kfp import local

local.init(runner=local.DockerRunner())

pipeline_task = addition_pipeline(x=1, y=2, z=3)

次のコードを使用して、ローカル実行時のパイプライン タスクの出力を表示します。

print(f'Result: {pipeline_task.output}')

Kubeflow パイプラインを構築する

次のプロセスを使用してパイプラインを構築します。

  1. パイプラインを一連のコンポーネントとして設計します。再利用性を高めるため、各コンポーネントが単一の責任を持つようにします。可能な限り、Google Cloud パイプライン コンポーネントなどの実績のあるコンポーネントを再利用するようにパイプラインを設計します。

    パイプラインの設計について学習する

  2. Kubeflow Pipelines SDK を使用して、ML ワークフローの実装に必要なカスタム コンポーネントを構築します。コンポーネントは、ML ワークフローのステップを実行する自己完結型のコードセットです。パイプライン コンポーネントを作成するには、次のオプションを使用します。

  3. パイプラインを Python 関数として構築します。

    パイプラインを Python 関数として定義する方法について学習する

  4. Kubeflow Pipelines SDK コンパイラを使用してパイプラインをコンパイルします。

    from kfp import compiler
    
    compiler.Compiler().compile(
        pipeline_func=PIPELINE_FUNCTION,
        package_path=PIPELINE_PACKAGE_PATH)
    

    次のように置き換えます。

    • PIPELINE_FUNCTION: パイプラインの関数の名前。
    • PIPELINE_PACKAGE_PATH: コンパイル済みのパイプラインを保存する場所。
  5. Google Cloud Console または Python を使用してパイプラインを実行します

パイプライン内の Google Cloud リソースにアクセスする

パイプラインの実行時にサービス アカウントを指定しない場合、Vertex AI Pipelines は Compute Engine のデフォルトのサービス アカウントを使用してパイプラインを実行します。Vertex AI Pipelines はまた、パイプライン実行のサービス アカウントを使用して、パイプラインの Google Cloud リソースへのアクセスも承認します。Compute Engine のデフォルトのサービス アカウントには、プロジェクト編集者のロールがデフォルトで付与されています。これにより、パイプラインに対して、Google Cloud プロジェクト内の Google Cloud リソースへの過剰なアクセス権が付与されることがあります。

パイプラインを実行するサービス アカウントを作成し、パイプラインの実行に必要な Google Cloud リソースに対するきめ細かな権限をアカウントに付与することをおすすめします。

Identity and Access Management を使用してサービス アカウントを作成し、サービス アカウントに付与したアクセス権を管理する方法を確認してください。

パイプラインを最新の状態に保つ

パイプラインの構築と実行に使用する SDK クライアントとコンテナ イメージは、セキュリティの脆弱性に対するパッチの適用と新しい機能の追加のために定期的に新しいバージョンに更新されます。パイプラインを最新バージョンで最新の状態に保つために、以下の作業を行うことをおすすめします。

次のステップ