En las siguientes muestras de código, se muestra cómo escribir, implementar y activar una canalización mediante una función de Cloud Functions controlada por eventos con un activador de Cloud Pub/Sub.
Crea y compila una canalización simple
Con el SDK de Kubeflow Pipelines, crea una canalización programada y compílala en un archivo YAML.
Muestra de hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Sube el archivo YAML de la canalización compilada al bucket de Cloud Storage
Abre el navegador de Cloud Storage en la consola de Google Cloud.
Haz clic en el bucket de Cloud Storage que creaste cuando configuraste tu proyecto.
Usa una carpeta existente o una nueva para subir el archivo YAML de la canalización compilado (en este ejemplo,
hello_world_scheduled_pipeline.yaml
).Haz clic en el archivo YAML subido para acceder a los detalles. Copia el URI de gsutil para usarlo más adelante.
Crea una función de Cloud Functions con un activador de Pub/Sub
Visita la página de las funciones de Cloud Run en la consola.
Haz clic en el botón Create function (Crear función).
En la sección Conceptos básicos, asígnale un nombre a tu función (por ejemplo,
my-scheduled-pipeline-function
).En la sección Activador, selecciona Cloud Pub/Sub como el tipo de activador.
En el menú desplegable Selecciona un tema de Cloud Pub/Sub, haz clic en Crear un tema.
En el cuadro Crear un tema, asigna un nombre al tema nuevo (por ejemplo,
my-scheduled-pipeline-topic
) y selecciona Crear tema.Deja los demás campos como predeterminados y haz clic en Guardar para guardar la configuración de la sección Activador.
Deja todos los otros campos como predeterminados y haz clic en Siguiente para ir a la sección Código.
En Entorno de ejecución, selecciona Python 3.7.
En el punto de Entrada, ingresa “subscribe” (el nombre de la función de punto de entrada del código de ejemplo).
En Código fuente, selecciona Editor intercalado si aún no está seleccionado.
En el archivo
main.py
, agrega el siguiente código:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Reemplaza lo siguiente:
- PROJECT_ID: Es el proyecto de Google Cloud en el que se ejecuta esta canalización.
- REGION: Es la región en la que se ejecuta esta canalización.
- PIPELINE_ROOT: Especifica un URI de Cloud Storage al que pueda acceder la cuenta de servicio de tus canalizaciones. Los artefactos de las ejecuciones de tus canalizaciones se almacenan en la raíz de la canalización.
En el archivo
requirements.txt
, reemplaza el contenido por los siguientes requisitos de paquete:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Haz clic en implementar para implementar la función.