# Trigger a pipeline run with Pub/Sub

This page shows you how to write, deploy, and trigger a pipeline run using an [Event-Driven Cloud Function](/functions/docs/writing#event-driven_functions) with a [Cloud Pub/Sub trigger](/functions/docs/calling/pubsub). Follow these steps:

1. Define an ML pipeline using the Kubeflow Pipelines (KFP) SDK and compile it into a YAML file.

2. Upload the compiled pipeline definition to a Cloud Storage bucket.

3. Use Cloud Run functions to create, configure, and deploy a function that's triggered by a new or existing Pub/Sub topic.

### Define and compile a pipeline

Using the Kubeflow Pipelines SDK, build a scheduled pipeline and compile it into a YAML file.
Sample `hello-world-scheduled-pipeline`:
```python
from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    # KFP v2 components must be instantiated with keyword arguments
    hello_world_task = hello_world(message=greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')
```
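The Cloud Function will later load this compiled spec from Cloud Storage, so the file has to be staged in a bucket that your pipelines service account can read. One way to do that from the command line is with the gsutil CLI; the bucket and folder names here are placeholders:

```shell
# Copy the compiled pipeline spec into Cloud Storage
# (gs://your-bucket/pipelines/ is a placeholder path).
gsutil cp hello_world_scheduled_pipeline.yaml gs://your-bucket/pipelines/

# List it back to confirm the upload, and note the resulting gs:// URI:
# it becomes the "pipeline_spec_uri" value in the trigger payload later.
gsutil ls gs://your-bucket/pipelines/
```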
### Upload the compiled pipeline YAML to a Cloud Storage bucket

1. Open the Cloud Storage browser in the Google Cloud console.

   [Cloud Storage Browser](https://console.cloud.google.com/storage/browser/)

2. Click the Cloud Storage bucket you created when you [configured your project](/vertex-ai/docs/pipelines/configure-project).

3. Using either an existing folder or a new folder, upload your compiled pipeline YAML (in this example `hello_world_scheduled_pipeline.yaml`) to the selected folder.

4. Click the uploaded YAML file to access its details. Copy the **gsutil URI** for later use.

### Create a Cloud Run function with a Pub/Sub trigger

1. Visit the Cloud Run functions page in the console.

   [Go to the Cloud Run functions page](https://console.cloud.google.com/functions)

2. Click the **Create function** button.

3. In the **Basics** section, give your function a name (for example, `my-scheduled-pipeline-function`).
4. In the **Trigger** section, select **Cloud Pub/Sub** as the trigger type.

5. In the **Select a Cloud Pub/Sub topic** list, click **Create a topic**.

6. In the **Create a topic** box, give your new topic a name (for example, `my-scheduled-pipeline-topic`), and select **Create topic**.

7. Leave all other fields at their defaults and click **Save** to save the Trigger section configuration.

8. Leave all other fields at their defaults and click **Next** to proceed to the Code section.

9. Under **Runtime**, select **Python 3.7**.

10. In **Entry point**, enter `subscribe` (the entry point function name in the example code).

11. Under **Source code**, select **Inline Editor** if it's not already selected.
12. In the `main.py` file, add the following code:
```python
import base64
import json
from google.cloud import aiplatform

PROJECT_ID = 'your-project-id'                      # <---CHANGE THIS
REGION = 'your-region'                              # <---CHANGE THIS
PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'  # <---CHANGE THIS

def subscribe(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # decode the event payload string
    payload_message = base64.b64decode(event['data']).decode('utf-8')
    # parse payload string into JSON object
    payload_json = json.loads(payload_message)
    # trigger pipeline run with payload
    trigger_pipeline_run(payload_json)

def trigger_pipeline_run(payload_json):
    """Triggers a pipeline run.
    Args:
        payload_json: expected in the following format:
            {
                "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                "parameter_values": {
                    "greet_name": "<any-greet-string>"
                }
            }
    """
    pipeline_spec_uri = payload_json['pipeline_spec_uri']
    parameter_values = payload_json['parameter_values']

    # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
    aiplatform.init(
        project=PROJECT_ID,
        location=REGION,
    )
    job = aiplatform.PipelineJob(
        display_name='hello-world-pipeline-cloud-function-invocation',
        template_path=pipeline_spec_uri,
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        parameter_values=parameter_values
    )

    # Submit the PipelineJob
    job.submit()
```
Replace the following:

- `PROJECT_ID`: The Google Cloud project that this pipeline runs in.
- `REGION`: The region that this pipeline runs in.
- `PIPELINE_ROOT`: Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored in the pipeline root.
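Once deployed, the function starts a run whenever a message arrives whose data field carries the JSON payload described in the `trigger_pipeline_run` docstring. Pub/Sub delivers that data base64-encoded, which is why `subscribe` decodes it first. A minimal local sketch of that encoding round trip (the bucket path and greeting value are placeholders):

```python
import base64
import json

# Hypothetical payload: pipeline_spec_uri must point at your compiled
# pipeline YAML in Cloud Storage (placeholder bucket/path here).
payload = {
    "pipeline_spec_uri": "gs://your-bucket/pipelines/hello_world_scheduled_pipeline.yaml",
    "parameter_values": {"greet_name": "World"},
}

# Pub/Sub delivers the message body base64-encoded in event['data'].
event = {"data": base64.b64encode(json.dumps(payload).encode("utf-8"))}

# Mirror the decoding done at the top of subscribe():
decoded = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
assert decoded == payload  # the round trip is lossless
```

From a shell, the same message can be published with, for example, `gcloud pubsub topics publish my-scheduled-pipeline-topic --message="$(cat payload.json)"`, where `payload.json` holds the JSON payload above and the topic name is the one created earlier.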
13. In the `requirements.txt` file, replace the contents with the following package requirements:

    ```
    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    ```

14. Click **Deploy** to deploy the function.

What's next
-----------

- Learn more about [Google Cloud Pub/Sub](/pubsub/docs).
- [Visualize and analyze pipeline results](/vertex-ai/docs/pipelines/visualize-pipeline).
- Learn how to [create triggers in Cloud Run from Pub/Sub events](/functions/docs/calling/pubsub).
- To view code samples for using Pub/Sub, refer to the [Google Cloud sample browser](/docs/samples?p=pubsub).

Last updated: 2025-09-04 (UTC)