Compila una canalización para el entrenamiento de modelos continuo

En este documento, se explican los pasos necesarios para compilar una canalización que entrene un modelo personalizado de forma automática según un programa periódico o cuando se insertan datos nuevos en el conjunto de datos mediante Vertex AI Pipelines y Cloud Functions.

Objetivos

En los siguientes pasos, se abarca este proceso:

  1. Adquiere y prepara un conjunto de datos en BigQuery

  2. Crea y sube un paquete de entrenamiento personalizado. Cuando se ejecuta, lee datos del conjunto de datos y entrena el modelo.

  3. Crea la canalización de Vertex AI. Esta canalización ejecuta el paquete de entrenamiento personalizado, sube el modelo a Vertex AI Model Registry, ejecuta el trabajo de evaluación y envía una notificación por correo electrónico.

  4. Ejecuta la canalización de forma manual.

  5. Crea una función de Cloud Functions con un activador de Eventarc que ejecute la canalización cada vez que se inserten datos nuevos en el conjunto de datos de BigQuery.

Antes de comenzar

Configura tu proyecto y tu notebook.

Configura el proyecto

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Google Cloud project.

Crear notebook

Usamos un notebook de Colab Enterprise para ejecutar parte del código de este instructivo.

  1. Si no eres el propietario del proyecto, pídele a un propietario que te otorgue los roles de IAM roles/resourcemanager.projectIamAdmin y roles/aiplatform.colabEnterpriseUser.

    Debes tener estos roles para usar Colab Enterprise y otorgar roles y permisos de IAM a ti mismo y a las cuentas de servicio.

    Ir a IAM

  2. En la consola de Google Cloud, ve a la página Notebooks de Colab Enterprise.

    Colab Enterprise te pedirá que habilites las siguientes APIs requeridas si aún no están habilitadas.

    • La API de Vertex AI
    • API de Dataform
    • API de Compute Engine

    Ir a Colab Enterprise

  3. En el menú Región, selecciona la región en la que deseas crear tu notebook. Si no estás seguro, usa us-central1 como la región.

    Usa la misma región para todos los recursos de este instructivo.

  4. Haz clic en Crear un notebook nuevo.

El notebook nuevo aparece en la pestaña Mis notebooks. Para ejecutar código en tu notebook, agrega una celda de código y haz clic en el botón Ejecutar celda.

Configura el entorno de desarrollo

  1. En tu notebook, instala los siguientes paquetes de Python3.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Para configurar el proyecto de Google Cloud CLI, ejecuta lo siguiente:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Reemplaza PROJECT_ID con el ID del proyecto. Si es necesario, puedes ubicar el ID del proyecto en la consola de Google Cloud.

  3. Otorga roles a tu Cuenta de Google:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Habilita las siguientes APIs

    • API de Artifact Registry
    • API de BigQuery
    • API de Cloud Build
    • API de Cloud Functions
    • API de Cloud Logging
    • API de Pub/Sub
    • API de Cloud Run Admin
    • API de Cloud Storage
    • API de Eventarc
    • API de Service Usage
    • La API de Vertex AI
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Otorga roles a las cuentas de servicio de tu proyecto:

    1. Consulta los nombres de tus cuentas de servicio

      ! gcloud iam service-accounts list
      

      Toma nota del nombre de tu agente de servicio de Compute. Debe tener el formato xxxxxxxx-compute@developer.gserviceaccount.com.

    2. Otorga los roles necesarios al agente de servicio:

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
      

Adquiere y prepara el conjunto de datos

En este instructivo, compilarás un modelo que predice la tarifa de un viaje en taxi en función de características como la duración, la ubicación y la distancia del viaje. Usaremos los datos del conjunto de datos público Viajes en taxi de Chicago. Este conjunto de datos incluye viajes en taxi desde 2013 hasta la actualidad, informados a la ciudad de Chicago en su rol como agencia regulatoria. Para proteger la privacidad de los conductores y usuarios del taxi al mismo tiempo y permitir que el agregador analice los datos, el ID de taxi se mantiene coherente con cualquier número de medallones de taxis, pero no muestra el número, la sección censal se elimina en algunos casos y los tiempos se redondean a los 15 minutos más cercanos.

