Compila tus propios componentes de canalización

Es común que cuando ejecutas un componente, no solo desees ver el vínculo al trabajo de componente que se inicia, sino también el vínculo a los recursos subyacentes de la nube, como los trabajos de predicción por lotes de Vertex o los trabajos de Dataflow.

El proto gcp_resource es un parámetro especial que puedes usar en el componente para permitir que la consola de Google Cloud proporcione una vista personalizada de los registros y el estado del recurso en la consola de canalizaciones de Vertex AI.

Obtén el resultado del parámetro gcp_resource.

Usa un componente basado en contenedores

Primero, deberás definir el parámetro gcp_resource en el componente, como se muestra en el siguiente archivo component.py de ejemplo:

Python

Si deseas obtener información para instalar o actualizar el SDK de Python, consulta Instala el SDK de Vertex AI para Python. Si deseas obtener más información, consulta la documentación de referencia de la API de Python.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

A continuación, dentro del contenedor, instala el paquete de componentes de canalización de Google Cloud:

pip install --upgrade google-cloud-pipeline-components

A continuación, en el código de Python, define el recurso como un parámetro gcp_resource:

Python

Si deseas obtener información para instalar o actualizar el SDK de Python, consulta Instala el SDK de Vertex AI para Python. Si deseas obtener más información, consulta la documentación de referencia de la API de Python.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Usa un componente de Python

De manera alternativa, puedes mostrar el parámetro de salida gcp_resources como lo harías con cualquier parámetro de salida de string:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.13.1'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Valores admitidos de resource_type:

Puedes configurar resource_type para que sea una string arbitraria, pero solo los siguientes tipos tienen vínculos en la consola de Google Cloud:

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Escribe un componente para cancelar los recursos subyacentes

Cuando se cancela un trabajo de canalización, el comportamiento predeterminado es que los recursos subyacentes de Google Cloud se sigan ejecutando. No se cancelan de forma automática. Para cambiar este comportamiento, debes adjuntar un controlador SIGTERM al trabajo de canalización. Un buen lugar para hacerlo es justo antes de un bucle de sondeo para un trabajo que podría ejecutarse durante mucho tiempo.

La cancelación se implementó en varios componentes de la canalización de Google Cloud, incluidos los siguientes:

  • Trabajo de predicción por lotes
  • Trabajo de BigQuery ML
  • Trabajo personalizado
  • Trabajo por lotes sin servidores de Dataproc
  • Trabajos de ajuste de hiperparámetros

Para obtener más información, incluido un código de muestra que indica cómo adjuntar un controlador de SIGTERM, consulta los siguientes vínculos de GitHub:

Ten en cuenta lo siguiente cuando implementes tu controlador de SIGTERM:

  • La propagación de cancelación funciona solo después de que el componente haya estado en ejecución durante unos minutos. Por lo general, esto se debe a las tareas de inicio en segundo plano que deben procesarse antes de que se llame a los controladores de señales de Python.
  • Es posible que algunos recursos de Google Cloud no tengan implementadas la cancelación. Por ejemplo, crear o borrar un extremo o modelo de Vertex AI podría crear una operación de larga duración que acepte una solicitud de cancelación a través de su API de REST, pero no implementa la operación de cancelación en sí.