À l'aide du SDK Kubeflow Pipelines, créez un pipeline planifié et compilez-le dans un fichier YAML.
Échantillon hello-world-scheduled-pipeline :
fromkfpimportcompilerfromkfpimportdsl# A simple component that prints and returns a greeting string@dsl.componentdefhello_world(message:str)-> str:greeting_str=f'Hello,{message}'
print(greeting_str)returngreeting_str# A simple pipeline that contains a single hello_world task@dsl.pipeline(name='hello-world-scheduled-pipeline')defhello_world_scheduled_pipeline(greet_name:str):hello_world_task=hello_world(greet_name)# Compile the pipeline and generate a YAML filecompiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,package_path='hello_world_scheduled_pipeline.yaml')
Importer le fichier YAML du pipeline compilé dans un bucket Cloud Storage
Ouvrez le navigateur Cloud Storage dans Google Cloud Console.
À l'aide d'un dossier existant ou d'un nouveau dossier, importez le fichier YAML de votre pipeline compilé (dans cet exemple, hello_world_scheduled_pipeline.yaml) dans le dossier sélectionné.
Cliquez sur le fichier YAML importé pour accéder aux détails. Copiez l'URI gsutil pour une utilisation ultérieure.
Créer une fonction Cloud avec un déclencheur Pub/Sub
Accédez à la page Cloud Run Functions dans la console.
Dans la section Basics (Informations générales), attribuez un nom à votre fonction (par exemple, my-scheduled-pipeline-function).
Dans la section Trigger (Déclencheur), sélectionnez Cloud Pub/Sub comme type de déclencheur.
Dans la liste déroulante Sélectionner un sujet Cloud Pub/Sub, cliquez sur Créer un sujet.
Dans la zone Créer un sujet, attribuez un nom au nouveau sujet (par exemple, my-scheduled-pipeline-topic), puis sélectionnez Créer un sujet.
Laissez tous les autres champs par défaut et cliquez sur Save (Enregistrer) pour enregistrer la configuration de la section "Trigger" (Déclencheur).
Laissez tous les autres champs par défaut et cliquez sur Next (Suivant) pour passer à la section "Code".
Sous Environnement d'exécution, sélectionnez Python 3.7.
Dans Point d'entrée, saisissez "subscribe" (exemple de nom de fonction en tant que point d'entrée du code).
Sous Code source, sélectionnez Éditeur intégré, si ce n'est pas déjà fait.
Dans le fichier main.py, ajoutez le code suivant :
importbase64importjsonfromgoogle.cloudimportaiplatformPROJECT_ID= 'your-project-id' # <---CHANGE THISREGION= 'your-region' # <---CHANGE THISPIPELINE_ROOT= 'your-cloud-storage-pipeline-root' # <---CHANGE THISdefsubscribe(event,context):
"""TriggeredfromamessageonaCloudPub/Subtopic.Args:event(dict):Eventpayload.context(google.cloud.functions.Context):Metadatafortheevent.
"""
# decode the event payload stringpayload_message=base64.b64decode(event['data']).decode('utf-8')# parse payload string into JSON objectpayload_json=json.loads(payload_message)# trigger pipeline run with payloadtrigger_pipeline_run(payload_json)deftrigger_pipeline_run(payload_json):
"""TriggersapipelinerunArgs:payload_json:expectedinthefollowingformat:{
"pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
"parameter_values":{
"greet_name": "<any-greet-string>"
}}
"""
pipeline_spec_uri=payload_json['pipeline_spec_uri']parameter_values=payload_json['parameter_values']# Create a PipelineJob using the compiled pipeline from pipeline_spec_uriaiplatform.init(project=PROJECT_ID,location=REGION,)job=aiplatform.PipelineJob(display_name='hello-world-pipeline-cloud-function-invocation',template_path=pipeline_spec_uri,pipeline_root=PIPELINE_ROOT,enable_caching=False,parameter_values=parameter_values)# Submit the PipelineJobjob.submit()
Remplacez les éléments suivants :
PROJECT_ID: projet Google Cloud dans lequel ce pipeline s'exécute.
REGION: région dans laquelle ce pipeline s'exécute.
PIPELINE_ROOT : spécifiez un URI Cloud Storage auquel votre compte de service de pipelines peut accéder. Les artefacts des exécutions de votre pipeline sont stockés dans la racine du pipeline.
Dans le fichier requirements.txt, remplacez le contenu par les conditions suivantes du package :
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2024/11/29 (UTC).
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Difficile à comprendre","hardToUnderstand","thumb-down"],["Informations ou exemple de code incorrects","incorrectInformationOrSampleCode","thumb-down"],["Il n'y a pas l'information/les exemples dont j'ai besoin","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],["Dernière mise à jour le 2024/11/29 (UTC)."],[],[],null,["# Trigger a pipeline run with Pub/Sub\n\nThis page shows you how to write, deploy, and trigger a pipeline run using an\n[Event-Driven Cloud Function](/functions/docs/writing#event-driven_functions) with a\n[Cloud Pub/Sub trigger](/functions/docs/calling/pubsub). Follow these steps:\n\n1. Define an ML pipeline using the Kubeflow Pipelines (KFP) SDK and compile it into a YAML file.\n\n2. Upload the compiled pipeline definition to a Cloud Storage bucket.\n\n3. Use Cloud Run functions to create, configure, and deploy a function that's\n triggered by a new or existing Pub/Sub topic.\n\n### Define and compile a pipeline\n\nUsing Kubeflow Pipelines SDK, build a scheduled pipeline and\ncompile it into a YAML file.\n\nSample `hello-world-scheduled-pipeline`: \n\n from kfp import compiler\n from kfp import dsl\n\n # A simple component that prints and returns a greeting string\n @dsl.component\n def hello_world(message: str) -\u003e str:\n greeting_str = f'Hello, {message}'\n print(greeting_str)\n return greeting_str\n\n # A simple pipeline that contains a single hello_world task\n @dsl.pipeline(\n name='hello-world-scheduled-pipeline')\n def hello_world_scheduled_pipeline(greet_name: str):\n hello_world_task = hello_world(greet_name)\n\n # Compile the pipeline and generate a YAML file\n compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,\n package_path='hello_world_scheduled_pipeline.yaml')\n\n### Upload compiled pipeline YAML to Cloud Storage bucket\n\n1. Open the Cloud Storage browser in the Google Cloud console. \n\n [Cloud Storage Browser](https://console.cloud.google.com/storage/browser/)\n2. Click the Cloud Storage bucket you created when you\n [configured your project](/vertex-ai/docs/pipelines/configure-project).\n\n3. Using either an existing folder or a new folder, upload your compiled\n pipeline YAML (in this example `hello_world_scheduled_pipeline.yaml`)\n to the selected folder.\n\n4. Click the uploaded YAML file to access the details. Copy the\n **gsutil URI** for later use.\n\n### Create a Cloud Run functions with a Pub/Sub trigger\n\n1. Visit the Cloud Run functions page in the console.\n\n [Go to the Cloud Run functions page](https://console.cloud.google.com/functions)\n2. Click the **Create function** button.\n\n3. In the **Basics** section, give your function a name (for\n example `my-scheduled-pipeline-function`).\n\n4. In the **Trigger** section, select **Cloud Pub/Sub** as the Trigger type.\n\n5. In the **Select a Cloud Pub/Sub topic** list, click **Create a topic**.\n\n6. In the **Create a topic** box, give your new topic a name\n (for example `my-scheduled-pipeline-topic`), and select **Create topic**.\n\n7. Leave all other fields as default and click **Save** to save the Trigger\n section configuration.\n\n8. Leave all other fields as default and click **Next** to proceed to the Code\n section.\n\n9. Under **Runtime** , select **Python 3.7**.\n\n10. In **Entry** point, input \"subscribe\" (the example code entry point\n function name).\n\n11. Under **Source code** , select **Inline Editor** if it's not already selected.\n\n12. In the `main.py` file, add in the following code:\n\n import base64\n import json\n from google.cloud import aiplatform\n\n PROJECT_ID = '\u003cvar translate=\"no\"\u003eyour-project-id\u003c/var\u003e' # \u003c---CHANGE THIS\n REGION = '\u003cvar translate=\"no\"\u003eyour-region\u003c/var\u003e' # \u003c---CHANGE THIS\n PIPELINE_ROOT = '\u003cvar translate=\"no\"\u003eyour-cloud-storage-pipeline-root\u003c/var\u003e' # \u003c---CHANGE THIS\n\n def subscribe(event, context):\n \"\"\"Triggered from a message on a Cloud Pub/Sub topic.\n Args:\n event (dict): Event payload.\n context (google.cloud.functions.Context): Metadata for the event.\n \"\"\"\n # decode the event payload string\n payload_message = base64.b64decode(event['data']).decode('utf-8')\n # parse payload string into JSON object\n payload_json = json.loads(payload_message)\n # trigger pipeline run with payload\n trigger_pipeline_run(payload_json)\n\n def trigger_pipeline_run(payload_json):\n \"\"\"Triggers a pipeline run\n Args:\n payload_json: expected in the following format:\n {\n \"pipeline_spec_uri\": \"\u003cpath-to-your-compiled-pipeline\u003e\",\n \"parameter_values\": {\n \"greet_name\": \"\u003cany-greet-string\u003e\"\n }\n }\n \"\"\"\n pipeline_spec_uri = payload_json['pipeline_spec_uri']\n parameter_values = payload_json['parameter_values']\n\n # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri\n aiplatform.https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.html(\n project=PROJECT_ID,\n location=REGION,\n )\n job = aiplatform.https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.types.PipelineJob.html(\n display_name='hello-world-pipeline-cloud-function-invocation',\n template_path=pipeline_spec_uri,\n pipeline_root=PIPELINE_ROOT,\n enable_caching=False,\n parameter_values=parameter_values\n )\n\n # Submit the PipelineJob\n job.submit()\n\n Replace the following:\n - \u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e: The Google Cloud project that this pipeline runs in.\n - \u003cvar translate=\"no\"\u003eREGION\u003c/var\u003e: The region that this pipeline runs in.\n - \u003cvar translate=\"no\"\u003ePIPELINE_ROOT\u003c/var\u003e: Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored in the pipeline root.\n13. In the `requirements.txt` file, replace the contents with the following package\n requirements:\n\n google-api-python-client\u003e=1.7.8,\u003c2\n google-cloud-aiplatform\n\n14. Click **deploy** to deploy the Function.\n\nWhat's next\n-----------\n\n- Learn more about [Google Cloud Pub/Sub](/pubsub/docs).\n- [Visualize and analyze pipeline results](/vertex-ai/docs/pipelines/visualize-pipeline).\n- Learn how to [create triggers in Cloud Runfrom Pub/Sub events](/functions/docs/calling/pubsub).\n- To view code samples for using Pub/Sub, refer to the [Google Cloud sample browser](/docs/samples?p=pubsub)."]]