Para obtener más información, consulta Viajes en Taxi de Chicago en Marketplace.

Crea un conjunto de datos de BigQuery

  1. En la consola de Google Cloud, ve a BigQuery Studio.

    Ir a BigQuery

  2. En el panel Explorer, busca tu proyecto, haz clic en Actions y, luego, haz clic en Explorer.

  3. En la página Crear un conjunto de datos:

    • En ID del conjunto de datos, ingresa mlops. 'Para obtener más información, consulta Nombres de los conjuntos de datos.

    • En Tipo de ubicación, elige tu multirregión. Por ejemplo, elige US (varias regiones en Estados Unidos) si usas us-central1. Después de crear un conjunto de datos, la ubicación no se puede cambiar.

    • Haz clic en Crear conjunto de datos.

Para obtener más información, consulta cómo crear conjuntos de datos.

Crea y propaga una tabla de BigQuery

En esta sección, crearás la tabla y, además, importarás los datos de un año del conjunto de datos público al conjunto de datos de tu proyecto.

  1. Ve a BigQuery Studio

    Ir a BigQuery

  2. Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL mediante un clic en Ejecutar.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta crea la tabla <PROJECT_ID>.mlops.chicago y la propaga con datos de la tabla pública bigquery-public-data.chicago_taxi_trips.taxi_trips.

  3. Para ver el esquema de la tabla, haz clic en Ir a la tabla y, luego, en la pestaña Esquema.

  4. Para ver el contenido de la tabla, haz clic en la pestaña Vista previa.

Crea y sube el paquete de entrenamiento personalizado

En esta sección, crearás un paquete de Python que contiene el código que lee el conjunto de datos, divide los datos en conjuntos de entrenamiento y prueba, y entrena tu modelo personalizado. El paquete se ejecutará como una de las tareas en la canalización. Si deseas obtener más información, consulta Compila una aplicación de entrenamiento de Python para un contenedor compilado previamente.

Crea el paquete de entrenamiento personalizado

  1. En el notebook de Colab, crea carpetas superiores para la aplicación de entrenamiento:

    !mkdir -p training_package/trainer
    
  2. Crea un archivo __init__.py en cada carpeta para convertirlo en un paquete con el siguiente comando:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    Puedes ver los archivos y las carpetas nuevos en el panel Archivos .

  3. En el panel Archivos, crea un archivo llamado task.py en la carpeta training_package/trainer con el siguiente contenido.

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    El código realiza las siguientes tareas:

    1. Selección de los atributos.
    2. Transformar la hora de partida y destino de los datos de UTC a la hora local de Chicago.
    3. Extraer la fecha, la hora, el día de la semana, el mes y el año de la fecha y hora de partida.
    4. Calcular la duración del viaje con las horas de inicio y finalización.
    5. Identificar y marcar los viajes que comenzaron o terminaron en un aeropuerto según las áreas de la comunidad.
    6. El modelo de regresión de bosque aleatorio está entrenado para predecir la tarifa del viaje en taxi mediante el framework scikit-learn.
    7. El modelo entrenado se guarda en un archivo pickle model.pkl.

      El enfoque y la ingeniería de atributos seleccionados se basan en la exploración y el análisis de datos en Predice las tarifas de taxis de Chicago.

  4. En el panel Archivos, crea un archivo llamado setup.py en la carpeta training_package con el siguiente contenido.

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. En el notebook, ejecuta setup.py a fin de crear la distribución de fuente para la aplicación de entrenamiento:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

Al final de esta sección, el panel Archivos debe contener los siguientes archivos y carpetas en training-package.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Sube el paquete de entrenamiento personalizado a Cloud Storage

  1. Crear un bucket de Cloud Storage

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
    

    Reemplaza los siguientes valores de parámetro:

    • REGION: Elige la misma región que eliges cuando creas tu notebook de Colab.

    • BUCKET_NAME: El nombre del bucket.

  2. Sube el paquete de entrenamiento al bucket de Cloud Storage.

    # Copy the training package to the bucket
    ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Compila tu canalización

