Crear una canalización para el entrenamiento continuo de modelos

En este documento se describen los pasos necesarios para crear una canalización que entrene automáticamente un modelo personalizado de forma periódica o cuando se inserten datos nuevos en el conjunto de datos mediante Vertex AI Pipelines y funciones de Cloud Run.

Objetivos

Estos son los pasos que debes seguir:

  1. Adquirir y preparar un conjunto de datos en BigQuery.

  2. Crea y sube un paquete de entrenamiento personalizado. Cuando se ejecuta, lee los datos del conjunto de datos y entrena el modelo.

  3. Crea un flujo de procesamiento de Vertex AI. Esta canalización ejecuta el paquete de entrenamiento personalizado, sube el modelo al registro de modelos de Vertex AI, ejecuta el trabajo de evaluación y envía una notificación por correo electrónico.

  4. Ejecuta el flujo de trabajo manualmente.

  5. Crea una función de Cloud con un activador de Eventarc que ejecute el flujo cada vez que se inserten datos nuevos en el conjunto de datos de BigQuery.

Antes de empezar

Configura tu proyecto y tu cuaderno.

Configuración del proyecto

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
  3. Verify that billing is enabled for your Google Cloud project.

Crear cuaderno

Usamos un cuaderno de Colab Enterprise para ejecutar parte del código de este tutorial.

  1. Si no eres el propietario del proyecto, pídele a uno que te conceda los roles de gestión de identidades y accesos roles/resourcemanager.projectIamAdmin y roles/aiplatform.colabEnterpriseUser.

    Necesitas estos roles para usar Colab Enterprise y para conceder roles y permisos de gestión de identidades y accesos a ti mismo y a las cuentas de servicio.

    Ir a IAM

  2. En la Google Cloud consola, ve a la página Cuadernos de Colab Enterprise.

    Colab Enterprise te pedirá que habilites las siguientes APIs obligatorias si aún no lo has hecho.

    • API de Vertex AI
    • API de Dataform
    • API de Compute Engine

    Ir a Colab Enterprise

  3. En el menú Región, selecciona la región en la que quieras crear tu cuaderno. Si no lo tienes claro, usa us-central1 como región.

    Usa la misma región para todos los recursos de este tutorial.

  4. Haz clic en Crear un cuaderno.

El nuevo cuaderno aparecerá en la pestaña Mis cuadernos. Para ejecutar código en tu cuaderno, añade una celda de código y haz clic en el botón  Ejecutar celda.

Configurar el entorno de desarrollo

  1. En tu cuaderno, instala los siguientes paquetes de Python 3.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Para definir el proyecto de Google Cloud CLI, ejecuta el siguiente comando:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Sustituye PROJECT_ID por el ID del proyecto. Si es necesario, puede encontrar el ID de su proyecto en la Google Cloud consola.

  3. Concede roles a tu cuenta de Google:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Habilita las siguientes APIs

    • API de Artifact Registry
    • API de BigQuery
    • API de Cloud Build
    • Cloud Functions API
    • API de registro en la nube
    • Pub/Sub API
    • API Admin de Cloud Run
    • API de Cloud Storage
    • API de Eventarc
    • API de Uso de Servicio
    • API de Vertex AI
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Asigna roles a las cuentas de servicio de tu proyecto:

    1. Ver los nombres de tus cuentas de servicio

      ! gcloud iam service-accounts list
      

      Anota el nombre de tu agente de servicio de Compute. Debe tener el formato xxxxxxxx-compute@developer.gserviceaccount.com.

    2. Asigna los roles necesarios al agente de servicio.

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
      

Adquirir y preparar un conjunto de datos

En este tutorial, crearás un modelo que predice la tarifa de un viaje en taxi en función de características como el tiempo, la ubicación y la distancia del trayecto. Usaremos datos del conjunto de datos público Viajes en taxi de Chicago. Este conjunto de datos incluye viajes en taxi desde el 2013 hasta la actualidad, que se han comunicado a la ciudad de Chicago en su calidad de agencia reguladora. Para proteger la privacidad de los conductores y los usuarios del taxi al mismo tiempo y permitir que el agregador analice los datos, el ID del taxi se mantiene constante para cualquier número de licencia de taxi, pero no muestra el número. Además, los tramos censales se suprimen en algunos casos y las horas se redondean a los 15 minutos más cercanos.

