# Trigger a pipeline run with Pub/Sub

Last updated (UTC): 2025-09-04.

This page shows you how to write, deploy, and trigger a pipeline run using an
[Event-Driven Cloud Function](/functions/docs/writing#event-driven_functions) with a
[Cloud Pub/Sub trigger](/functions/docs/calling/pubsub). Follow these steps:

1. Define an ML pipeline using the Kubeflow Pipelines (KFP) SDK and compile it into a YAML file.

2. Upload the compiled pipeline definition to a Cloud Storage bucket.

3. Use Cloud Run functions to create, configure, and deploy a function that's
   triggered by a new or existing Pub/Sub topic.

### Define and compile a pipeline

Using the Kubeflow Pipelines SDK, build a scheduled pipeline and
compile it into a YAML file.

Sample `hello-world-scheduled-pipeline`:

    from kfp import compiler
    from kfp import dsl

    # A simple component that prints and returns a greeting string
    @dsl.component
    def hello_world(message: str) -> str:
        greeting_str = f'Hello, {message}'
        print(greeting_str)
        return greeting_str

    # A simple pipeline that contains a single hello_world task
    @dsl.pipeline(
        name='hello-world-scheduled-pipeline')
    def hello_world_scheduled_pipeline(greet_name: str):
        # Pass component inputs by keyword; KFP v2 rejects positional arguments
        hello_world_task = hello_world(message=greet_name)

    # Compile the pipeline and generate a YAML file
    compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                                package_path='hello_world_scheduled_pipeline.yaml')

### Upload compiled pipeline YAML to Cloud Storage bucket

1. Open the Cloud Storage browser in the Google Cloud console.

    [Cloud Storage Browser](https://console.cloud.google.com/storage/browser/)

2. Click the Cloud Storage bucket you created when you
   [configured your project](/vertex-ai/docs/pipelines/configure-project).

3. Using either an existing folder or a new folder, upload your compiled
   pipeline YAML (in this example `hello_world_scheduled_pipeline.yaml`)
   to the selected folder.

4. Click the uploaded YAML file to access the details. Copy the
   **gsutil URI** for later use.

### Create a Cloud Run function with a Pub/Sub trigger

1. Visit the Cloud Run functions page in the console.

    [Go to the Cloud Run functions page](https://console.cloud.google.com/functions)

2. Click the **Create function** button.

3. In the **Basics** section, give your function a name (for
   example `my-scheduled-pipeline-function`).

4. In the **Trigger** section, select **Cloud Pub/Sub** as the Trigger type.

5. In the **Select a Cloud Pub/Sub topic** list, click **Create a topic**.

6. In the **Create a topic** box, give your new topic a name
   (for example `my-scheduled-pipeline-topic`), and select **Create topic**.

7. Leave all other fields as default and click **Save** to save the Trigger
   section configuration.

8. Leave all other fields as default and click **Next** to proceed to the Code
   section.

9. Under **Runtime**, select **Python 3.7**.

10. In **Entry point**, enter `subscribe` (the name of the entry point
    function in the example code).

11. Under **Source code**, select **Inline Editor** if it's not already selected.

12. In the `main.py` file, add the following code:

        import base64
        import json
        from google.cloud import aiplatform

        PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
        REGION = 'your-region'                             # <---CHANGE THIS
        PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS

        def subscribe(event, context):
            """Triggered from a message on a Cloud Pub/Sub topic.
            Args:
                event (dict): Event payload.
                context (google.cloud.functions.Context): Metadata for the event.
            """
            # decode the event payload string
            payload_message = base64.b64decode(event['data']).decode('utf-8')
            # parse payload string into JSON object
            payload_json = json.loads(payload_message)
            # trigger pipeline run with payload
            trigger_pipeline_run(payload_json)

        def trigger_pipeline_run(payload_json):
            """Triggers a pipeline run
            Args:
                payload_json: expected in the following format:
                    {
                        "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                        "parameter_values": {
                            "greet_name": "<any-greet-string>"
                        }
                    }
            """
            pipeline_spec_uri = payload_json['pipeline_spec_uri']
            parameter_values = payload_json['parameter_values']

            # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
            aiplatform.init(
                project=PROJECT_ID,
                location=REGION,
            )
            job = aiplatform.PipelineJob(
                display_name='hello-world-pipeline-cloud-function-invocation',
                template_path=pipeline_spec_uri,
                pipeline_root=PIPELINE_ROOT,
                enable_caching=False,
                parameter_values=parameter_values
            )

            # Submit the PipelineJob
            job.submit()

    Replace the following:

    - `PROJECT_ID`: The Google Cloud project that this pipeline runs in.
    - `REGION`: The region that this pipeline runs in.
    - `PIPELINE_ROOT`: A Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored in the pipeline root.

13. In the `requirements.txt` file, replace the contents with the following package
    requirements:

        google-api-python-client>=1.7.8,<2
        google-cloud-aiplatform

14. Click **Deploy** to deploy the function.

What's next
-----------

- Learn more about [Google Cloud Pub/Sub](/pubsub/docs).
- [Visualize and analyze pipeline results](/vertex-ai/docs/pipelines/visualize-pipeline).
- Learn how to [create triggers in Cloud Run from Pub/Sub events](/functions/docs/calling/pubsub).
- To view code samples for using Pub/Sub, refer to the [Google Cloud sample browser](/docs/samples?p=pubsub).
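
Once the function is deployed, a run starts whenever a message in the format expected by `trigger_pipeline_run` arrives on the topic. The following is a minimal sketch of how such a message might be built and published from Python, not part of the original sample. The helper names (`build_trigger_message`, `as_function_event`, `decode_event`, `publish_trigger`) and the `gs://your-bucket/...` path are hypothetical, and `publish_trigger` assumes the separate `google-cloud-pubsub` package is installed and that your credentials can publish to the topic.

```python
import base64
import json


def build_trigger_message(pipeline_spec_uri, parameter_values):
    """Serialize the payload that trigger_pipeline_run expects into bytes."""
    payload = {
        "pipeline_spec_uri": pipeline_spec_uri,
        "parameter_values": parameter_values,
    }
    return json.dumps(payload).encode("utf-8")


def as_function_event(message_data):
    """Mimic the event dict the function receives: data is base64-encoded."""
    return {"data": base64.b64encode(message_data).decode("utf-8")}


def decode_event(event):
    """Apply the same decoding steps that subscribe() performs."""
    payload_message = base64.b64decode(event["data"]).decode("utf-8")
    return json.loads(payload_message)


def publish_trigger(project_id, topic_id, message_data):
    """Publish the message to the topic and return the message ID.

    Assumption: google-cloud-pubsub is installed; it is imported lazily so
    the local helpers above work without it.
    """
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data=message_data)
    return future.result()


if __name__ == "__main__":
    # Hypothetical placeholder URI; use the gsutil URI you copied earlier.
    data = build_trigger_message(
        "gs://your-bucket/hello_world_scheduled_pipeline.yaml",
        {"greet_name": "World"},
    )
    # Local round trip only: check the payload survives the function's decoding.
    print(decode_event(as_function_event(data)))
    # To publish for real (requires credentials and an existing topic):
    # publish_trigger("your-project-id", "my-scheduled-pipeline-topic", data)
```

Running the local round trip before publishing is a cheap way to confirm that the payload keys match what `trigger_pipeline_run` reads, since a malformed message would otherwise only surface as an error in the function logs.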