Membangun komponen pipeline Anda sendiri

Umumnya, pada saat menjalankan komponen, Anda tidak hanya ingin melihat link ke tugas komponen yang diluncurkan, tetapi juga link ke resource cloud yang mendasarinya, seperti tugas prediksi batch Vertex atau tugas dataflow.

Proto gcp_resource adalah parameter khusus yang dapat Anda gunakan di dalam komponen untuk memungkinkan Konsol Google Cloud memberikan tampilan yang disesuaikan dari log dan status resource di dalam konsol Vertex AI Pipelines.

Menampilkan parameter gcp_resource

Menggunakan komponen berbasis container

Pertama, Anda harus menentukan parameter gcp_resource di dalam komponen, seperti yang ditunjukkan dalam contoh file component.py berikut ini:

Python

Untuk mempelajari cara menginstal atau mengupdate Vertex AI SDK untuk Python, lihat Menginstal Vertex AI SDK untuk Python. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Python API.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

Selanjutnya, di dalam container, instal paket Pipeline Components Google Cloud:

pip install --upgrade google-cloud-pipeline-components

Selanjutnya, dalam kode Python, tentukan resource sebagai parameter gcp_resource:

Python

Untuk mempelajari cara menginstal atau mengupdate Vertex AI SDK untuk Python, lihat Menginstal Vertex AI SDK untuk Python. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Python API.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Menggunakan komponen Python

Atau, Anda dapat menampilkan parameter output gcp_resources seperti yang biasa Anda lakukan terhadap parameter output string:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.13.1'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Nilai resource_type yang didukung

Anda dapat menetapkan resource_type sebagai string arbitrer, namun hanya jenis berikut ini yang memiliki link di dalam Konsol Google Cloud:

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Menulis komponen untuk membatalkan resource yang mendasarinya

Saat tugas pipeline dibatalkan, perilaku default-nya adalah agar resource Google Cloud yang mendasarinya tetap berjalan. Tugas tersebut tidak dibatalkan secara otomatis. Untuk mengubah perilaku ini, Anda harus memasang pengendali SIGTERM ke tugas pipeline tersebut. Tempat yang baik untuk melakukan ini adalah sebelum terjadinya loop polling untuk tugas yang dapat berjalan untuk waktu yang lama.

Pembatalan telah diimplementasikan pada beberapa Pipeline Components Google Cloud, termasuk:

  • Tugas prediksi batch
  • Tugas ML BigQuery
  • Tugas kustom
  • Tugas batch Dataproc Serverless
  • Tugas penyesuaian hyperparameter

Untuk informasi selengkapnya, termasuk kode contoh yang menunjukkan cara untuk memasang pengendali SIGTERM, lihat link GitHub berikut ini:

Pertimbangkan hal berikut ini saat menerapkan pengendali SIGTERM:

  • Penerapan pembatalan hanya berfungsi setelah komponennya berjalan selama beberapa menit. Hal ini biasanya disebabkan oleh tugas startup latar belakang yang perlu diproses sebelum pengendali sinyal Python dipanggil.
  • Beberapa resource Google Cloud mungkin tidak menerapkan pembatalan. Misalnya, membuat atau menghapus Endpoint atau Model Vertex AI dapat membuat operasi yang berjalan lama yang menerima permintaan pembatalan melalui REST API-nya, tetapi tidak menerapkan operasi pembatalan itu sendiri.