Criar os próprios componentes de pipeline

É comum que, ao executar um componente, você queira ver não apenas o link para o job do componente que está sendo iniciado, mas também o link para os recursos de nuvem subjacentes, como os jobs de previsão em lote do Vertex ou jobs do Dataflow.

O proto do gcp_resource é um parâmetro especial que pode ser usado no componente para permitir que o Console do Google Cloud forneça uma visualização personalizada dos registros e do status do recurso no console do Vertex AI Pipelines.

Gerar o parâmetro gcp_resource

Como usar um componente baseado em contêiner

Primeiro, é necessário definir o parâmetro gcp_resource no seu componente, conforme mostrado no seguinte arquivo component.py de exemplo:

Python

Para saber como instalar o SDK da Vertex AI para Python, consulte Instalar o SDK da Vertex AI para Python. Para mais informações, consulte a documentação de referência da API Python.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

Em seguida, dentro do contêiner, instale o pacote Componentes do Pipeline do Google Cloud:

pip install --upgrade google-cloud-pipeline-components

Em seguida, no código Python, defina o recurso como um parâmetro gcp_resource:

Python

Para saber como instalar o SDK da Vertex AI para Python, consulte Instalar o SDK da Vertex AI para Python. Para mais informações, consulte a documentação de referência da API Python.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Como usar um componente para Python

Como alternativa, é possível retornar o parâmetro de saída gcp_resources como qualquer parâmetro de saída de string:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.14.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

resource_type: valores aceitos:

É possível definir o resource_type como uma string arbitrária, mas apenas os seguintes tipos têm links no Console do Google Cloud:

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Escrever um componente para cancelar os recursos subjacentes

Quando um job de pipeline é cancelado, o comportamento padrão é que os recursos subjacentes do Google Cloud continuem em execução. Elas não são canceladas automaticamente. Para alterar esse comportamento, anexe um gerenciador SIGTERM ao job do pipeline. Um bom lugar para fazer isso é antes de um loop de pesquisa para um job que pode ser executado por muito tempo.

O cancelamento foi implementado em vários componentes do Google Cloud Pipeline, incluindo:

  • Job de previsão em lote
  • Job do BigQuery ML
  • Job personalizado
  • Job em lote do Dataproc sem servidor
  • Job de ajuste de hiperparâmetros

Para mais informações, incluindo exemplos de código que mostram como anexar um gerenciador SIGTERM, consulte os seguintes links do GitHub:

Considere o seguinte ao implementar seu gerenciador SIGTERM:

  • A propagação de cancelamento só funciona depois que o componente estiver em execução por alguns minutos. Geralmente, isso ocorre devido a tarefas de inicialização em segundo plano que precisam ser processadas antes de os gerenciadores de sinal do Python serem chamados.
  • Alguns recursos do Google Cloud podem não ter o cancelamento implementado. Por exemplo, criar ou excluir um endpoint ou modelo do Vertex AI pode criar uma operação de longa duração que aceite uma solicitação de cancelamento pela API REST, mas não implemente a operação de cancelamento em si.