Crear tus propios componentes de la canalización

Es habitual que, al ejecutar un componente, no solo quieras ver el enlace al trabajo del componente que se está lanzando, sino también el enlace a los recursos de la nube subyacentes, como los trabajos de predicción por lotes de Vertex o los trabajos de Dataflow.

El gcp_resource proto es un parámetro especial que puedes usar en tu componente para permitir que la consola de Google Cloud proporcione una vista personalizada de los registros y el estado del recurso en la consola de Vertex AI Pipelines.

Generar el parámetro gcp_resource

Usar un componente basado en contenedores

Primero, debes definir el parámetro gcp_resource en tu componente, tal como se muestra en el siguiente archivo component.py de ejemplo:

Python

Para saber cómo instalar o actualizar el SDK de Vertex AI para Python, consulta Instalar el SDK de Vertex AI para Python. Para obtener más información, consulta la documentación de referencia de la API Python.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

A continuación, dentro del contenedor, instala el paquete Google Cloud Pipeline Components:

pip install --upgrade google-cloud-pipeline-components

A continuación, en el código Python, define el recurso como un parámetro gcp_resource:

Python

Para saber cómo instalar o actualizar el SDK de Vertex AI para Python, consulta Instalar el SDK de Vertex AI para Python. Para obtener más información, consulta la documentación de referencia de la API Python.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Usar un componente de Python

También puedes devolver el parámetro de salida gcp_resources como cualquier otro parámetro de salida de cadena:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.19.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Valores resource_type admitidos

Puede definir resource_type como una cadena arbitraria, pero solo los siguientes tipos tienen enlaces en la consola Google Cloud :

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Escribir un componente para cancelar los recursos subyacentes

Cuando se cancela un trabajo de una canalización, el comportamiento predeterminado es que los recursos de Google Cloud subyacentes sigan ejecutándose. No se cancelan automáticamente. Para cambiar este comportamiento, debe adjuntar un controlador SIGTERM al trabajo de la canalización. Un buen lugar para hacerlo es justo antes de un bucle de sondeo de un trabajo que podría ejecutarse durante mucho tiempo.

La cancelación se ha implementado en varios componentes de la canalización, entre los que se incluyen los siguientes: Google Cloud

  • Tarea de predicción por lotes
  • Tarea de BigQuery ML
  • Trabajo personalizado
  • Tarea por lotes de Dataproc Serverless
  • Tarea de ajuste de hiperparámetros

Para obtener más información, incluido un código de ejemplo que muestra cómo adjuntar un controlador SIGTERM, consulta los siguientes enlaces de GitHub:

Ten en cuenta lo siguiente al implementar el controlador SIGTERM:

  • La propagación de la cancelación solo funciona después de que el componente se haya ejecutado durante unos minutos. Esto suele deberse a tareas de inicio en segundo plano que deben procesarse antes de que se llamen los controladores de señales de Python.
  • Es posible que no se haya implementado la cancelación de algunos Google Cloud recursos. Por ejemplo, al crear o eliminar un endpoint o un modelo de Vertex AI, se podría crear una operación de larga duración que acepte una solicitud de cancelación a través de su API REST, pero que no implemente la operación de cancelación en sí.