자체 파이프라인 구성요소 빌드

사용자는 구성요소를 실행할 때 실행 중인 구성요소 작업에 대한 링크뿐만 아니라 Vertex 일괄 예측 작업 또는 Dataflow 작업과 같은 기본 클라우드 리소스에 대한 링크도 확인하려고 하는 것이 일반적입니다.

gcp_resource proto는 구성요소에서 Google Cloud Console이 Vertex AI Pipelines 콘솔에서 리소스의 로그 및 상태에 대한 커스텀 보기를 제공하도록 사용 설정하는 특수한 매개변수입니다.

gcp_resource 매개변수 출력

컨테이너 기반 구성요소 사용

먼저 component.py 예시 파일에 표시된 대로 구성요소에 gcp_resource 매개변수를 정의해야 합니다.

Python

Python용 Vertex AI SDK를 설치하거나 업데이트하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python API 참고 문서를 확인하세요.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath

@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

그런 후 컨테이너 내에서 Google Cloud 파이프라인 구성요소 패키지를 설치합니다.

pip install --upgrade google-cloud-pipeline-components

다음으로 Python 코드에서 리소스를 gcp_resource 매개변수로 정의합니다.

Python

Python용 Vertex AI SDK를 설치하거나 업데이트하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python API 참고 문서를 확인하세요.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Python 구성요소 사용

또는 문자열 출력 매개변수와 같이 gcp_resources 출력 매개변수를 반환할 수 있습니다.

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.14.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

지원되는 resource_type

resource_type을 임의의 문자열로 설정할 수 있지만 Google Cloud Console에는 다음 유형만 링크가 있습니다.

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

기본 리소스를 취소하는 구성요소 작성

파이프라인 작업이 취소될 때 기본 동작은 기본 Google Cloud 리소스가 계속 실행되는 것입니다. 이러한 리소스는 자동으로 취소되지 않습니다. 이 동작을 변경하려면 SIGTERM 핸들러를 파이프라인 작업에 연결해야 합니다. 이를 수행하기 위한 적합한 위치는 장시간 실행될 수 있는 작업의 폴링 루프 바로 앞입니다.

취소는 다음을 포함한 여러 Google Cloud 파이프라인 구성요소에서 구현되었습니다.

  • 일괄 예측 작업
  • BigQuery ML 작업
  • 커스텀 작업
  • Dataproc 서버리스 일괄 작업
  • 초매개변수 조정 작업

SIGTERM 핸들러를 연결하는 방법을 보여주는 샘플 코드를 포함한 상세 설명은 다음 GitHub 링크를 참조하세요.

SIGTERM 핸들러를 구현할 때는 다음을 고려하세요.

  • 취소는 구성요소가 몇 분 동안 실행된 후에야 전파됩니다. 일반적으로 이는 Python 신호 핸들러가 호출되기 전에 처리해야 하는 백그라운드 시작 태스크로 인한 것입니다.
  • 일부 Google Cloud 리소스에는 최소가 구현되지 않았을 수 있습니다. 예를 들어 Vertex AI 엔드포인트 또는 모델을 만들거나 삭제하면 REST API를 통해 취소 요청을 수락하는 장기 실행 작업이 생성되지만 취소 작업 자체가 구현되지 않습니다.