Eigene Pipeline-Komponenten erstellen

Es ist üblich, dass Sie beim Ausführen einer Komponente nicht nur den Link zum gestarteten Komponentenjob sehen möchten, sondern auch den Link zu den zugrunde liegenden Cloud-Ressourcen wie die Vertex-Batchvorhersagejobs oder Dataflow-Jobs.

Der gcp_resource-Proto ist ein spezieller Parameter, mit dem Sie in Ihrer Komponente die Google Cloud Console aktivieren können, um eine benutzerdefinierte Ansicht der Logs und des Status der Ressource in der Vertex AI Pipelines-Konsole bereitzustellen.

Parameter gcp_resource ausgeben

Containerbasierte Komponente verwenden

Zuerst müssen Sie den Parameter gcp_resource in Ihrer Komponente definieren, wie in der folgenden component.py-Beispieldatei gezeigt:

Python

Informationen zum Installieren oder Aktualisieren von Python finden Sie unter Vertex AI SDK für Python installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

Installieren Sie als Nächstes im Container das Paket der Google Cloud-Pipeline-Komponenten:

pip install --upgrade google-cloud-pipeline-components

Definieren Sie als Nächstes im Python-Code die Ressource als gcp_resource-Parameter:

Python

Informationen zum Installieren oder Aktualisieren von Python finden Sie unter Vertex AI SDK für Python installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Python-Komponente verwenden

Alternativ können Sie den Ausgabeparameter gcp_resources wie einen beliebigen Stringausgabeparameter zurückgeben:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.13.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Unterstützte resource_type-Werte

Sie können den resource_type als beliebigen String festlegen. Nur die folgenden Typen haben in der Google Cloud Console jedoch Links:

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Komponente schreiben, um die zugrunde liegenden Ressourcen abzubrechen

Wenn ein Pipelinejob abgebrochen wird, werden die zugrunde liegenden Google Cloud-Ressourcen standardmäßig weiter ausgeführt. Sie werden nicht automatisch abgebrochen. Um dieses Verhalten zu ändern, sollten Sie an den Pipelinejob einen SIGTERM-Handler anhängen. Dies empfiehlt sich unmittelbar vor der Abfrageschleife für einen Job, die über einen längeren Zeitraum ausgeführt werden kann.

Das Abbrechen wurde für mehrere Google Cloud-Pipeline-Komponenten implementiert, darunter:

  • Batchvorhersagejob
  • BigQuery ML-Job
  • Benutzerdefinierter Job
  • Serverloser Dataproc-Batchjob
  • Hyperparameter-Abstimmungsjob

Weitere Informationen und Beispielcode zum Anhängen eines SIGTERM-Handlers finden Sie unter den folgenden GitHub-Links:

Beachten Sie bei der Implementierung des SIGTERM-Handlers Folgendes:

  • Die Abbruchfortführung funktioniert nur, nachdem die Komponente einige Minuten lang ausgeführt wurde. Dies ist in der Regel darauf zurückzuführen, dass Startaufgaben im Hintergrund verarbeitet werden müssen, bevor die Python-Signal-Handler aufgerufen werden.
  • Bei einigen Google Cloud-Ressourcen ist möglicherweise kein Abbruch implementiert. Beispielsweise kann das Erstellen oder Löschen eines Vertex AI-Endpunkts oder -Modells einen Vorgang mit langer Ausführungszeit erstellen, der eine Abbruchanfrage über seine REST API akzeptiert, aber den Abbruchvorgang nicht implementiert.