Executar um pipeline novamente

É possível executar novamente tarefas específicas de execuções de pipeline de machine learning (ML) concluídas, canceladas ou com falha. Ao iniciar uma nova execução, é possível modificar as configurações no nível da tarefa ou pular tarefas e criar uma execução com base na configuração atualizada. A nova execução de pipeline mantém uma referência à execução original para rastreabilidade. Se uma tarefa foi concluída em uma execução anterior, o Vertex AI Pipelines reutiliza os resultados armazenados em cache dessa tarefa. Caso contrário, se a etapa tiver falhado, o Vertex AI Pipelines a executará durante a nova execução do pipeline.

Isso permite resolver falhas no pipeline de ML de maneira eficiente fazendo ajustes nele sem precisar reiniciar todo o processo. É possível ajustar uma tarefa com falha, comparar resultados com diferentes conjuntos de parâmetros ou pular uma tarefa não essencial que está falhando.

A repetição de um pipeline é útil para profissionais de MLOps que gerenciam pipelines de ML complexos em produção. Confira alguns exemplos de cenários em que isso é útil:

  • Como lidar com falhas parciais em processos paralelos: quando uma parte de um grande processo paralelo falha, é possível pular a tarefa com falha e deixar o restante do pipeline continuar sendo executado. Por exemplo, se um pipeline de dados para uma de 100 tarefas falhar, você poderá ignorá-lo.

  • Executar novamente uma tarefa com dados de entrada atualizados: se uma única tarefa precisar ser executada novamente com dados atualizados, você poderá fazer isso.

  • Depurar problemas de produção sem exigir mudanças no código: execute novamente uma tarefa específica e todas as tarefas que dependem dela sem envolver o autor do código do pipeline.

Executar um pipeline novamente

Para executar um pipeline novamente, use o SDK da Vertex AI para Python.

Python

Use o exemplo a seguir para executar novamente um pipeline pulando uma tarefa com falha e executando outra tarefa com parâmetros atualizados usando o método PipelineJob.rerun():

from google.cloud import aiplatform
from google.cloud.aiplatform.preview.pipelinejob.pipeline_jobs import (
  _PipelineJob as PipelineJob
)
from google.cloud.aiplatform_v1beta1.types.ui_pipeline_spec import RuntimeArtifact
from google.protobuf.struct_pb2 import Value
from google.cloud.aiplatform_v1beta1.types import PipelineTaskRerunConfig
aiplatform.init(project="PROJECT_ID", location="LOCATION")
job = aiplatform.PipelineJob.get(resource_name="PIPELINE_RUN_RESOURCE_NAME")
original_job_name = job.resource_name
rerun_task_id = None
skip_failed_task_id = None
task_inputs_override = PipelineTaskRerunConfig.Inputs(
    parameter_values={
      "TASK_PARAMETER_1": Value(TASK_PARAMETER_1_VALUE),
      "TASK_PARAMETER_2": Value(TASK_PARAMETER_2_VALUE)
    }
)
for task in job.task_details:
    if task.task_name == RERUN_TASK_NAME:
        rerun_task_id = task.task_id
    if task.task_name == SKIP_FAILED_TASK_NAME:
        skip_failed_task_id = task.task_id
pipeline_job.rerun(original_pipelinejob_name=original_job_name,
  pipeline_task_rerun_configs=[
    PipelineTaskRerunConfig(task_id = rerun_task_id,
      skip_task = False,
      inputs = PipelineTaskRerunConfig.Inputs(task_inputs_override)
    ),
    PipelineTaskRerunConfig(task_id = skip_failed_task_id,
      skip_task = True
    )
  ],
  parameter_values=PIPELINE_PARAMETER_VALUES,
  job_id=RERUN_PIPELINE_JOB_ID)

Substitua:

  • PROJECT_ID: o Google Cloud projeto que contém a execução do pipeline.
  • LOCATION: a região em que a execução do pipeline está localizada. Para mais informações sobre as regiões em que a Vertex AI Pipelines está disponível, consulte o Guia de locais da Vertex AI.
  • PIPELINE_RUN_RESOURCE_NAME: o nome de recurso totalmente qualificado da execução de pipeline concluída, com falha ou cancelada que você quer executar novamente. Insira o nome do recurso no formato projects/PROJECT_NUMBER/locations/LOCATION/pipelineJobs/PIPELINE_RUN_ID, em que:

    • PROJECT_NUMBER: o número do projeto. Você pode localizar esse número do projeto no console Google Cloud . Para mais informações, consulte Encontrar o nome, o número e o ID do projeto.
    • PIPELINE_RUN_ID com o ID exclusivo da execução do pipeline que você quer executar novamente. O ID é exibido na guia Execuções da página Pipelines no console Google Cloud .
  • RERUN_TASK_NAME: o nome da tarefa a ser executada novamente com parâmetros atualizados.
  • SKIP_FAILED_TASK_NAME: o nome da tarefa com falha a ser ignorada durante a nova execução.
  • TASK_PARAMETER_1 e TASK_PARAMETER_2: os nomes dos parâmetros da tarefa que você quer substituir na nova execução do pipeline.
  • TASK_PARAMETER_1_VALUE e TASK_PARAMETER_2_VALUE: Os novos valores de TASK_PARAMETER_1 e TASK_PARAMETER_2 respectivamente na nova execução do pipeline.
  • PIPELINE_PARAMETER_VALUES: opcional. Os valores de parâmetros atualizados no nível da execução do pipeline a serem usados para a nova execução.
  • RERUN_PIPELINE_JOB_ID: opcional. Um ID exclusivo a ser atribuído ao novo job de nova execução do pipeline.