Planifier l'exécution d'un pipeline avec Cloud Scheduler

Vous pouvez planifier l'exécution d'un pipeline précompilé, à l'aide de Cloud Scheduler, via une fonction Cloud basée sur des événements avec un déclencheur HTTP.

Créer et compiler un pipeline simple

À l'aide du SDK Kubeflow Pipelines, créez un pipeline planifié et compilez-le dans un fichier JSON.

Échantillon hello-world-scheduled-pipeline :

import json
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import component

# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a JSON file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.json')

Importer le fichier JSON du pipeline compilé dans un bucket Cloud Storage

  1. Ouvrez le navigateur Cloud Storage dans Google Cloud Console.

    Navigateur Cloud Storage

  2. Cliquez sur le bucket Cloud Storage que vous avez créé lors de la configuration de votre projet.

  3. À l'aide d'un dossier existant ou d'un nouveau dossier, importez le fichier JSON de votre pipeline compilé (dans cet exemple, hello_world_scheduled_pipeline.json) dans le dossier sélectionné.

  4. Cliquez sur le fichier JSON importé pour accéder aux détails. Copiez l'URI gsutil pour une utilisation ultérieure.

Créer une fonction Cloud avec un déclencheur HTTP

  1. Accédez à la page Cloud Functions de la console.

    Accéder à la page "Cloud Functions"

  2. Cliquez sur le bouton Créer une fonction.

  3. Dans la section Basics (Informations générales), attribuez un nom à votre fonction (par exemple, hello-world-scheduled-pipeline-function).

  4. Dans la section Trigger (Déclencheur), sélectionnez HTTP comme type de déclencheur.

    image illustrant la configuration de la création d'une fonction avec HTTP comme type de déclencheur

  5. Notez l'URL générée pour ce déclencheur et enregistrez-la pour une utilisation ultérieure.

  6. Laissez tous les autres champs par défaut et cliquez sur Save (Enregistrer) pour enregistrer la configuration de la section "Trigger" (Déclencheur).

  7. Laissez tous les autres champs par défaut et cliquez sur Next (Suivant) pour passer à la section "Code".

  8. Sous Environnement d'exécution, sélectionnez Python 3.7.

  9. Dans Point d'entrée, saisissez process_request (exemple de nom de fonction en tant que point d'entrée du code).

  10. Sous Code source, sélectionnez Éditeur intégré, si ce n'est pas déjà fait.

  11. Dans le fichier main.py, ajoutez le code suivant :

    import json
    from google.cloud import aiplatform
    
    PROJECT_ID = 'your-project-id'         # <---CHANGE THIS
    REGION = 'your-region'                 # <---CHANGE THIS
    PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'   # <---CHANGE THIS
    
    def process_request(request):
       """Processes the incoming HTTP request.
    
       Args:
         request (flask.Request): HTTP request object.
    
       Returns:
         The response text or any set of values that can be turned into a Response
         object using `make_response
         <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
       """
    
       # decode http request payload and translate into JSON object
       request_str = request.data.decode('utf-8')
       request_json = json.loads(request_str)
    
       pipeline_spec_uri = request_json['pipeline_spec_uri']
       parameter_values = request_json['parameter_values']
    
       aiplatform.init(
           project=PROJECT_ID,
           location=REGION,
       )
    
       job = aiplatform.PipelineJob(
           display_name=f'hello-world-cloud-function-pipeline',
           template_path=pipeline_spec_uri,
           pipeline_root=PIPELINE_ROOT,
           enable_caching=False,
           parameter_values=parameter_values
       )
    
       job.submit()
       return "Job submitted"
    

    Remplacez les éléments suivants :

    • PROJECT_ID : projet Google Cloud dans lequel ce pipeline s'exécute.
    • REGION: région dans laquelle ce pipeline s'exécute.
    • PIPELINE_ROOT : spécifiez un URI Cloud Storage auquel votre compte de service de pipelines peut accéder. Les artefacts des exécutions de votre pipeline sont stockés dans la racine du pipeline.
  12. Dans le fichier requirements.txt, remplacez le contenu par les conditions suivantes du package :

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  13. Cliquez sur Déployer pour déployer la fonction.

Créer une tâche Cloud Scheduler

  1. Accédez à la page Cloud Scheduler de la console.

    Accéder à la page Cloud Scheduler

  2. Cliquez sur le bouton Créer une tâche.

  3. Attribuez un nom à votre tâche (par exemple, my-hello-world-schedule). Vous pouvez éventuellement ajouter une description.

  4. Dans le champ Fréquence, spécifiez la fréquence de votre tâche en utilisant le format unix-cron. Exemple :

    0 9 * * 1

    Cet exemple de planification Cron s'exécute tous les lundis à 09h00. Pour en savoir plus, consultez la page Configurer des planifications de tâches Cron.

  5. Sélectionnez votre fuseau horaire.

  6. Cliquez sur Continuer pour configurer l'exécution.

  7. Sélectionnez HTTP dans le menu "Type de cible".

  8. Dans le champ URL, saisissez l'URL du déclencheur que vous avez enregistrée à la section précédente.

  9. Sélectionnez POST comme méthode HTTP si ce n'est pas déjà fait.

  10. Dans la zone Body (Corps), saisissez une chaîne JSON au format défini dans le code. Utilisez l'exemple de la section précédente :

     {
       "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
       "parameter_values": {
         "greet_name": "<any-greet-string>"
       }
     }
    
    

    Image de la section &quot;Configurer les exécutions&quot; de la zone de création d&#39;une tâche

  11. Dans le menu déroulant de l'en-tête Auth, sélectionnez Add OIDC token (Ajouter un jeton OIDC).

  12. Dans la liste déroulante "Compte de service", sélectionnez un compte de service autorisé à appeler la fonction Cloud. Vous pouvez utiliser le compte de service par défaut de Compute Engine pour ce tutoriel.

  13. Conservez tous les autres champs par défaut, puis cliquez sur Créer.

Exécuter manuellement votre tâche (facultatif)

Vous pouvez également exécuter la tâche que vous venez de créer pour vérifier si elle est opérationnelle.

  1. Ouvrez la page Cloud Scheduler de la console.

  2. Cliquez sur le bouton Exécuter à côté du nom de votre tâche.

    La première exécution de la première tâche créée dans un projet peut prendre quelques minutes, étant donné que des opérations de configuration sont requises.

  3. Vous pouvez consulter l'état de la tâche dans la colonne Résultat.