パイプラインを再実行する

完了、キャンセル、失敗した ML パイプライン実行から特定のタスクを再実行できます。再実行を開始するときに、タスクレベルの構成を変更したり、タスクをスキップしたりして、更新された構成に基づいて実行を作成できます。新しいパイプライン実行では、追跡可能性を確保するために、元のパイプライン実行への参照が維持されます。タスクが以前の実行で成功した場合、Vertex AI Pipelines はそのタスクのキャッシュに保存された結果を再利用します。それ以外の場合、ステップが失敗した場合は、Vertex AI Pipelines はパイプラインの再実行時にステップを実行します。

これにより、ML パイプライン全体を再起動せずに ML パイプラインを調整することで、ML パイプラインの障害に効率的に対応できます。失敗したタスクを調整したり、さまざまなパラメータ セットで結果を比較したり、失敗している重要でないタスクをスキップしたりできます。

パイプラインの再実行は、本番環境で複雑な ML パイプラインを管理する MLOps 実務担当者にとって有用です。次のような場合に役立ちます。

  • 並列プロセスでの部分的な障害の処理: 大規模な並列プロセスの一部が失敗した場合、失敗したタスクをスキップして、パイプラインの残りの部分の実行を続行できます。たとえば、100 個のタスクのうち 1 つのデータ パイプラインが失敗した場合、そのタスクをスキップできます。

  • 更新された入力データでタスクを再実行する: 更新されたデータで 1 つのタスクを再実行する必要がある場合は、その特定のタスクを再実行できます。

  • コードの変更を必要とせずに本番環境の問題をデバッグする: パイプライン コードの作成者を関与させることなく、特定のタスクとそのタスクに依存するすべてのタスクを再実行します。

パイプラインを再実行する

パイプラインを再実行するには、Vertex AI SDK for Python を使用します。

Python

次のサンプルでは、PipelineJob.rerun() メソッドを使用して、失敗したタスクをスキップし、更新されたパラメータを使用して別のタスクを再実行することで、パイプラインを再実行します。

from google.cloud import aiplatform
from google.cloud.aiplatform.preview.pipelinejob.pipeline_jobs import (
  _PipelineJob as PipelineJob
)
from google.cloud.aiplatform_v1beta1.types.ui_pipeline_spec import RuntimeArtifact
from google.protobuf.struct_pb2 import Value
from google.cloud.aiplatform_v1beta1.types import PipelineTaskRerunConfig
aiplatform.init(project="PROJECT_ID", location="LOCATION")
job = aiplatform.PipelineJob.get(resource_name="PIPELINE_RUN_RESOURCE_NAME")
original_job_name = job.resource_name
rerun_task_id = None
skip_failed_task_id = None
task_inputs_override = PipelineTaskRerunConfig.Inputs(
    parameter_values={
      "TASK_PARAMETER_1": Value(TASK_PARAMETER_1_VALUE),
      "TASK_PARAMETER_2": Value(TASK_PARAMETER_2_VALUE)
    }
)
for task in job.task_details:
    if task.task_name == RERUN_TASK_NAME:
        rerun_task_id = task.task_id
    if task.task_name == SKIP_FAILED_TASK_NAME:
        skip_failed_task_id = task.task_id
pipeline_job.rerun(original_pipelinejob_name=original_job_name,
  pipeline_task_rerun_configs=[
    PipelineTaskRerunConfig(task_id = rerun_task_id,
      skip_task = False,
      inputs = PipelineTaskRerunConfig.Inputs(task_inputs_override)
    ),
    PipelineTaskRerunConfig(task_id = skip_failed_task_id,
      skip_task = True
    )
  ],
  parameter_values=PIPELINE_PARAMETER_VALUES,
  job_id=RERUN_PIPELINE_JOB_ID)

次のように置き換えます。

  • PROJECT_ID: パイプライン実行を含む Google Cloud プロジェクト。
  • LOCATION: パイプライン実行が配置されているリージョン。Vertex AI Pipelines を利用できるリージョンの詳細については、Vertex AI ロケーション ガイドをご覧ください。
  • PIPELINE_RUN_RESOURCE_NAME: 再実行する完了、失敗、またはキャンセルされたパイプライン実行の完全修飾リソース名。リソース名を projects/PROJECT_NUMBER/locations/LOCATION/pipelineJobs/PIPELINE_RUN_ID の形式で入力します。

    • PROJECT_NUMBER: プロジェクトのプロジェクト番号。このプロジェクト番号は、 Google Cloud コンソールで確認できます。詳細については、プロジェクト名、番号、ID を確認するをご覧ください。
    • PIPELINE_RUN_ID は、再実行するパイプライン実行の一意の ID です。この ID は、 Google Cloud コンソールの [パイプライン] ページの [実行] タブに表示されます。
  • RERUN_TASK_NAME: 更新されたパラメータで再実行するタスクの名前。
  • SKIP_FAILED_TASK_NAME: 再実行時にスキップする失敗したタスクの名前。
  • TASK_PARAMETER_1TASK_PARAMETER_2: パイプラインの再実行でオーバーライドするタスクのパラメータの名前。
  • TASK_PARAMETER_1_VALUETASK_PARAMETER_2_VALUE: パイプラインの再実行における TASK_PARAMETER_1TASK_PARAMETER_2 の新しい値。
  • PIPELINE_PARAMETER_VALUES: 省略可。パイプラインの再実行に使用する更新されたパイプライン実行レベルのパラメータ値。
  • RERUN_PIPELINE_JOB_ID: 省略可。新しいパイプライン再実行ジョブに割り当てる一意の ID。