Programa la ejecución de la canalización con Cloud Scheduler

Puedes programar la ejecución de una canalización precompilada con Cloud Scheduler mediante el uso de Cloud Functions controlada por eventos con un activador HTTP.

Crea y compila una canalización simple

Con el SDK de Kubeflow Pipelines, crea una canalización programada y compílala en un archivo YAML.

Muestra de hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Sube el archivo YAML de la canalización compilada al bucket de Cloud Storage

  1. Abre el navegador de Cloud Storage en la consola de Google Cloud.

    Navegador de Cloud Storage

  2. Haz clic en el bucket de Cloud Storage que creaste cuando configuraste tu proyecto.

  3. Usa una carpeta existente o una nueva para subir el archivo YAML de la canalización compilado (en este ejemplo, hello_world_scheduled_pipeline.yaml).

  4. Haz clic en el archivo YAML subido para acceder a los detalles. Copia el URI de gsutil para usarlo más adelante.

Crea una función de Cloud Functions con un activador HTTP

  1. Visita la página de Cloud Functions en la consola.

    Ir a la página Cloud Functions

  2. Haz clic en el botón Create function (Crear función).

  3. En la sección Conceptos básicos, asígnale un nombre a tu función (por ejemplo, hello-world-scheduled-pipeline-function).

  4. En la sección Activador, selecciona HTTP como el tipo de activador.

    imagen de la configuración de creación de una función: elegir HTTP como tipo de activador

  5. Toma nota de la URL generada para este activador y guárdala para usarla en la siguiente sección.

  6. Deja los demás campos como predeterminados y haz clic en Guardar para guardar la configuración de la sección Activador.

  7. Deja todos los otros campos como predeterminados y haz clic en Siguiente para ir a la sección Código.

  8. En Entorno de ejecución, selecciona Python 3.7.

  9. En el Punto de entrada, ingresa process_request (el nombre de la función de punto de entrada del código de ejemplo).

  10. En Código fuente, selecciona Editor intercalado si aún no está seleccionado.

  11. En el archivo main.py, agrega el siguiente código:

    import json
    from google.cloud import aiplatform
    
    PROJECT_ID = 'your-project-id'         # <---CHANGE THIS
    REGION = 'your-region'                 # <---CHANGE THIS
    PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'   # <---CHANGE THIS
    
    def process_request(request):
       """Processes the incoming HTTP request.
    
       Args:
         request (flask.Request): HTTP request object.
    
       Returns:
         The response text or any set of values that can be turned into a Response
         object using `make_response
         <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
       """
    
       # decode http request payload and translate into JSON object
       request_str = request.data.decode('utf-8')
       request_json = json.loads(request_str)
    
       pipeline_spec_uri = request_json['pipeline_spec_uri']
       parameter_values = request_json['parameter_values']
    
       aiplatform.init(
           project=PROJECT_ID,
           location=REGION,
       )
    
       job = aiplatform.PipelineJob(
           display_name=f'hello-world-cloud-function-pipeline',
           template_path=pipeline_spec_uri,
           pipeline_root=PIPELINE_ROOT,
           enable_caching=False,
           parameter_values=parameter_values
       )
    
       job.submit()
       return "Job submitted"
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el proyecto de Google Cloud en el que se ejecuta esta canalización.
    • REGION: Es la región en la que se ejecuta esta canalización.
    • PIPELINE_ROOT: Especifica un URI de Cloud Storage al que pueda acceder la cuenta de servicio de tus canalizaciones. Los artefactos de las ejecuciones de tus canalizaciones se almacenan en la raíz de la canalización.
  12. En el archivo requirements.txt, reemplaza el contenido por los siguientes requisitos de paquete:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  13. Haz clic en implementar para implementar la función.

Crea un trabajo de Cloud Scheduler

  1. Visita la página de Cloud Scheduler en la consola.

    Ir a la página Cloud Scheduler

  2. Haz clic en el botón Crear trabajo.

  3. Asigna un nombre a tu trabajo (por ejemplo, my-hello-world-schedule) y, de forma opcional, agrega una descripción.

  4. Especifica la frecuencia de tu trabajo usando el formato unix-cron. Por ejemplo:

    0 9 * * 1

    Este programa cron de ejemplo se ejecuta todos los lunes a las 9:00 a.m. Para obtener más información, consulta Configura programas de trabajos cron.

  5. Selecciona tu zona horaria.

  6. Haz clic en Continuar para configurar la ejecución.

  7. Selecciona HTTP en el menú Tipo de objetivo.

  8. En el campo URL, ingresa la URL del activador que guardaste en la sección anterior.

  9. Selecciona POST como método HTTP si aún no está seleccionado.

  10. En el cuadro Cuerpo (Body), ingresa una string JSON en el formato definido en el código. Con el ejemplo de la sección anterior:

     {
       "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
       "parameter_values": {
         "greet_name": "<any-greet-string>"
       }
     }
    
    

    Configura la sección de ejecuciones de la imagen para crear un cuadro de trabajo

  11. En el menú desplegable Auth, selecciona Agregar token OIDC.

  12. En el menú desplegable Cuenta de servicio, selecciona una cuenta de servicio que tenga permiso para invocar la función de Cloud Functions. Puedes usar la cuenta de servicio predeterminada de Compute Engine para este instructivo.

  13. Deja todos los otros campos con la configuración predeterminada y haz clic en Crear.

Ejecuta tu trabajo de forma manual (opcional)

De forma opcional, puedes ejecutar el trabajo que acabas de crear para verificar su funcionalidad.

  1. Abre la página de la consola de Cloud Scheduler.

  2. Haz clic en el botón Ejecutar ahora junto al nombre de tu trabajo.

    El primer trabajo creado en un proyecto puede tardar unos minutos en ejecutarse cuando se invoca por primera vez debido a la configuración necesaria.

  3. Puedes ver el estado del trabajo en la columna Resultado.