Activa una ejecución de canalización con Cloud Pub/Sub

En las siguientes muestras de código, se muestra cómo escribir, implementar y activar una canalización mediante una función de Cloud Functions controlada por eventos con un activador de Cloud Pub/Sub.

Crea y compila una canalización simple

Con el SDK de Kubeflow Pipelines, crea una canalización programada y compílala en un archivo YAML.

Muestra de hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Sube el archivo YAML de la canalización compilada al bucket de Cloud Storage

  1. Abre el navegador de Cloud Storage en la consola de Google Cloud.

    Navegador de Cloud Storage

  2. Haz clic en el bucket de Cloud Storage que creaste cuando configuraste tu proyecto.

  3. Usa una carpeta existente o una nueva para subir el archivo YAML de la canalización compilado (en este ejemplo, hello_world_scheduled_pipeline.yaml).

  4. Haz clic en el archivo YAML subido para acceder a los detalles. Copia el URI de gsutil para usarlo más adelante.

Crea una función de Cloud Functions con un activador de Pub/Sub

  1. Visita la página de las funciones de Cloud Run en la consola.

    Ir a la página de las funciones de Cloud Run

  2. Haz clic en el botón Create function (Crear función).

  3. En la sección Conceptos básicos, asígnale un nombre a tu función (por ejemplo, my-scheduled-pipeline-function).

  4. En la sección Activador, selecciona Cloud Pub/Sub como el tipo de activador.

    imagen de la configuración de creación de una función: elegir pubsub como tipo de activador

  5. En el menú desplegable Selecciona un tema de Cloud Pub/Sub, haz clic en Crear un tema.

  6. En el cuadro Crear un tema, asigna un nombre al tema nuevo (por ejemplo, my-scheduled-pipeline-topic) y selecciona Crear tema.

  7. Deja los demás campos como predeterminados y haz clic en Guardar para guardar la configuración de la sección Activador.

  8. Deja todos los otros campos como predeterminados y haz clic en Siguiente para ir a la sección Código.

  9. En Entorno de ejecución, selecciona Python 3.7.

  10. En el punto de Entrada, ingresa “subscribe” (el nombre de la función de punto de entrada del código de ejemplo).

  11. En Código fuente, selecciona Editor intercalado si aún no está seleccionado.

  12. En el archivo main.py, agrega el siguiente código:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el proyecto de Google Cloud en el que se ejecuta esta canalización.
    • REGION: Es la región en la que se ejecuta esta canalización.
    • PIPELINE_ROOT: Especifica un URI de Cloud Storage al que pueda acceder la cuenta de servicio de tus canalizaciones. Los artefactos de las ejecuciones de tus canalizaciones se almacenan en la raíz de la canalización.
  13. En el archivo requirements.txt, reemplaza el contenido por los siguientes requisitos de paquete:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Haz clic en implementar para implementar la función.