Una canalización es una descripción de un flujo de trabajo de MLOps como un grafo de pasos llamados tareas de canalización.

En esta sección, definirás tus tareas de canalización, las compilarás en YAML y registrarás tu canalización en Artifact Registry para que se pueda controlar la versión y ejecutarla varias veces, por un solo usuario o por varios usuarios.

Aquí hay una visualización de las tareas, incluido el entrenamiento de modelos, la carga del modelo, la evaluación del modelo y la notificación por correo electrónico, en nuestra canalización:

Visualización de la canalización

Para obtener más información, consulta Crea plantillas de canalización.

Define constantes e inicializa clientes

  1. En el notebook, define las constantes que se usarán en pasos posteriores:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Reemplaza NOTIFY_EMAIL por una dirección de correo electrónico. Cuando se completa el trabajo de canalización, ya sea de forma correcta o no, se envía un correo electrónico a esa dirección de correo electrónico.

  2. Inicializa el SDK de Vertex AI con el proyecto, el bucket de etapa de pruebas, la ubicación y el experimento.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Define las tareas de la canalización

En el notebook, define la canalización custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

Tu canalización consta de un grafo de tareas que usan los siguientes componentes de canalización de Google Cloud:

Compila la canalización

Compila la canalización con el compilador de Kubeflow Pipelines (KFP) en un archivo YAML que contenga una representación hermética de la canalización.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

Deberías ver un archivo YAML llamado vertex-pipeline-datatrigger-tutorial.yaml en tu directorio de trabajo.

Sube la canalización como una plantilla

  1. Crea un repositorio de tipo KFP en Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Sube la canalización compilada al repositorio.

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description":"This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. En la consola de Google Cloud, verifica que tu plantilla aparezca en Plantillas de canalización.

    Ir a Plantillas de canalizaciones

Ejecuta la canalización de forma manual

Para asegurarte de que la canalización funcione, ejecútala de forma manual.

  1. En el notebook, especifica los parámetros necesarios para ejecutar la canalización como un trabajo.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Crea y ejecuta un trabajo de canalización.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    El trabajo toma unos 30 minutos en completarse.

  3. En la consola, deberías ver una nueva canalización en ejecución en la página Pipelines:

    Ir a Ejecuciones de canalización

  4. Una vez que se complete la ejecución de la canalización, deberías ver un modelo nuevo llamado taxifare-prediction-model o una versión de modelo nueva en Vertex AI Model Registry:

    Ir al registro de modelos

  5. También deberías ver un trabajo de predicción por lotes nuevo:

    Ir a Predicciones por lotes

Ejecuta la canalización automáticamente

Hay dos formas de ejecutar automáticamente la canalización: según un programa o cuando se insertan datos nuevos en el conjunto de datos.

Ejecuta la canalización según un programa

  1. En tu notebook, llama a PipelineJob.create_schedule.

    job_schedule = job.create_schedule(
      display_name="mlops tutorial schedule",
      cron="0 0 1 * *", #
      max_concurrent_run_count=1,
      max_run_count=12,
    )
    

    La expresión cron programa el trabajo para que se ejecute cada primer día del mes a las 12:00 a.m. UTC.

    Para este instructivo, no queremos que varios trabajos se ejecuten de forma simultánea, por lo que establecemos max_concurrent_run_count en 1.

  2. En la consola de Google Cloud, verifica que tu schedule aparezca en Programaciones de canalizaciones.

    Ir a Programas de canalizaciones

Ejecuta la canalización cuando haya datos nuevos

Crea la función con el activador de Eventarc

Crea una función de Cloud Functions (2nd gen) que ejecute la canalización cada vez que se inserten datos nuevos en la tabla de BigQuery.

Específicamente, usamos Eventarc para activar la función cada vez que se produce un evento google.cloud.bigquery.v2.JobService.InsertJob. Luego, la función ejecuta la plantilla de canalización.

