パイプラインの構築

Vertex Pipelines を使用すると、サーバーレスで機械学習(ML)ワークフローをオーケストレートできます。Vertex Pipelines で ML ワークフローをオーケストレートする前に、ワークフローをパイプラインとして記述する必要があります。ML パイプラインは、コンテナと Google Cloud サービスに基づく、移植可能でスケーラブルな ML ワークフローです。

このガイドでは、ML パイプラインの構築を開始する方法について説明します。

使用するパイプライン SDK について

Vertex Pipelines は、Kubeflow Pipelines SDK v1.6 以降または TensorFlow Extended v0.30.0 以降を使用して構築されたパイプラインを実行できます。

  • テラバイト単位の構造化データまたはテキストデータを処理する ML ワークフローで TensorFlow を使用する場合は、TFX を使用してパイプラインを構築することをおすすめします。

  • その他のユースケースでは、Kubeflow Pipelines SDK を使用してパイプラインを構築することをおすすめします。Kubeflow Pipelines SDK でパイプラインを構築すると、カスタム コンポーネントを構築、または Google Cloud パイプライン コンポーネントなどのビルド済みコンポーネントを再利用して、ワークフローを実装できます。Google Cloud のパイプライン コンポーネントを使用すると、パイプラインで AutoML などの Vertex AI サービスを簡単に使用できます。

このガイドでは、Kubeflow Pipelines SDK を使用してパイプラインを構築する方法について説明します。

始める前に

パイプラインの構築と実行を行う前に、次の手順で Google Cloud プロジェクトと開発環境を設定します。

  1. Cloud プロジェクトで ML パイプラインを実行できるようにするには、ガイドの Cloud プロジェクトの構成手順を行います。

  2. Kubeflow Pipelines SDK を使用してパイプラインを構築するには、Kubeflow Pipelines SDK v1.6 以降をインストールします。

  3. パイプラインで Vertex AI サービスを使用するには、Google Cloud パイプライン コンポーネントをインストールします。

パイプラインの構築を開始する

Vertex Pipelines で ML ワークフローをオーケストレートするには、まずワークフローをパイプラインとして記述する必要があります。次のサンプルは、データセットを作成して、AutoML を使用してモデルをトレーニングし、予測用にトレーニング済みのモデルをデプロイするために、Vertex AI を使用して Google Cloud パイプライン コンポーネントを使用する方法を示しています。

次のコードサンプルを実行する前に、認証を設定する必要があります。

import kfp
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

project_id = PROJECT_ID
region = REGION
pipeline_root_path = PIPELINE_ROOT

@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    ds_op = gcc_aip.ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project_id,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        base_model=None,
        dataset=ds_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )

    endpoint_op = gcc_aip.ModelDeployOp(
        project=project_id, model=training_job_run_op.outputs["model"]
    )

compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='image_classif_pipeline.json')

api_client = AIPlatformClient(project_id=project_id, region=region)

response = api_client.create_run_from_job_spec(
    'image_classif_pipeline.json',
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id
    })

次のように置き換えます。

  • PROJECT_ID: このパイプラインが実行される Google Cloud プロジェクト。
  • REGION: このパイプラインが実行されるリージョン。
  • PIPELINE_ROOT_PATH: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定する。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。

    パイプライン ルートは、パイプライン関数の @kfp.dsl.pipeline アノテーションの引数として設定することも、create_run_from_job_spec を呼び出してパイプライン実行を作成する際に設定することもできます。

上の例の各要素の内容は次のとおりです。

  1. Kubeflow パイプラインは Python 関数として定義されます。関数にはアノテーションとして @kfp.dsl.pipeline デコレータが付けられます。これは、パイプラインの名前とルートパスを指定します。パイプラインのルートパスは、パイプラインのアーティファクトが保存される場所です。
  2. パイプラインのワークフロー ステップは、Google Cloud パイプライン コンポーネントを使用して作成されます。コンポーネントの出力を別のコンポーネントの入力として使用することで、パイプラインのワークフローをグラフとして定義します。たとえば、training_job_run_opds_opdataset 出力に依存しています。
  3. パイプラインをコンパイルするには kfp.v2.compiler.Compiler を使用します。
  4. kfp.v2.google.client.AIPlatformClient を使用して、Vertex Pipelines にパイプライン実行を作成します。パイプラインの実行時に、パイプライン名とパイプラインのルートパスをオーバーライドできます。パイプライン実行は、パイプライン名を使用してグループ化できます。パイプライン名をオーバーライドすると、本番環境と試験運用版のパイプライン実行を区別しやすくなります。

パイプラインの構築について詳しくは、Kubeflow パイプラインの構築セクションと、サンプルとチュートリアルをご覧ください。

Kubeflow Pipelines を構築する