Para obtener más información, consulta Viajes en taxi de Chicago en Marketplace.

Crear un conjunto de datos de BigQuery

  1. En la Google Cloud consola, ve a BigQuery Studio.

    Ir a BigQuery

  2. En el panel Explorador, busque su proyecto, haga clic en Acciones y, a continuación, en Crear conjunto de datos.

  3. En la página Crear conjunto de datos, haz lo siguiente:

    • En ID del conjunto de datos, introduce mlops. Para obtener más información, consulta Nombres de conjuntos de datos.

    • En Tipo de ubicación, elige tu multirregión. Por ejemplo, elige EE. UU. (varias regiones de Estados Unidos) si usas us-central1. Una vez creado el conjunto de datos, la ubicación no se puede cambiar.

    • Haz clic en Crear conjunto de datos.

Para obtener más información, consulta cómo crear conjuntos de datos.

Crear y rellenar una tabla de BigQuery

En esta sección, creará la tabla e importará un año de datos del conjunto de datos público al conjunto de datos de su proyecto.

  1. Ir a BigQuery Studio

    Ir a BigQuery

  2. Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL haciendo clic en Ejecutar.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta crea la tabla <PROJECT_ID>.mlops.chicago y la rellena con datos de la tabla pública bigquery-public-data.chicago_taxi_trips.taxi_trips.

  3. Para ver el esquema de la tabla, haga clic en Ir a tabla y, a continuación, en la pestaña Esquema.

  4. Para ver el contenido de la tabla, haz clic en la pestaña Vista previa.

Crear y subir el paquete de entrenamiento personalizado

En esta sección, crearás un paquete de Python que contenga el código que lee el conjunto de datos, divide los datos en conjuntos de entrenamiento y de prueba, y entrena tu modelo personalizado. El paquete se ejecutará como una de las tareas de tu flujo de trabajo. Para obtener más información, consulta el artículo sobre cómo crear una aplicación de entrenamiento de Python para un contenedor prediseñado.

Crear el paquete de entrenamiento personalizado

  1. En tu cuaderno de Colab, crea carpetas principales para la aplicación de entrenamiento:

    !mkdir -p training_package/trainer
    
  2. Crea un archivo __init__.py en cada carpeta para convertirla en un paquete con el siguiente comando:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    Puedes ver los nuevos archivos y carpetas en el panel de la carpeta Archivos.

  3. En el panel Archivos, crea un archivo llamado task.py en la carpeta training_package/trainer con el siguiente contenido.

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.joblib"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    El código realiza las siguientes tareas:

    1. Selección de funciones.
    2. Transformar la hora de recogida y entrega de los datos de UTC a la hora local de Chicago.
    3. Extraer la fecha, la hora, el día de la semana, el mes y el año de la fecha y hora de recogida.
    4. Calcula la duración del viaje usando la hora de inicio y la de finalización.
    5. Identificar y marcar los trayectos que han empezado o finalizado en un aeropuerto en función de las zonas comunitarias.
    6. El modelo de regresión de bosque aleatorio se entrena para predecir la tarifa del viaje en taxi mediante el framework scikit-learn.
    7. El modelo entrenado se guarda en un archivo pickle model.joblib.

      El enfoque y la ingeniería de funciones seleccionados se basan en la exploración y el análisis de datos de Predicción de la tarifa de los taxis de Chicago.

  4. En el panel Archivos, crea un archivo llamado setup.py en la carpeta training_package con el siguiente contenido.

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. En tu cuaderno, ejecuta setup.py para crear la distribución de origen de tu aplicación de entrenamiento:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

Al final de esta sección, el panel Archivos debe contener los siguientes archivos y carpetas en training-package.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Sube el paquete de entrenamiento personalizado a Cloud Storage

  1. Crea un segmento de Cloud Storage.

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
    

    Sustituye los siguientes valores de los parámetros:

    • REGION: elige la misma región que al crear el cuaderno de Colab.

    • BUCKET_NAME: el nombre del segmento.

  2. Sube tu paquete de entrenamiento al depósito de Cloud Storage.

    # Copy the training package to the bucket
    ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Crear un flujo de procesamiento