Para obtener más información, consulta Activadores de Eventarc y los tipos de eventos compatibles.

  1. En la consola de Google Cloud, ve a la página Cloud Functions.

    Ir a Cloud Functions

  2. Haz clic en el botón Crear función. En la página Configuración:

    1. Selecciona 2nd gen como tu entorno.

    2. En Nombre de la función, usa mlops.

    3. En Región, selecciona la misma región que tu bucket de Cloud Storage y el repositorio de Artifact Registry.

    4. En Activador, selecciona Otro activador. Se abrirá el panel Activador de Eventarc.

      1. En Tipo de activador, elige Fuentes de Google.

      2. En Proveedor de eventos, elige BigQuery.

      3. En Tipo de evento, elige google.cloud.bigquery.v2.JobService.InsertJob.

      4. En Recurso, elige Recurso específico y especifica la tabla de BigQuery.

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. En el campo Región, selecciona una ubicación para el activador de Eventarc, si corresponde. Consulta Ubicación del activador para obtener más información.

      6. Haz clic en Guardar activador.

    5. Si se te solicita que otorgues roles a las cuentas de servicio, haz clic en Otorgar todo.

  3. Haz clic en Siguiente para ir a la página Código. En la página Código:

    1. Establece el Entorno de ejecución en Python 3.12.

    2. Establece el Punto de entrada en mlops_entrypoint.

    3. Con el editor intercalado, abre el archivo main.py y reemplaza el contenido por lo siguiente:

      Reemplaza PROJECT_ID,REGION,BUCKET_NAME por los valores que usaste antes.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Abre el archivo requirements.txt y reemplaza el contenido por lo siguiente:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Haz clic en implementar para implementar la función.

Inserta datos para activar la canalización

  1. En la consola de Google Cloud, ve a BigQuery Studio.

    Ir a BigQuery

  2. Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL mediante un clic en Ejecutar.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta de SQL para insertar filas nuevas en la tabla.

  3. Para verificar si se activó el evento, busca pipeline trigger condition met en el registro de la función.

    Ir a Cloud Functions

  4. Si la función se activa con éxito, deberías ver una nueva canalización en ejecución en Vertex AI Pipelines. El trabajo de canalización toma unos 30 minutos en completarse.

    Ir a Vertex AI Pipelines

Limpia

Si deseas limpiar todos los recursos de Google Cloud que se usaron en este proyecto, puedes borrar el proyecto de Google Cloud que usaste para el instructivo.

De lo contrario, puedes borrar los recursos individuales que creaste para este instructivo.

  1. Borra el notebook de Colab Enterprise.

    Ir a Colab Enterprise

  2. Elimina el conjunto de datos en BigQuery.

    Ir a BigQuery

  3. Borra el depósito de Cloud Storage

    Ir a Cloud Storage

  4. Borra el modelo de la siguiente manera:

    1. En la sección Vertex AI, ve a la página Registro de modelos.

      Ir a la página Model Registry

    2. Junto al nombre del modelo, haz clic en el menú Acciones y selecciona Borrar modelo.

  5. Borra las ejecuciones de canalizaciones:

    1. Ve a la página Ejecuciones de canalizaciones.

      Ir a Ejecuciones de canalizaciones

    2. Junto al nombre de cada canalización, haz clic en el menú Acciones y elige Borrar ejecución de canalización.

  6. Borra los trabajos de entrenamiento personalizados:

    1. Ir a Trabajos de entrenamiento personalizados

    2. Junto al nombre de cada trabajo de entrenamiento personalizado, haz clic en el menú Acciones y selecciona Borrar trabajo de entrenamiento personalizado.

  7. Borra el trabajo de predicción por lotes de la siguiente manera:

    1. Ir a la página Batch predictions

    2. Junto al nombre de tu trabajo de predicción por lotes, haz clic en el menú Acciones y selecciona Borrar trabajo de predicción por lotes.

  8. Borra el repositorio de Artifact Registry.

    Ir a Artifact Registry

  9. Borra la función de Cloud Functions

    Ir a Cloud Functions