次のプロセスを使用してパイプラインを構築します。

  1. パイプラインを一連のコンポーネントとして設計します。再利用性を促進するため、各コンポーネントは単一責任を持つようにする必要があります。可能な限り、Google Cloud Pipeline コンポーネントなどの実績のあるコンポーネントを再利用するようにパイプラインを設計します。

    パイプラインの設計について学習する

  2. Kubeflow Pipelines SDK v2 を使用して、ML ワークフローの実装に必要なカスタム コンポーネントを構築します。コンポーネントは、ML ワークフローのステップを実行する自己完結型のコードセットです。パイプライン コンポーネントを作成するには、次のオプションを使用します。

  3. パイプラインを Python 関数として構築します。

    パイプラインを Python 関数として定義する方法について学習する

  4. Kubeflow Pipelines SDK v2 コンパイラを使用してパイプラインをコンパイルします。

    from kfp.v2 import compiler
    
    compiler.Compiler().compile(
        pipeline_func=PIPELINE_FUNCTION,
        package_path=PIPELINE_PACKAGE_PATH)
    

    次のように置き換えます。

    • PIPELINE_FUNCTION: パイプラインの関数の名前。
    • PIPELINE_PACKAGE_PATH: コンパイル済みのパイプラインを保存する場所。
  5. Google Cloud Console または Python を使用してパイプラインを実行します

Google Cloud のパイプライン コンポーネントを使用する

Google Cloud パイプライン コンポーネントは、パイプラインで Vertex AI をより簡単に使用できるようにする Kubeflow のパイプライン コンポーネントです。これらのコンポーネントを使用して、次のようなタスクを行います。

  • 新しいデータセットを作成し、画像データ、表形式データ、テキストデータ、動画データをデータセットに読み込む。
  • データセットから画像データ、表形式データ、テキストデータ、動画データを Cloud Storage にエクスポートする。
  • AutoML を使用して、画像データ、表形式データ、テキストデータ、動画データを使用してモデルをトレーニングする。
  • カスタム コンテナまたは Python パッケージを使用して、カスタム トレーニング ジョブを実行する。
  • モデルを Vertex AI にアップロードする。
  • 予測用の新しいエンドポイントを作成する。
  • 予測用のエンドポイントにモデルをデプロイする。

Google Cloud のパイプライン コンポーネントの詳細を確認してください。

次のサンプルを使用して、Google Cloud のパイプライン コンポーネントの詳細を確認してください。

パイプライン ステップのマシンタイプを指定する

Kubeflow Pipelines コンポーネントは、パイプライン ステップを作成するファクトリ関数です。各コンポーネントでは、コンポーネントの入力、出力、実装を記述します。たとえば、以下のコードサンプルでは、train_op がコンポーネントです。

コンポーネントは、パイプライン ステップを作成するために使用されます。パイプラインが実行されると、その依存データが使用可能になったときにステップが実行されます。

たとえば、トレーニング コンポーネントで CSV ファイルを入力として受け取り、それを使用してモデルをトレーニングできます。パイプライン ステップにマシンタイプ パラメータを設定することで、パイプラインの各ステップの要件を管理できます。2 つのトレーニング ステップがあり、最初のステップでは巨大なデータファイルを使用し、2 番目のステップでは小さなデータファイルを使用する場合は、最初のタスクにより多くのメモリと CPU を割り当て、2 つ目のタスクに割り当てるリソースを少なくすることができます。

次の例は、ステップに CPU、メモリ、GPU 構成を設定する方法を示しています。

@dsl.pipeline(name='custom-container-pipeline')
def pipeline():
  generate = generate_op()
  train = (train_op(
      training_data=generate.outputs['training_data'],
      test_data=generate.outputs['test_data'],
      config_file=generate.outputs['config_file']).
    set_cpu_limit('CPU_LIMIT').
    set_memory_limit('MEMORY_LIMIT').
    add_node_selector_constraint(SELECTOR_CONSTRAINT).
    set_gpu_limit(GPU_LIMIT))

次のように置き換えます。

  • CPU_LIMIT: このオペレータの CPU の上限。この文字列値は、数値(CPU の数を表す整数値)か、数値の後に「m」が続く値になります(m は 1/1000 を意味します)。
  • MEMORY_LIMIT: このオペレータのメモリの上限。この文字列値は、数値か、数値の後に「K」(キロバイト)、M(メガバイト)、または「G」(ギガバイト)が後に続く文字列です。
  • SELECTOR_CONSTRAINT: 各制約は Key-Value ペアのラベルです。コンテナをノードで実行できるようにするには、ノードで各制約がラベルとして指定されている必要があります。次に例を示します。

    • 'cloud.google.com/gke-accelerator', 'nvidia-tesla-k80'

    GPU リソースの詳細については、カスタム トレーニング用のコンピューティング リソースの構成をご覧ください。

  • GPU_LIMIT: オペレータの GPU の上限(正の数)。