Un flujo de procesamiento es una descripción de un flujo de trabajo de MLOps como un gráfico de pasos denominado tareas de flujo de procesamiento.

En esta sección, se definen las tareas de la canalización, se compilan en YAML y se registra la canalización en Artifact Registry para que se pueda controlar la versión y ejecutar varias veces, ya sea por un solo usuario o por varios.

Aquí se muestra una visualización de las tareas de nuestra canalización, que incluyen el entrenamiento del modelo, la subida del modelo, la evaluación del modelo y la notificación por correo electrónico:

Visualización del flujo de procesamiento

Para obtener más información, consulta cómo crear plantillas de canalizaciones.

Definir constantes e inicializar clientes

  1. En tu cuaderno, define las constantes que se usarán en pasos posteriores:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Sustituye NOTIFY_EMAIL por una dirección de correo. Cuando se completa el trabajo de la canalización, tanto si se ha completado correctamente como si no, se envía un correo a esa dirección.

  2. Inicializa el SDK de Vertex AI con el proyecto, el bucket de almacenamiento provisional, la ubicación y el experimento.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Definir las tareas del flujo de procesamiento

En tu cuaderno, define el flujo de procesamiento custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

Tu flujo de trabajo consta de un gráfico de tareas que utiliza los siguientes Google Cloud componentes de flujo de trabajo:

Compilar el flujo de procesamiento

Compila la canalización con el compilador de Kubeflow Pipelines (KFP) en un archivo YAML que contenga una representación hermética de tu canalización.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

Deberías ver un archivo YAML llamado vertex-pipeline-datatrigger-tutorial.yaml en tu directorio de trabajo.

Subir la canalización como plantilla

  1. Crea un repositorio de tipo KFP en Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Sube la canalización compilada al repositorio.

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description":"This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. En la consola Google Cloud , comprueba que tu plantilla aparece en Plantillas de canalización.

    Ir a Plantillas de canalizaciones

Ejecutar el flujo de trabajo manualmente

Para asegurarte de que la canalización funciona, ejecútala manualmente.

  1. En el cuaderno, especifica los parámetros necesarios para ejecutar la canalización como un trabajo.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Crea y ejecuta un trabajo de flujo de procesamiento.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    La tarea tarda unos 30 minutos en completarse.

  3. En la consola, debería ver una nueva ejecución de la canalización en la página Canalizaciones:

    Ir a Ejecuciones de flujo de procesamiento

  4. Una vez que se haya completado la ejecución de la canalización, deberías ver un modelo nuevo llamado taxifare-prediction-model o una nueva versión del modelo en el registro de modelos de Vertex AI:

    Ir a Registro de modelos

  5. También debería ver una nueva tarea de predicción por lotes:

    Ir a Predicciones por lotes

Ejecutar automáticamente el flujo de trabajo

Hay dos formas de ejecutar la canalización automáticamente: según una programación o cuando se insertan datos nuevos en el conjunto de datos.

Ejecutar la canalización según una programación

  1. En tu cuaderno, llama a PipelineJob.create_schedule.

    job_schedule = job.create_schedule(
      display_name="mlops tutorial schedule",
      cron="0 0 1 * *", #
      max_concurrent_run_count=1,
      max_run_count=12,
    )
    

    La expresión cron programa el trabajo para que se ejecute el día 1 de cada mes a las 00:00 (UTC).

    En este tutorial, no queremos que se ejecuten varios trabajos simultáneamente, por lo que asignamos el valor 1 a max_concurrent_run_count.

  2. En la consola Google Cloud , comprueba que tu schedule aparezca en Programaciones de procesos.

    Ir a Programaciones de pipelines

Ejecutar la canalización cuando haya datos nuevos

Crear una función con un activador de Eventarc

Crea una función de Cloud (2.ª gen.) que ejecute la canalización cada vez que se inserten datos nuevos en la tabla de BigQuery.

En concreto, usamos Eventarc para activar la función cada vez que se produce un evento google.cloud.bigquery.v2.JobService.InsertJob. A continuación, la función ejecuta la plantilla de flujo de procesamiento.

