Cloud Pub/Sub로 파이프라인 실행 트리거

다음 코드 샘플은 Cloud Pub/Sub 트리거와 함께 이벤트 기반 Cloud 함수를 사용하여 파이프라인을 작성, 배포, 트리거하는 방법을 보여줍니다.

간단한 파이프라인 빌드 및 컴파일

Kubeflow Pipelines SDK를 사용하여 예약된 파이프라인을 빌드하고 이를 YAML 파일로 컴파일합니다.

샘플 hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

컴파일된 파이프라인 YAML을 Cloud Storage 버킷에 업로드

  1. Google Cloud 콘솔에서 Cloud Storage 브라우저를 엽니다.

    Cloud Storage 브라우저

  2. 프로젝트를 구성할 때 만든 Cloud Storage 버킷을 클릭합니다.

  3. 기존 폴더 또는 새 폴더를 사용하여 컴파일된 파이프라인 YAML(이 예시에서는 hello_world_scheduled_pipeline.yaml)을 선택한 폴더에 업로드합니다.

  4. 업로드된 YAML 파일을 클릭하여 세부정보에 액세스합니다. 나중에 사용할 수 있도록 gsutil URI를 복사합니다.

Pub/Sub 트리거로 Cloud 함수 만들기

  1. Console에서 Cloud Functions 페이지로 이동합니다.

    Cloud Functions 페이지로 이동

  2. 함수 만들기 버튼을 클릭합니다.

  3. 기본사항 섹션에서 함수 이름을 지정합니다(예: my-scheduled-pipeline-function).

  4. 트리거 섹션에서 트리거 유형으로 Cloud Pub/Sub를 선택합니다.

    함수 구성 만들기, 트리거 유형으로 pubsub 선택 이미지

  5. Cloud Pub/Sub 주제 선택 드롭다운에서 주제 만들기를 클릭합니다.

  6. 주제 만들기 상자에서 새 주제 이름을 지정하고(예: my-scheduled-pipeline-topic) 주제 만들기를 선택합니다.

  7. 다른 필드는 모두 기본값으로 두고 저장을 클릭하여 트리거 섹션 구성을 저장합니다.

  8. 다른 필드는 모두 기본값으로 두고 다음을 클릭하여 코드 섹션으로 이동합니다.

  9. 런타임에서 Python 3.7을 선택합니다.

  10. 진입점에서 'subscribe'(예시 코드 진입점 함수 이름)를 입력합니다.

  11. 아직 선택하지 않았으면 소스 코드 아래에서 인라인 편집기를 선택합니다.

  12. main.py 파일에서 다음 코드를 추가합니다.

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    다음을 바꿉니다.

    • PROJECT_ID: 이 파이프라인이 실행되는 Google Cloud 프로젝트입니다.
    • REGION: 이 파이프라인이 실행되는 리전입니다.
    • PIPELINE_ROOT: 파이프라인 서비스 계정이 액세스할 수 있는 Cloud Storage URI를 지정합니다. 파이프라인 실행의 아티팩트는 파이프라인 루트에 저장됩니다.
  13. requirements.txt 파일에서 콘텐츠를 다음 패키지 요구사항으로 바꿉니다.

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. 배포를 클릭하여 함수를 배포합니다.