자체 파이프라인 구성요소 빌드

사용자는 구성요소를 실행할 때 실행 중인 구성요소 작업에 대한 링크뿐만 아니라 Vertex 일괄 예측 작업 또는 Dataflow 작업과 같은 기본 클라우드 리소스에 대한 링크도 확인하려고 하는 것이 일반적입니다.

gcp_resource proto는 구성요소에서 Google Cloud Console이 Vertex AI Pipelines 콘솔에서 리소스의 로그 및 상태에 대한 커스텀 보기를 제공하도록 사용 설정하는 특수한 매개변수입니다.

gcp_resource 매개변수 출력

컨테이너 기반 구성요소 사용

먼저 component.py 예시 파일에 표시된 대로 구성요소에 gcp_resource 매개변수를 정의해야 합니다.

Python용 Vertex AI SDK

Python용 Vertex AI SDK를 설치하거나 업데이트하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python용 Vertex AI SDK API 참조 문서를 확인하세요.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

그런 다음 컨테이너 내에서 Google Cloud 파이프라인 구성요소 패키지를 설치합니다.

pip install --upgrade google-cloud-pipeline-components

다음으로 Python 코드에서 리소스를 gcp_resource 매개변수로 정의합니다.

Python용 Vertex AI SDK

Python용 Vertex AI SDK를 설치하거나 업데이트하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python용 Vertex AI SDK API 참조 문서를 확인하세요.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Python 구성요소 사용

또는 문자열 출력 매개변수와 같이 gcp_resources 출력 매개변수를 반환할 수 있습니다.

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.19.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

지원되는 resource_type

resource_type을 임의의 문자열로 설정할 수 있지만 Google Cloud 콘솔에는 다음 유형만 링크가 있습니다.

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

기본 리소스를 취소하는 구성요소 작성

파이프라인 작업이 취소될 때 기본 동작은 기본 Google Cloud 리소스가 계속 실행되는 것입니다. 이러한 리소스는 자동으로 취소되지 않습니다. 이 동작을 변경하려면 SIGTERM 핸들러를 파이프라인 작업에 연결해야 합니다. 이를 수행하기 위한 적합한 위치는 장시간 실행될 수 있는 작업의 폴링 루프 바로 앞입니다.

취소는 다음을 포함한 여러 Google Cloud 파이프라인 구성요소에서 구현되었습니다.

  • 일괄 예측 작업
  • BigQuery ML 작업
  • 커스텀 작업
  • Dataproc 서버리스 일괄 작업
  • 초매개변수 조정 작업

SIGTERM 핸들러를 연결하는 방법을 보여주는 샘플 코드를 포함한 상세 설명은 다음 GitHub 링크를 참조하세요.

SIGTERM 핸들러를 구현할 때는 다음을 고려하세요.

  • 취소는 구성요소가 몇 분 동안 실행된 후에야 전파됩니다. 일반적으로 이는 Python 신호 핸들러가 호출되기 전에 처리해야 하는 백그라운드 시작 태스크로 인한 것입니다.
  • 일부 Google Cloud 리소스에는 최소가 구현되지 않았을 수 있습니다. 예를 들어 Vertex AI 엔드포인트 또는 모델을 만들거나 삭제하면 REST API를 통해 취소 요청을 수락하는 장기 실행 작업이 생성되지만 취소 작업 자체가 구현되지 않습니다.