En este documento, se explican los pasos necesarios para compilar una canalización que entrene un modelo personalizado de forma automática según un programa periódico o cuando se insertan datos nuevos en el conjunto de datos mediante Vertex AI Pipelines y Cloud Functions.
Objetivos
En los siguientes pasos, se abarca este proceso:
Adquiere y prepara un conjunto de datos en BigQuery
Crea y sube un paquete de entrenamiento personalizado. Cuando se ejecuta, lee datos del conjunto de datos y entrena el modelo.
Crea la canalización de Vertex AI. Esta canalización ejecuta el paquete de entrenamiento personalizado, sube el modelo a Vertex AI Model Registry, ejecuta el trabajo de evaluación y envía una notificación por correo electrónico.
Ejecuta la canalización de forma manual.
Crea una función de Cloud Functions con un activador de Eventarc que ejecute la canalización cada vez que se inserten datos nuevos en el conjunto de datos de BigQuery.
Antes de comenzar
Configura tu proyecto y tu notebook.
Configura el proyecto
-
In the Google Cloud console, go to the project selector page.
-
Select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
Crear notebook
Usamos un notebook de Colab Enterprise para ejecutar parte del código de este instructivo.
Si no eres el propietario del proyecto, pídele a un propietario que te otorgue los roles de IAM
roles/resourcemanager.projectIamAdmin
yroles/aiplatform.colabEnterpriseUser
.Debes tener estos roles para usar Colab Enterprise y otorgar roles y permisos de IAM a ti mismo y a las cuentas de servicio.
En la consola de Google Cloud, ve a la página Notebooks de Colab Enterprise.
Colab Enterprise te pedirá que habilites las siguientes APIs requeridas si aún no están habilitadas.
- La API de Vertex AI
- API de Dataform
- API de Compute Engine
En el menú Región, selecciona la región en la que deseas crear tu notebook. Si no estás seguro, usa us-central1 como la región.
Usa la misma región para todos los recursos de este instructivo.
Haz clic en Crear un notebook nuevo.
El notebook nuevo aparece en la pestaña Mis notebooks. Para ejecutar código en tu notebook, agrega una celda de código y haz clic en el botón Ejecutar celda.
Configura el entorno de desarrollo
En tu notebook, instala los siguientes paquetes de Python3.
! pip3 install google-cloud-aiplatform==1.34.0 \ google-cloud-pipeline-components==2.6.0 \ kfp==2.4.0 \ scikit-learn==1.0.2 \ mlflow==2.10.0
Para configurar el proyecto de Google Cloud CLI, ejecuta lo siguiente:
PROJECT_ID = "PROJECT_ID" # Set the project id ! gcloud config set project {PROJECT_ID}
Reemplaza PROJECT_ID con el ID del proyecto. Si es necesario, puedes ubicar el ID del proyecto en la consola de Google Cloud.
Otorga roles a tu Cuenta de Google:
! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
Habilita las siguientes APIs
- API de Artifact Registry
- API de BigQuery
- API de Cloud Build
- API de Cloud Functions
- API de Cloud Logging
- API de Pub/Sub
- API de Cloud Run Admin
- API de Cloud Storage
- API de Eventarc
- API de Service Usage
- La API de Vertex AI
! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
Otorga roles a las cuentas de servicio de tu proyecto:
Consulta los nombres de tus cuentas de servicio
! gcloud iam service-accounts list
Toma nota del nombre de tu agente de servicio de Compute. Debe tener el formato
xxxxxxxx-compute@developer.gserviceaccount.com
.Otorga los roles necesarios al agente de servicio:
! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
Adquiere y prepara el conjunto de datos
En este instructivo, compilarás un modelo que predice la tarifa de un viaje en taxi en función de características como la duración, la ubicación y la distancia del viaje. Usaremos los datos del conjunto de datos público Viajes en taxi de Chicago. Este conjunto de datos incluye viajes en taxi desde 2013 hasta la actualidad, informados a la ciudad de Chicago en su rol como agencia regulatoria. Para proteger la privacidad de los conductores y usuarios del taxi al mismo tiempo y permitir que el agregador analice los datos, el ID de taxi se mantiene coherente con cualquier número de medallones de taxis, pero no muestra el número, la sección censal se elimina en algunos casos y los tiempos se redondean a los 15 minutos más cercanos.
Para obtener más información, consulta Viajes en Taxi de Chicago en Marketplace.
Crea un conjunto de datos de BigQuery
En la consola de Google Cloud, ve a BigQuery Studio.
En el panel Explorer, busca tu proyecto, haz clic en
Actions y, luego, haz clic en Explorer.En la página Crear un conjunto de datos:
En ID del conjunto de datos, ingresa
mlops
. 'Para obtener más información, consulta Nombres de los conjuntos de datos.En Tipo de ubicación, elige tu multirregión. Por ejemplo, elige US (varias regiones en Estados Unidos) si usas
us-central1
. Después de crear un conjunto de datos, la ubicación no se puede cambiar.Haz clic en Crear conjunto de datos.
Para obtener más información, consulta cómo crear conjuntos de datos.
Crea y propaga una tabla de BigQuery
En esta sección, crearás la tabla y, además, importarás los datos de un año del conjunto de datos público al conjunto de datos de tu proyecto.
Ve a BigQuery Studio
Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL mediante un clic en
Ejecutar.CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago` AS ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Esta consulta crea la tabla
<PROJECT_ID>.mlops.chicago
y la propaga con datos de la tabla públicabigquery-public-data.chicago_taxi_trips.taxi_trips
.Para ver el esquema de la tabla, haz clic en Ir a la tabla y, luego, en la pestaña Esquema.
Para ver el contenido de la tabla, haz clic en la pestaña Vista previa.
Crea y sube el paquete de entrenamiento personalizado
En esta sección, crearás un paquete de Python que contiene el código que lee el conjunto de datos, divide los datos en conjuntos de entrenamiento y prueba, y entrena tu modelo personalizado. El paquete se ejecutará como una de las tareas en la canalización. Si deseas obtener más información, consulta Compila una aplicación de entrenamiento de Python para un contenedor compilado previamente.
Crea el paquete de entrenamiento personalizado
En el notebook de Colab, crea carpetas superiores para la aplicación de entrenamiento:
!mkdir -p training_package/trainer
Crea un archivo
__init__.py
en cada carpeta para convertirlo en un paquete con el siguiente comando:! touch training_package/__init__.py ! touch training_package/trainer/__init__.py
Puedes ver los archivos y las carpetas nuevos en el panel Archivos .
En el panel Archivos, crea un archivo llamado
task.py
en la carpeta training_package/trainer con el siguiente contenido.# Import the libraries from sklearn.model_selection import train_test_split, cross_val_score from sklearn.preprocessing import OneHotEncoder, StandardScaler from google.cloud import bigquery, bigquery_storage from sklearn.ensemble import RandomForestRegressor from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from google import auth from scipy import stats import numpy as np import argparse import joblib import pickle import csv import os # add parser arguments parser = argparse.ArgumentParser() parser.add_argument('--project-id', dest='project_id', type=str, help='Project ID.') parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"), type=str, help='Dir to save the data and the trained model.') parser.add_argument('--bq-source', dest='bq_source', type=str, help='BigQuery data source for training data.') args = parser.parse_args() # data preparation code BQ_QUERY = """ with tmp_table as ( SELECT trip_seconds, trip_miles, fare, tolls, company, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp, DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp, CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport, FROM `{}` WHERE dropoff_latitude IS NOT NULL and dropoff_longitude IS NOT NULL and pickup_latitude IS NOT NULL and pickup_longitude IS NOT NULL and fare > 0 and trip_miles > 0 and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99 ORDER BY RAND() LIMIT 10000) SELECT *, EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year, EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month, EXTRACT(DAY FROM trip_start_timestamp) trip_start_day, EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour, FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week FROM tmp_table """.format(args.bq_source) # Get default credentials credentials, project = auth.default() bqclient = bigquery.Client(credentials=credentials, project=args.project_id) bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials) df = ( bqclient.query(BQ_QUERY) .result() .to_dataframe(bqstorage_client=bqstorageclient) ) # Add 'N/A' for missing 'Company' df.fillna(value={'company':'N/A','tolls':0}, inplace=True) # Drop rows containing null data. df.dropna(how='any', axis='rows', inplace=True) # Pickup and dropoff locations distance df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100 # Remove extremes, outliers possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance'] df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy() # Reduce location accuracy df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3}) # Drop the timestamp col X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1) # Split the data into train and test X_train, X_test = train_test_split(X, test_size=0.10, random_state=123) ## Format the data for batch predictions # select string cols string_cols = X_test.select_dtypes(include='object').columns # Add quotes around string fields X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"') # Add quotes around column names X_test.columns = ['\"' + col + '\"' for col in X_test.columns] # Save DataFrame to csv X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Save test data without the target for batch predictions X_test.drop('\"fare\"',axis=1,inplace=True) X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Separate the target column y_train=X_train.pop('fare') # Get the column indexes col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)} # Create a column transformer pipeline ct_pipe = ColumnTransformer(transformers=[ ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]), ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]), ('std_scaler', StandardScaler(), [ col_index_dict['trip_start_year'], col_index_dict['abs_distance'], col_index_dict['pickup_longitude'], col_index_dict['pickup_latitude'], col_index_dict['dropoff_longitude'], col_index_dict['dropoff_latitude'], col_index_dict['trip_miles'], col_index_dict['trip_seconds']]) ]) # Add the random-forest estimator to the pipeline rfr_pipe = Pipeline([ ('ct', ct_pipe), ('forest_reg', RandomForestRegressor( n_estimators = 20, max_features = 1.0, n_jobs = -1, random_state = 3, max_depth=None, max_leaf_nodes=None, )) ]) # train the model rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5) rfr_rmse = np.sqrt(-rfr_score) print ("Crossvalidation RMSE:",rfr_rmse.mean()) final_model=rfr_pipe.fit(X_train, y_train) # Save the model pipeline with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file: pickle.dump(final_model, model_file)
El código realiza las siguientes tareas:
- Selección de los atributos.
- Transformar la hora de partida y destino de los datos de UTC a la hora local de Chicago.
- Extraer la fecha, la hora, el día de la semana, el mes y el año de la fecha y hora de partida.
- Calcular la duración del viaje con las horas de inicio y finalización.
- Identificar y marcar los viajes que comenzaron o terminaron en un aeropuerto según las áreas de la comunidad.
- El modelo de regresión de bosque aleatorio está entrenado para predecir la tarifa del viaje en taxi mediante el framework scikit-learn.
El modelo entrenado se guarda en un archivo pickle
model.pkl
.El enfoque y la ingeniería de atributos seleccionados se basan en la exploración y el análisis de datos en Predice las tarifas de taxis de Chicago.
En el panel Archivos, crea un archivo llamado
setup.py
en la carpeta training_package con el siguiente contenido.from setuptools import find_packages from setuptools import setup REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"] setup( name='trainer', version='0.1', install_requires=REQUIRED_PACKAGES, packages=find_packages(), include_package_data=True, description='Training application package for chicago taxi trip fare prediction.' )
En el notebook, ejecuta
setup.py
a fin de crear la distribución de fuente para la aplicación de entrenamiento:! cd training_package && python setup.py sdist --formats=gztar && cd ..
Al final de esta sección, el panel Archivos debe contener los siguientes archivos y carpetas en training-package
.
dist
trainer-0.1.tar.gz
trainer
__init__.py
task.py
trainer.egg-info
__init__.py
setup.py
Sube el paquete de entrenamiento personalizado a Cloud Storage
Crear un bucket de Cloud Storage
REGION="REGION" BUCKET_NAME = "BUCKET_NAME" BUCKET_URI = f"gs://{BUCKET_NAME}" ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
Reemplaza los siguientes valores de parámetro:
REGION
: Elige la misma región que eliges cuando creas tu notebook de Colab.BUCKET_NAME
: El nombre del bucket.
Sube el paquete de entrenamiento al bucket de Cloud Storage.
# Copy the training package to the bucket ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
Compila tu canalización
Una canalización es una descripción de un flujo de trabajo de MLOps como un grafo de pasos llamados tareas de canalización.
En esta sección, definirás tus tareas de canalización, las compilarás en YAML y registrarás tu canalización en Artifact Registry para que se pueda controlar la versión y ejecutarla varias veces, por un solo usuario o por varios usuarios.
Aquí hay una visualización de las tareas, incluido el entrenamiento de modelos, la carga del modelo, la evaluación del modelo y la notificación por correo electrónico, en nuestra canalización:
Para obtener más información, consulta Crea plantillas de canalización.
Define constantes e inicializa clientes
En el notebook, define las constantes que se usarán en pasos posteriores:
import os EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ] PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial" WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial" os.environ['AIP_MODEL_DIR'] = WORKING_DIR EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" PIPELINE_FILE = PIPELINE_NAME + ".yaml"
Reemplaza
NOTIFY_EMAIL
por una dirección de correo electrónico. Cuando se completa el trabajo de canalización, ya sea de forma correcta o no, se envía un correo electrónico a esa dirección de correo electrónico.Inicializa el SDK de Vertex AI con el proyecto, el bucket de etapa de pruebas, la ubicación y el experimento.
from google.cloud import aiplatform aiplatform.init( project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION, experiment=EXPERIMENT_NAME) aiplatform.autolog()
Define las tareas de la canalización
En el notebook, define la canalización custom_model_training_evaluation_pipeline
:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform
# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
project: str,
location: str,
training_job_display_name: str,
worker_pool_specs: list,
base_output_dir: str,
prediction_container_uri: str,
model_display_name: str,
batch_prediction_job_display_name: str,
target_field_name: str,
test_data_gcs_uri: list,
ground_truth_gcs_source: list,
batch_predictions_gcs_prefix: str,
batch_predictions_input_format: str="csv",
batch_predictions_output_format: str="jsonl",
ground_truth_format: str="csv",
parent_model_resource_name: str=None,
parent_model_artifact_uri: str=None,
existing_model: bool=False
):
# Notification task
notify_task = VertexNotificationEmailOp(
recipients= EMAIL_RECIPIENTS
)
with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
# Train the model
custom_job_task = CustomTrainingJobOp(
project=project,
display_name=training_job_display_name,
worker_pool_specs=worker_pool_specs,
base_output_directory=base_output_dir,
location=location
)
# Import the unmanaged model
import_unmanaged_model_task = importer(
artifact_uri=base_output_dir,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": prediction_container_uri,
},
},
).after(custom_job_task)
with dsl.If(existing_model == True):
# Import the parent model to upload as a version
import_registry_model_task = importer(
artifact_uri=parent_model_artifact_uri,
artifact_class=artifact_types.VertexModel,
metadata={
"resourceName": parent_model_resource_name
},
).after(import_unmanaged_model_task)
# Upload the model as a version
model_version_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
parent_model=import_registry_model_task.outputs["artifact"],
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
with dsl.Else():
# Upload the model
model_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
# Get the model (or model version)
model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])
# Batch prediction
batch_predict_task = ModelBatchPredictOp(
project= project,
job_display_name= batch_prediction_job_display_name,
model= model_resource,
location= location,
instances_format= batch_predictions_input_format,
predictions_format= batch_predictions_output_format,
gcs_source_uris= test_data_gcs_uri,
gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
machine_type= 'n1-standard-2'
)
# Evaluation task
evaluation_task = ModelEvaluationRegressionOp(
project= project,
target_field_name= target_field_name,
location= location,
# model= model_resource,
predictions_format= batch_predictions_output_format,
predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
ground_truth_format= ground_truth_format,
ground_truth_gcs_source= ground_truth_gcs_source
)
return
Tu canalización consta de un grafo de tareas que usan los siguientes componentes de canalización de Google Cloud:
CustomTrainingJobOp
: Ejecuta trabajos de entrenamiento personalizados en Vertex AI.ModelUploadOp
: Sube el modelo de aprendizaje automático entrenado a Model Registry.ModelBatchPredictOp
: Crea un trabajo de predicción por lotes.ModelEvaluationRegressionOp
: Evalúa un trabajo por lotes de regresión.VertexNotificationEmailOp
: Envía notificaciones por correo electrónico.
Compila la canalización
Compila la canalización con el compilador de Kubeflow Pipelines (KFP) en un archivo YAML que contenga una representación hermética de la canalización.
from kfp import dsl
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=custom_model_training_evaluation_pipeline,
package_path="{}.yaml".format(PIPELINE_NAME),
)
Deberías ver un archivo YAML llamado vertex-pipeline-datatrigger-tutorial.yaml
en tu directorio de trabajo.
Sube la canalización como una plantilla
Crea un repositorio de tipo
KFP
en Artifact Registry.REPO_NAME = "mlops" # Create a repo in the artifact registry ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
Sube la canalización compilada al repositorio.
from kfp.registry import RegistryClient host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}" client = RegistryClient(host=host) TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline( file_name=PIPELINE_FILE, tags=["v1", "latest"], extra_headers={"description":"This is an example pipeline template."}) TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
En la consola de Google Cloud, verifica que tu plantilla aparezca en Plantillas de canalización.
Ejecuta la canalización de forma manual
Para asegurarte de que la canalización funcione, ejecútala de forma manual.
En el notebook, especifica los parámetros necesarios para ejecutar la canalización como un trabajo.
DATASET_NAME = "mlops" TABLE_NAME = "chicago" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False }
Crea y ejecuta un trabajo de canalización.
# Create a pipeline job job = aiplatform.PipelineJob( display_name="triggered_custom_regression_evaluation", template_path=TEMPLATE_URI , parameter_values=parameters, pipeline_root=BUCKET_URI, enable_caching=False ) # Run the pipeline job job.run()
El trabajo toma unos 30 minutos en completarse.
En la consola, deberías ver una nueva canalización en ejecución en la página Pipelines:
Una vez que se complete la ejecución de la canalización, deberías ver un modelo nuevo llamado
taxifare-prediction-model
o una versión de modelo nueva en Vertex AI Model Registry:También deberías ver un trabajo de predicción por lotes nuevo:
Ejecuta la canalización automáticamente
Hay dos formas de ejecutar automáticamente la canalización: según un programa o cuando se insertan datos nuevos en el conjunto de datos.
Ejecuta la canalización según un programa
En tu notebook, llama a
PipelineJob.create_schedule
.job_schedule = job.create_schedule( display_name="mlops tutorial schedule", cron="0 0 1 * *", # max_concurrent_run_count=1, max_run_count=12, )
La expresión
cron
programa el trabajo para que se ejecute cada primer día del mes a las 12:00 a.m. UTC.Para este instructivo, no queremos que varios trabajos se ejecuten de forma simultánea, por lo que establecemos
max_concurrent_run_count
en 1.En la consola de Google Cloud, verifica que tu
schedule
aparezca en Programaciones de canalizaciones.
Ejecuta la canalización cuando haya datos nuevos
Crea la función con el activador de Eventarc
Crea una función de Cloud Functions (2nd gen) que ejecute la canalización cada vez que se inserten datos nuevos en la tabla de BigQuery.
Específicamente, usamos Eventarc para activar la función cada vez que se produce un evento google.cloud.bigquery.v2.JobService.InsertJob
. Luego, la función ejecuta la plantilla de canalización.
Para obtener más información, consulta Activadores de Eventarc y los tipos de eventos compatibles.
En la consola de Google Cloud, ve a la página Cloud Functions.
Haz clic en el botón Crear función. En la página Configuración:
Selecciona 2nd gen como tu entorno.
En Nombre de la función, usa mlops.
En Región, selecciona la misma región que tu bucket de Cloud Storage y el repositorio de Artifact Registry.
En Activador, selecciona Otro activador. Se abrirá el panel Activador de Eventarc.
En Tipo de activador, elige Fuentes de Google.
En Proveedor de eventos, elige BigQuery.
En Tipo de evento, elige
google.cloud.bigquery.v2.JobService.InsertJob
.En Recurso, elige Recurso específico y especifica la tabla de BigQuery.
projects/PROJECT_ID/datasets/mlops/tables/chicago
En el campo Región, selecciona una ubicación para el activador de Eventarc, si corresponde. Consulta Ubicación del activador para obtener más información.
Haz clic en Guardar activador.
Si se te solicita que otorgues roles a las cuentas de servicio, haz clic en Otorgar todo.
Haz clic en Siguiente para ir a la página Código. En la página Código:
Establece el Entorno de ejecución en Python 3.12.
Establece el Punto de entrada en
mlops_entrypoint
.Con el editor intercalado, abre el archivo
main.py
y reemplaza el contenido por lo siguiente:Reemplaza
PROJECT_ID
,REGION
,BUCKET_NAME
por los valores que usaste antes.import json import functions_framework import requests import google.auth import google.auth.transport.requests # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers! @functions_framework.cloud_event def mlops_entrypoint(cloudevent): # Print out the CloudEvent's (required) `type` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type print(f"Event type: {cloudevent['type']}") # Print out the CloudEvent's (optional) `subject` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject if 'subject' in cloudevent: # CloudEvent objects don't support `get` operations. # Use the `in` operator to verify `subject` is present. print(f"Subject: {cloudevent['subject']}") # Print out details from the `protoPayload` # This field encapsulates a Cloud Audit Logging entry # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure payload = cloudevent.data.get("protoPayload") if payload: print(f"API method: {payload.get('methodName')}") print(f"Resource name: {payload.get('resourceName')}") print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}") row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount') print(f"No. of rows: {row_count} !!") if row_count: if int(row_count) > 0: print ("Pipeline trigger Condition met !!") submit_pipeline_job() else: print ("No pipeline triggered !!!") def submit_pipeline_job(): PROJECT_ID = 'PROJECT_ID' REGION = 'REGION' BUCKET_NAME = "BUCKET_NAME" DATASET_NAME = "mlops" TABLE_NAME = "chicago" base_output_dir = BUCKET_NAME BUCKET_URI = "gs://{}".format(BUCKET_NAME) PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-mlops-pipeline-tutorial" EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" REPO_NAME ="mlops" TEMPLATE_NAME="custom-model-training-evaluation-pipeline" TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False } TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest" print("TEMPLATE URI: ", TEMPLATE_URI) request_body = { "name": PIPELINE_NAME, "displayName": PIPELINE_NAME, "runtimeConfig":{ "gcsOutputDirectory": PIPELINE_ROOT, "parameterValues": parameters, }, "templateUri": TEMPLATE_URI } pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION) creds, project = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) headers = { 'Authorization': 'Bearer {}'.format(creds.token), 'Content-Type': 'application/json; charset=utf-8' } response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body)) print(response.text)
Abre el archivo
requirements.txt
y reemplaza el contenido por lo siguiente:requests==2.31.0 google-auth==2.25.1
Haz clic en implementar para implementar la función.
Inserta datos para activar la canalización
En la consola de Google Cloud, ve a BigQuery Studio.
Haz clic en Crear consulta de SQL y ejecuta la siguiente consulta de SQL mediante un clic en
Ejecutar.INSERT INTO `PROJECT_ID.mlops.chicago` ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Esta consulta de SQL para insertar filas nuevas en la tabla.
Para verificar si se activó el evento, busca
pipeline trigger condition met
en el registro de la función.Si la función se activa con éxito, deberías ver una nueva canalización en ejecución en Vertex AI Pipelines. El trabajo de canalización toma unos 30 minutos en completarse.
Limpia
Si deseas limpiar todos los recursos de Google Cloud que se usaron en este proyecto, puedes borrar el proyecto de Google Cloud que usaste para el instructivo.
De lo contrario, puedes borrar los recursos individuales que creaste para este instructivo.
Borra el modelo de la siguiente manera:
En la sección Vertex AI, ve a la página Registro de modelos.
Junto al nombre del modelo, haz clic en el menú
Acciones y selecciona Borrar modelo.
Borra las ejecuciones de canalizaciones:
Ve a la página Ejecuciones de canalizaciones.
Junto al nombre de cada canalización, haz clic en el menú
Acciones y elige Borrar ejecución de canalización.
Borra los trabajos de entrenamiento personalizados:
Junto al nombre de cada trabajo de entrenamiento personalizado, haz clic en el menú
Acciones y selecciona Borrar trabajo de entrenamiento personalizado.
Borra el trabajo de predicción por lotes de la siguiente manera:
Junto al nombre de tu trabajo de predicción por lotes, haz clic en el menú
Acciones y selecciona Borrar trabajo de predicción por lotes.