I seguenti esempi di codice mostrano come scrivere, eseguire il deployment e attivare una pipeline utilizzando una funzione Cloud basata su eventi con un trigger Cloud Pub/Sub.
Creazione e compilazione di una semplice pipeline
Utilizzando l'SDK Kubeflow Pipelines, crea una pipeline pianificata e lo compili in un file YAML.
Esempio hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Carica YAML pipeline compilata nel bucket Cloud Storage
Apri il browser Cloud Storage nella console Google Cloud.
Fai clic sul bucket Cloud Storage che hai creato quando hai configurato il progetto.
Utilizzando una cartella esistente o una nuova, carica la cartella compilata YAML pipeline (in questo esempio
hello_world_scheduled_pipeline.yaml
) alla cartella selezionata.Fai clic sul file YAML caricato per accedere ai dettagli. Copia l'URI gsutil per utilizzarlo in un secondo momento.
Crea una Cloud Function con trigger Pub/Sub
Vai alla pagina Funzioni Cloud Run nella console.
Fai clic sul pulsante Crea funzione.
Nella sezione Nozioni di base, assegna un nome alla funzione (ad esempio
my-scheduled-pipeline-function
).Nella sezione Trigger, seleziona Cloud Pub/Sub come tipo di trigger.
Nel menu a discesa Seleziona un argomento Cloud Pub/Sub, fai clic su Crea un argomento.
Nella casella Crea un argomento, assegna un nome al nuovo argomento (ad esempio
my-scheduled-pipeline-topic
) e seleziona Crea argomento.Lascia invariati i valori predefiniti degli altri campi e fai clic su Salva per salvare la configurazione della sezione Trigger.
Lascia invariati gli altri campi predefiniti e fai clic su Avanti per passare al codice. .
In Runtime, seleziona Python 3.7.
In Punto di ingresso, inserisci "subscribe" (nome della funzione del punto di ingresso del codice di esempio).
In Codice sorgente, seleziona Editor incorporato, se non è già selezionato.
Nel file
main.py
, aggiungi il seguente codice:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Sostituisci quanto segue:
- PROJECT_ID: il progetto Google Cloud in cui viene eseguita questa pipeline.
- REGION: la regione in cui viene eseguita questa pipeline.
- PIPELINE_ROOT: specifica un URI Cloud Storage che le tue pipeline con un account di servizio. Gli artefatti delle esecuzioni della pipeline vengono memorizzati nella directory principale della pipeline.
Nel file
requirements.txt
, sostituisci i contenuti con il seguente pacchetto requisiti:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Fai clic su deploy per eseguire il deployment della funzione.