Attivazione dell'esecuzione di una pipeline con Cloud Pub/Sub

I seguenti esempi di codice mostrano come scrivere, eseguire il deployment e attivare una pipeline utilizzando una funzione Cloud basata su eventi con un trigger Cloud Pub/Sub.

Creazione e compilazione di una semplice pipeline

Utilizzando l'SDK Kubeflow Pipelines, crea una pipeline pianificata e lo compili in un file YAML.

Esempio hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Carica YAML pipeline compilata nel bucket Cloud Storage

  1. Apri il browser Cloud Storage nella console Google Cloud.

    Browser Cloud Storage

  2. Fai clic sul bucket Cloud Storage che hai creato quando hai configurato il progetto.

  3. Utilizzando una cartella esistente o una nuova, carica la cartella compilata YAML pipeline (in questo esempio hello_world_scheduled_pipeline.yaml) alla cartella selezionata.

  4. Fai clic sul file YAML caricato per accedere ai dettagli. Copia l'URI gsutil per utilizzarlo in un secondo momento.

Crea una Cloud Function con trigger Pub/Sub

  1. Vai alla pagina Funzioni Cloud Run nella console.

    Vai alla pagina delle funzioni di Cloud Run

  2. Fai clic sul pulsante Crea funzione.

  3. Nella sezione Nozioni di base, assegna un nome alla funzione (ad esempio my-scheduled-pipeline-function).

  4. Nella sezione Trigger, seleziona Cloud Pub/Sub come tipo di trigger.

    immagine di configurazione della funzione per la scelta di Pub/Sub come tipo di attivatore

  5. Nel menu a discesa Seleziona un argomento Cloud Pub/Sub, fai clic su Crea un argomento.

  6. Nella casella Crea un argomento, assegna un nome al nuovo argomento (ad esempio my-scheduled-pipeline-topic) e seleziona Crea argomento.

  7. Lascia invariati i valori predefiniti degli altri campi e fai clic su Salva per salvare la configurazione della sezione Trigger.

  8. Lascia invariati gli altri campi predefiniti e fai clic su Avanti per passare al codice. .

  9. In Runtime, seleziona Python 3.7.

  10. In Punto di ingresso, inserisci "subscribe" (nome della funzione del punto di ingresso del codice di esempio).

  11. In Codice sorgente, seleziona Editor incorporato, se non è già selezionato.

  12. Nel file main.py, aggiungi il seguente codice:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto Google Cloud in cui viene eseguita questa pipeline.
    • REGION: la regione in cui viene eseguita questa pipeline.
    • PIPELINE_ROOT: specifica un URI Cloud Storage che le tue pipeline con un account di servizio. Gli artefatti delle esecuzioni della pipeline vengono memorizzati nella directory principale della pipeline.
  13. Nel file requirements.txt, sostituisci i contenuti con il seguente pacchetto requisiti:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Fai clic su deploy per eseguire il deployment della funzione.