Créer vos propres composants de pipeline

Lors de l'exécution d'un composant, il est courant de voir non seulement le lien vers la tâche de composant lancée, mais également le lien vers les ressources cloud sous-jacentes, telles que les tâches de prédiction par lot Vertex ou les tâches Dataflow.

Le proto gcp_resource est un paramètre spécial que vous pouvez utiliser dans votre composant pour permettre à Google Cloud Console d'offrir une vue personnalisée des journaux et de l'état de la ressource dans la console Vertex AI Pipelines.

Générer le paramètre gcp_resource

Utiliser un composant basé sur un conteneur

Tout d'abord, vous devez définir le paramètre gcp_resource dans votre composant, comme indiqué dans cet exemple de fichier component.py :

Python

Pour savoir comment installer ou mettre à jour le SDK pour Python, consultez la page Installer le SDK Vertex AI pour Python. Pour en savoir plus, consultez la documentation de référence de l'API Python.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

Ensuite, à l'intérieur du conteneur, installez le package des composants du pipeline Google Cloud :

pip install --upgrade google-cloud-pipeline-components

Ensuite, dans le code Python, définissez la ressource en tant que paramètre gcp_resource :

Python

Pour savoir comment installer ou mettre à jour le SDK pour Python, consultez la page Installer le SDK Vertex AI pour Python. Pour en savoir plus, consultez la documentation de référence de l'API Python.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Utiliser un composant Python

Vous pouvez également renvoyer le paramètre de sortie gcp_resources comme vous le feriez pour n'importe quel paramètre de sortie de chaîne :

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.13.1'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Valeurs resource_type compatibles

Vous pouvez définir resource_type sur une chaîne arbitraire, mais seuls les types suivants possèdent des liens dans Google Cloud Console :

  • BatchPredictionJob
  • BigQueryJob
  • Tâche personnalisée
  • DataflowJob
  • Tâche de réglage des hyperparamètres

Écrire un composant pour annuler les ressources sous-jacentes

Lorsqu'une tâche de pipeline est annulée, le comportement par défaut consiste à maintenir l'exécution des ressources Google Cloud sous-jacentes. Elles ne sont pas automatiquement annulées. Pour modifier ce comportement, vous devez associer un gestionnaire SIGTERM à la tâche de pipeline. Il est recommandé d'effectuer cette opération juste avant une boucle d'interrogation pour une tâche susceptible de s'exécuter sur une longue période.

L'annulation a été mise en œuvre dans plusieurs composants du pipeline Google Cloud, y compris :

  • Tâche de prédiction par lot
  • Tâche BigQuery ML
  • Custom job
  • Job par lot Dataproc sans serveur
  • Tâche de réglage des hyperparamètres

Pour en savoir plus et obtenir un exemple de code montrant comment associer un gestionnaire SIGTERM, consultez les liens GitHub suivants :

Tenez compte des points suivants lorsque vous mettez en œuvre votre gestionnaire SIGTERM :

  • La propagation des annulations ne fonctionne qu'après quelques minutes d'exécution du composant. Cela est généralement dû à des tâches de démarrage en arrière-plan qui doivent être traitées avant l'appel des gestionnaires de signaux Python.
  • L'annulation peut ne pas être mise en œuvre pour certaines ressources Google Cloud. Par exemple, la création ou la suppression d'un point de terminaison ou d'un modèle Vertex AI peut créer une opération de longue durée qui accepte une demande d'annulation via son API REST, mais ne met pas en œuvre l'opération d'annulation proprement dite.