パイプライン内の Google Cloud リソースにアクセスする

パイプラインの実行時にサービス アカウントを指定しない場合、Vertex Pipelines は Compute Engine のデフォルトのサービス アカウントを使用してパイプラインを実行します。Vertex Pipelines はまた、パイプライン実行のサービス アカウントを使用して、パイプラインの Google Cloud リソースへのアクセスも承認します。Compute Engine のデフォルトのサービス アカウントには、プロジェクト編集者のロールがデフォルトで付与されています。これにより、パイプラインに対して、Google Cloud プロジェクト内の Google Cloud リソースへの過剰なアクセス権が付与されることがあります。

パイプラインを実行するサービス アカウントを作成し、パイプラインの実行に必要な Google Cloud リソースに対するきめ細かな権限をアカウントに付与することをおすすめします。

Identity and Access Management を使用してサービス アカウントを作成し、サービス アカウントに付与したアクセス権を管理する方法を確認してください。

実行キャッシュについて

Vertex Pipelines は、パイプラインを実行するときに、各パイプライン ステップのインターフェースを含む実行が、Vertex ML Metadata に存在するかどうかを確認します。ステップのインターフェースは、パイプライン、パイプライン ステップ、パイプライン ステップの入力、パイプライン ステップの出力の組み合わせとして定義されます。Vertex ML Metadata に一致する実行がある場合、その実行の出力が使用され、ステップはスキップされます。これにより、前のパイプライン実行で完了した計算がスキップされ、コストの削減に役立ちます。

Python を使用して作成されたパイプライン実行の実行キャッシュを無効にできます。create_run_from_job_spec を使用してパイプラインを実行する場合、このパイプライン実行がキャッシュを使用しないことを指定するために、enable_caching 引数を使用できます。パイプライン実行の作成の詳細について確認してください

Vertex Pipelines と Kubeflow Pipelines の違いを理解する

Kubeflow Pipelines を構築した経験があるデベロッパーにとって、Vertex Pipelines は Kubeflow Pipelines と次のような違いがあることを理解しておくことが重要です。

  • Vertex Pipelines は、TFX v0.30.0 以降または Kubeflow Pipelines SDK v2 ドメイン固有言語(DSL)を使用して構築されたパイプラインを実行できます。Kubeflow Pipelines SDK v2 DSL は、Kubeflow Pipelines SDK v1.6 以降で使用できます。

    Kubeflow Pipelines は、Kubeflow Pipelines SDK を使用して構築されたパイプラインを実行できます。Kubeflow Pipelines v1.6 以降は、Kubeflow Pipelines SDK v2 DSL を使用して構築されたパイプラインも実行できます。

  • Kubeflow Pipelines では、永続ボリューム クレーム(PVC)などの Kubernetes リソースをパイプラインで使用できます。

    Vertex Pipelines では、Google Cloud サービスを使用してリソースを利用できます。たとえば、パイプラインのステップでマウントされたボリュームとして Cloud Storage バケットにアクセスするには、Cloud Storage FUSE を使用できます。Cloud Storage URI が gs://example-bucket/example-pipeline の場合、パイプライン コンポーネントのコンテナは、Cloud Storage FUSE を使用して、/gcs/example-bucket/example-pipeline というパスとしてその URI にアクセスできます。

  • Vertex Pipelines を使用してパイプラインを実行する場合は、パイプライン ルートが @pipeline アノテーションで指定されているか、パイプライン実行の作成時に指定されている必要があります。

    Kubeflow Pipelines では、パイプラインのルートの指定は任意です。パイプライン実行のアーティファクトは、デフォルトで MinIO を使用して保存されます。

  • 現在、以下の Kubeflow Pipelines 機能は、Vertex Pipelines ではサポートされていません。

    • Kubeflow Pipelines では、Kubeflow Pipelines SDK v1 DSL を使用して、指定した期間の経過後に、キャッシュされたコンポーネントの実行が期限切れになるように指定できます。

      現時点では、Kubeflow Pipelines SDK v2 DSL を使用して、指定した期間の経過後に、コンポーネントの実行が期限切れになるように指定することはできません。

      Vertex Pipelines では、create_run_from_job_spec を使用してパイプラインを実行する場合、enable_caching 引数を使用して、このパイプライン実行でキャッシュを使用しないことを指定できます。

    • Kubeflow Pipelines では、再帰的に呼び出されるパイプライン コンポーネントを指定できます。

      現時点では、Vertex Pipelines は、再帰的に呼び出されるパイプライン コンポーネントをサポートしていません。

サンプルとチュートリアル

次のサンプルとチュートリアルを使用して、Vertex Pipelines の詳細について確認してください。

次のステップ