重新运行流水线

您可以重新运行已完成、已取消或失败的机器学习 (ML) 流水线运行中的特定任务。发起重新运行后,您可以修改任务级配置,也可以选择跳过任务,然后根据更新后的配置创建运行。新的流水线运行会保留对原始流水线运行的引用,以便进行跟踪。如果某项任务在之前的运行中成功完成,Vertex AI Pipelines 会重复使用该任务的缓存结果。否则,如果相应步骤之前失败,Vertex AI Pipelines 会在流水线重新运行时运行该步骤。

这样,您无需重启整个机器学习流水线,只需调整机器学习流水线即可高效解决机器学习流水线故障。您可以调整失败的任务、比较不同参数集的结果,或跳过失败的非必要任务。

对于在生产环境中管理复杂机器学习流水线的 MLOps 从业人员来说,重新运行流水线非常有用。以下是一些适用场景示例:

  • 处理并行流程中的部分失败:当大型并行流程的一部分失败时,您可以跳过失败的任务,让流水线的其余部分继续运行。例如,如果 100 个任务中有一个任务的数据流水线失败,您可以跳过该任务。

  • 使用更新后的输入数据重新运行任务:如果需要使用更新后的数据重新运行单个任务,您可以重新运行该特定任务。

  • 无需更改代码即可调试生产环境问题:重新运行特定任务及其所有依赖任务,而无需涉及流水线代码的作者。

重新运行流水线

如需重新运行流水线,请使用 Vertex AI SDK for Python。

Python

使用以下示例,通过 PipelineJob.rerun() 方法跳过失败的任务并使用更新后的参数重新运行另一项任务,从而重新运行流水线:

from google.cloud import aiplatform
from google.cloud.aiplatform.preview.pipelinejob.pipeline_jobs import (
  _PipelineJob as PipelineJob
)
from google.cloud.aiplatform_v1beta1.types.ui_pipeline_spec import RuntimeArtifact
from google.protobuf.struct_pb2 import Value
from google.cloud.aiplatform_v1beta1.types import PipelineTaskRerunConfig
aiplatform.init(project="PROJECT_ID", location="LOCATION")
job = aiplatform.PipelineJob.get(resource_name="PIPELINE_RUN_RESOURCE_NAME")
original_job_name = job.resource_name
rerun_task_id = None
skip_failed_task_id = None
task_inputs_override = PipelineTaskRerunConfig.Inputs(
    parameter_values={
      "TASK_PARAMETER_1": Value(TASK_PARAMETER_1_VALUE),
      "TASK_PARAMETER_2": Value(TASK_PARAMETER_2_VALUE)
    }
)
for task in job.task_details:
    if task.task_name == RERUN_TASK_NAME:
        rerun_task_id = task.task_id
    if task.task_name == SKIP_FAILED_TASK_NAME:
        skip_failed_task_id = task.task_id
pipeline_job.rerun(original_pipelinejob_name=original_job_name,
  pipeline_task_rerun_configs=[
    PipelineTaskRerunConfig(task_id = rerun_task_id,
      skip_task = False,
      inputs = PipelineTaskRerunConfig.Inputs(task_inputs_override)
    ),
    PipelineTaskRerunConfig(task_id = skip_failed_task_id,
      skip_task = True
    )
  ],
  parameter_values=PIPELINE_PARAMETER_VALUES,
  job_id=RERUN_PIPELINE_JOB_ID)

替换以下内容:

  • PROJECT_ID:包含流水线运行的 Google Cloud 项目。
  • LOCATION:流水线运行所在的区域。如需详细了解支持 Vertex AI Pipelines 的区域,请参阅 Vertex AI 位置指南
  • PIPELINE_RUN_RESOURCE_NAME:您要重新运行的已完成、失败或已取消的流水线运行的完全限定资源名称。 以 projects/PROJECT_NUMBER/locations/LOCATION/pipelineJobs/PIPELINE_RUN_ID 格式输入资源名称,其中:

    • PROJECT_NUMBER:您的项目的项目编号。您可以在 Google Cloud 控制台中找到此项目编号。如需了解详情,请参阅查找项目名称、编号和 ID
    • PIPELINE_RUN_ID 替换为您要重新运行的流水线运行的唯一 ID。该 ID 显示在 Google Cloud 控制台流水线页面上的运行标签页中。
  • RERUN_TASK_NAME:要使用更新后的参数重新运行的任务的名称。
  • SKIP_FAILED_TASK_NAME:在重新运行期间要跳过的失败任务的名称。
  • TASK_PARAMETER_1TASK_PARAMETER_2:您要在流水线重新运行中替换的任务的参数名称。
  • TASK_PARAMETER_1_VALUETASK_PARAMETER_2_VALUE:流水线重新运行时 TASK_PARAMETER_1TASK_PARAMETER_2 的新值。
  • PIPELINE_PARAMETER_VALUES:可选。用于流水线重新运行的更新后的流水线运行级参数值。
  • RERUN_PIPELINE_JOB_ID:可选。要分配给新的流水线重新运行作业的唯一 ID。