Para obtener más información, consulta Desencadenadores de Eventarc y tipos de eventos admitidos.

  1. En la Google Cloud consola, ve a las funciones de Cloud Run.

    Ir a Cloud Run Functions

  2. Haz clic en el botón Crear función. En la página Configuración:

    1. Selecciona 2.ª gen. como entorno.

    2. En Nombre de la función, usa mlops.

    3. En Región, selecciona la misma región que tu segmento de Cloud Storage y tu repositorio de Artifact Registry.

    4. En Activador, seleccione Otro activador. Se abrirá el panel Eventarc Trigger.

      1. En Trigger Type (Tipo de activador), elija Google Sources (Fuentes de Google).

      2. En Proveedor de eventos, elija BigQuery.

      3. En Tipo de evento, elija google.cloud.bigquery.v2.JobService.InsertJob.

      4. En Recurso, elija Recurso específico y especifique la tabla de BigQuery.

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. En el campo Región, selecciona una ubicación para el activador de Eventarc, si procede. Para obtener más información, consulta Ubicación del activador.

      6. Haz clic en Guardar activador.

    5. Si se te pide que concedas roles a cuentas de servicio, haz clic en Conceder todo.

  3. Haz clic en Siguiente para ir a la página Código. En la página Código:

    1. Define Runtime (Tiempo de ejecución) en Python 3.12.

    2. Define Punto de entrada como mlops_entrypoint.

    3. Con el editor insertado, abre el archivo main.py y sustituye el contenido por lo siguiente:

      Sustituye PROJECT_ID, REGION y BUCKET_NAME por los valores que has usado antes.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Abre el archivo requirements.txt y sustituye el contenido por lo siguiente:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Haga clic en Desplegar para desplegar la función.

Insertar datos para activar el flujo de procesamiento

  1. En la Google Cloud consola, ve a BigQuery Studio.

    Ir a BigQuery

  2. Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL haciendo clic en Ejecutar.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta de SQL para insertar filas nuevas en la tabla.

  3. Para verificar si se ha activado el evento, busca pipeline trigger condition met en el registro de tu función.

    Ir a Cloud Run Functions

  4. Si la función se activa correctamente, deberías ver una nueva ejecución de la canalización en Vertex AI Pipelines. La tarea de la canalización tarda unos 30 minutos en completarse.

    Ir a Vertex AI Pipelines

Limpieza

Para eliminar todos los recursos utilizados en este proyecto, puedes eliminar el proyecto Google Cloud que has usado en el tutorial. Google Cloud

De lo contrario, puedes eliminar los recursos que hayas creado para este tutorial.

  1. Elimina el cuaderno de Colab Enterprise.

    Ir a Colab Enterprise

  2. Elimina el conjunto de datos en BigQuery.

    Ir a BigQuery

  3. Elimina el segmento de Cloud Storage.

    Ir a Cloud Storage

  4. Elimina el modelo de la siguiente manera:

    1. En la sección Vertex AI, ve a la página Registro de modelos.

      Ve a la página Registro de modelos.

    2. Junto al nombre de tu modelo, haz clic en el menú Acciones y elige Eliminar modelo.

  5. Elimina las ejecuciones del flujo de procesamiento:

    1. Ve a la página Ejecuciones de la canalización.

      Ir a Ejecuciones de flujos de procesamiento

    2. Junto al nombre de cada ejecución de la canalización, haz clic en el menú Acciones y elige Eliminar ejecución de la canalización.

  6. Elimina las tareas de entrenamiento personalizadas:

    1. Ir a Tareas de entrenamiento personalizadas

    2. Junto al nombre de cada trabajo de entrenamiento personalizado, haz clic en el menú Acciones y elige Eliminar trabajo de entrenamiento personalizado.

  7. Elimina las tareas de predicción por lotes de la siguiente manera:

    1. Ir a la página Predicciones por lotes

    2. Junto al nombre de cada trabajo de predicción por lotes, haga clic en el menú Acciones y elija Eliminar trabajo de predicción por lotes.

  8. Elimina el repositorio de Artifact Registry.

    Ir a Artifact Registry

  9. Elimina la función de Cloud.

    Ir a Cloud Run Functions