Neste documento, mostramos as etapas necessárias para criar um pipeline que treina automaticamente um modelo personalizado com base em uma programação periódica ou quando novos dados são inseridos no conjunto de dados usando o Vertex AI Pipelines e o Cloud Run functions.
Objetivos
As etapas a seguir abrangem esse processo:
adquirir e preparar um conjunto de dados no BigQuery
Criar e fazer upload de um pacote de treinamento personalizado Quando executada, ela lê dados do conjunto e treina o modelo.
Criar um pipeline da Vertex AI. Esse pipeline executa o pacote de treinamento personalizado, faz upload do modelo para o Vertex AI Model Registry, executa o job de avaliação e envia uma notificação por e-mail.
Execute o pipeline manualmente.
Criar uma função do Cloud com um gatilho do Eventarc que executa o pipeline sempre que novos dados são inseridos no conjunto de dados do BigQuery.
Antes de começar
Configure o projeto e o notebook.
Configurar o projeto
-
In the Google Cloud console, go to the project selector page.
-
Select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
Criar notebook
Usamos um bloco do Colab Enterprise para executar uma parte do código deste tutorial.
Se você não for o proprietário do projeto, peça a um proprietário que conceda a você os papéis
roles/resourcemanager.projectIamAdmin
eroles/aiplatform.colabEnterpriseUser
do IAM.Você precisa ter esses papéis para usar o Colab Enterprise e conceder papéis e permissões do IAM a você e a contas de serviço.
No console do Google Cloud, acesse a página Notebooks do Colab Enterprise.
O Colab Enterprise vai solicitar que você ative as APIs necessárias a seguir, caso ainda não estejam ativadas.
- API Vertex AI
- API Dataform
- API Compute Engine
No menu Região, selecione a região em que você quer criar o notebook. Se você não tiver certeza, use us-central1 como região.
Use a mesma região para todos os recursos neste tutorial.
Clique em Criar um novo notebook.
O novo notebook aparece na guia Meus notebooks. Para executar o código no notebook, adicione uma célula de código e clique no botão Executar célula.
Configurar o ambiente de desenvolvimento
No seu notebook, instale os pacotes Python3 a seguir.
! pip3 install google-cloud-aiplatform==1.34.0 \ google-cloud-pipeline-components==2.6.0 \ kfp==2.4.0 \ scikit-learn==1.0.2 \ mlflow==2.10.0
Execute o comando a seguir para definir o projeto da CLI do Google Cloud:
PROJECT_ID = "PROJECT_ID" # Set the project id ! gcloud config set project {PROJECT_ID}
Substitua PROJECT_ID pela ID do seu projeto. Se necessário, localize o ID do projeto no console do Google Cloud.
Atribua os papéis à sua Conta do Google:
! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
Ative as APIs a seguir
- API Artifact Registry
- API BigQuery
- API Cloud Build
- API Cloud Functions
- API Cloud Logging
- API Pub/Sub
- API Cloud Run Admin
- API Cloud Storage
- API Eventarc
- API Service Usage
- API Vertex AI
! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
Conceda papéis às contas de serviço do seu projeto:
Ver os nomes das suas contas de serviço
! gcloud iam service-accounts list
Anote o nome do seu agente de serviço do Compute. Ele precisa estar no formato
xxxxxxxx-compute@developer.gserviceaccount.com
.Conceda os papéis necessários à conta de serviço.
! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
Adquirir e preparar conjunto de dados
Neste tutorial, você criará um modelo que prevê a tarifa de uma corrida de táxi com base em recursos como tempo de viagem, local e distância. Usaremos o conjunto de dados público Chicago Taxi Trips. Esse conjunto de dados inclui corridas de táxi de 2013 até o presente, reportadas à agência reguladora da cidade de Chicago. Para proteger a privacidade dos motoristas e usuários da táxi ao mesmo tempo e permitir que o agregador analise os dados, o ID do táxi é mantido consistente para qualquer número de medalhas, mas não mostra o{101 }, os Census Tracts são suprimidos em alguns casos e os tempos são arredondados para os 15 minutos mais próximos.
Para mais informações, confira Viagens de táxi de Chicago no Marketplace.
Criar um conjunto de dados do BigQuery
No console do Google Cloud, acesse o BigQuery Studio.
No painel Explorer, localize seu projeto, clique em
Ações e, em seguida, clique em Criar conjunto de dados.Na página Criar conjunto de dados:
Para o código do conjunto de dados, insira
mlops
. Para mais informações, consulte Nomenclatura do conjunto de dados.Em Tipo de local, escolha sua multirregião. Por exemplo, escolha US (várias regiões nos Estados Unidos) se estiver usando
us-central1
. Após a criação de um conjunto de dados, o local não pode ser alterado.Clique em Criar conjunto de dados.
Para saber mais, confira como criar conjuntos de dados.
Criar e preencher a tabela do BigQuery
Nesta seção, você cria a tabela e importa os dados de um ano do conjunto de dados público para o conjunto de dados do projeto.
Acessar o BigQuery Studio
Clique em Criar consulta SQL e execute a consulta SQL a seguir clicando em
Executar.CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago` AS ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Essa consulta cria a tabela
<PROJECT_ID>.mlops.chicago
e a preenche com dados da tabela públicabigquery-public-data.chicago_taxi_trips.taxi_trips
.Para exibir o esquema da tabela, clique em Acessar a tabela e depois na guia Esquema.
Para acessar o conteúdo da tabela, clique na guia Visualização.
Criar e fazer upload do pacote de treinamento personalizado
Nesta seção, você cria um pacote do Python que contém o código que lê o conjunto de dados, divide os dados em conjuntos de treinamento e teste e treina seu modelo personalizado. O pacote será executado como uma das tarefas do pipeline. Para mais informações, consulte Como criar um aplicativo de treinamento em Python para um contêiner pré-criado.
Criar o pacote de treinamento personalizado
No bloco do Colab, crie pastas mãe para o aplicativo de treinamento:
!mkdir -p training_package/trainer
Crie um arquivo
__init__.py
em cada pasta para transformá-lo em um pacote usando o seguinte comando:! touch training_package/__init__.py ! touch training_package/trainer/__init__.py
Confira os novos arquivos e pastas no painel Arquivos pasta.
No painel Arquivos, crie um arquivo chamado
task.py
na pasta training_package/trainer com o conteúdo a seguir.# Import the libraries from sklearn.model_selection import train_test_split, cross_val_score from sklearn.preprocessing import OneHotEncoder, StandardScaler from google.cloud import bigquery, bigquery_storage from sklearn.ensemble import RandomForestRegressor from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from google import auth from scipy import stats import numpy as np import argparse import joblib import pickle import csv import os # add parser arguments parser = argparse.ArgumentParser() parser.add_argument('--project-id', dest='project_id', type=str, help='Project ID.') parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"), type=str, help='Dir to save the data and the trained model.') parser.add_argument('--bq-source', dest='bq_source', type=str, help='BigQuery data source for training data.') args = parser.parse_args() # data preparation code BQ_QUERY = """ with tmp_table as ( SELECT trip_seconds, trip_miles, fare, tolls, company, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp, DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp, CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport, FROM `{}` WHERE dropoff_latitude IS NOT NULL and dropoff_longitude IS NOT NULL and pickup_latitude IS NOT NULL and pickup_longitude IS NOT NULL and fare > 0 and trip_miles > 0 and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99 ORDER BY RAND() LIMIT 10000) SELECT *, EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year, EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month, EXTRACT(DAY FROM trip_start_timestamp) trip_start_day, EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour, FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week FROM tmp_table """.format(args.bq_source) # Get default credentials credentials, project = auth.default() bqclient = bigquery.Client(credentials=credentials, project=args.project_id) bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials) df = ( bqclient.query(BQ_QUERY) .result() .to_dataframe(bqstorage_client=bqstorageclient) ) # Add 'N/A' for missing 'Company' df.fillna(value={'company':'N/A','tolls':0}, inplace=True) # Drop rows containing null data. df.dropna(how='any', axis='rows', inplace=True) # Pickup and dropoff locations distance df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100 # Remove extremes, outliers possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance'] df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy() # Reduce location accuracy df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3}) # Drop the timestamp col X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1) # Split the data into train and test X_train, X_test = train_test_split(X, test_size=0.10, random_state=123) ## Format the data for batch predictions # select string cols string_cols = X_test.select_dtypes(include='object').columns # Add quotes around string fields X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"') # Add quotes around column names X_test.columns = ['\"' + col + '\"' for col in X_test.columns] # Save DataFrame to csv X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Save test data without the target for batch predictions X_test.drop('\"fare\"',axis=1,inplace=True) X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Separate the target column y_train=X_train.pop('fare') # Get the column indexes col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)} # Create a column transformer pipeline ct_pipe = ColumnTransformer(transformers=[ ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]), ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]), ('std_scaler', StandardScaler(), [ col_index_dict['trip_start_year'], col_index_dict['abs_distance'], col_index_dict['pickup_longitude'], col_index_dict['pickup_latitude'], col_index_dict['dropoff_longitude'], col_index_dict['dropoff_latitude'], col_index_dict['trip_miles'], col_index_dict['trip_seconds']]) ]) # Add the random-forest estimator to the pipeline rfr_pipe = Pipeline([ ('ct', ct_pipe), ('forest_reg', RandomForestRegressor( n_estimators = 20, max_features = 1.0, n_jobs = -1, random_state = 3, max_depth=None, max_leaf_nodes=None, )) ]) # train the model rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5) rfr_rmse = np.sqrt(-rfr_score) print ("Crossvalidation RMSE:",rfr_rmse.mean()) final_model=rfr_pipe.fit(X_train, y_train) # Save the model pipeline with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file: pickle.dump(final_model, model_file)
O código realiza as seguintes tarefas:
- Seleção de atributos.
- Transformação do horário dos dados de embarque e desembarque de UTC para o horário local de Chicago.
- Extração da data, hora, dia da semana, mês e ano da data e hora de retirada.
- Calcular a duração da viagem usando os horários de início e término.
- Identificar e marcar viagens que começaram ou terminaram em um aeroporto com base nas áreas da comunidade.
- O modelo de regressão de floresta aleatória é treinado para prever a tarifa da viagem de táxi usando o framework scikit-learn.
O modelo treinado é salvo em um arquivo pickle
model.pkl
.A abordagem e a engenharia de atributos selecionadas são baseadas na exploração e análise de dados em Como prever tarifas de táxi de Chicago.
No painel Arquivos, crie um arquivo chamado
setup.py
na pasta training_package com o conteúdo a seguir.from setuptools import find_packages from setuptools import setup REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"] setup( name='trainer', version='0.1', install_requires=REQUIRED_PACKAGES, packages=find_packages(), include_package_data=True, description='Training application package for chicago taxi trip fare prediction.' )
No notebook, execute
setup.py
para criar a distribuição de origem do aplicativo de treinamento:! cd training_package && python setup.py sdist --formats=gztar && cd ..
No final desta seção, o painel Arquivos vai conter os seguintes arquivos e pastas em training-package
.
dist
trainer-0.1.tar.gz
trainer
__init__.py
task.py
trainer.egg-info
__init__.py
setup.py
Faça o upload do pacote de treinamento personalizado para o Cloud Storage
Criar um bucket do Cloud Storage.
REGION="REGION" BUCKET_NAME = "BUCKET_NAME" BUCKET_URI = f"gs://{BUCKET_NAME}" ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
Substitua os seguintes valores de parâmetro:
REGION
: escolha a mesma região ao criar o notebook do Colab.BUCKET_NAME
: O nome do bloco.
Faça o upload do pacote de treinamento no bucket do Cloud Storage.
# Copy the training package to the bucket ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
Crie o pipeline
Um pipeline é uma descrição de um fluxo de trabalho de MLOps como um gráfico de etapas chamadas tarefas de pipeline.
Nesta seção, você define as tarefas do pipeline, compila-as em YAML e registra seu pipeline no Artifact Registry para que ele possa ser controlado por versões e executado várias vezes, por um único usuário ou por vários usuários.
Veja a seguir uma visualização das tarefas em nosso pipeline, incluindo treinamento de modelo, upload de modelo, avaliação de modelo e notificação por e-mail:
Para mais informações, consulte Como criar modelos de pipeline.
Definir constantes e inicializar clientes
No notebook, defina as constantes que serão usadas nas etapas posteriores:
import os EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ] PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial" WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial" os.environ['AIP_MODEL_DIR'] = WORKING_DIR EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" PIPELINE_FILE = PIPELINE_NAME + ".yaml"
Substitua
NOTIFY_EMAIL
por um endereço de e-mail. Quando o job do pipeline é concluído, com ou sem sucesso, um e-mail é enviado para esse endereço.Inicialize o SDK da Vertex AI com o projeto, o bucket de preparo, o local e o experimento.
from google.cloud import aiplatform aiplatform.init( project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION, experiment=EXPERIMENT_NAME) aiplatform.autolog()
Definir as tarefas do pipeline
No notebook, defina o pipeline custom_model_training_evaluation_pipeline
:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform
# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
project: str,
location: str,
training_job_display_name: str,
worker_pool_specs: list,
base_output_dir: str,
prediction_container_uri: str,
model_display_name: str,
batch_prediction_job_display_name: str,
target_field_name: str,
test_data_gcs_uri: list,
ground_truth_gcs_source: list,
batch_predictions_gcs_prefix: str,
batch_predictions_input_format: str="csv",
batch_predictions_output_format: str="jsonl",
ground_truth_format: str="csv",
parent_model_resource_name: str=None,
parent_model_artifact_uri: str=None,
existing_model: bool=False
):
# Notification task
notify_task = VertexNotificationEmailOp(
recipients= EMAIL_RECIPIENTS
)
with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
# Train the model
custom_job_task = CustomTrainingJobOp(
project=project,
display_name=training_job_display_name,
worker_pool_specs=worker_pool_specs,
base_output_directory=base_output_dir,
location=location
)
# Import the unmanaged model
import_unmanaged_model_task = importer(
artifact_uri=base_output_dir,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": prediction_container_uri,
},
},
).after(custom_job_task)
with dsl.If(existing_model == True):
# Import the parent model to upload as a version
import_registry_model_task = importer(
artifact_uri=parent_model_artifact_uri,
artifact_class=artifact_types.VertexModel,
metadata={
"resourceName": parent_model_resource_name
},
).after(import_unmanaged_model_task)
# Upload the model as a version
model_version_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
parent_model=import_registry_model_task.outputs["artifact"],
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
with dsl.Else():
# Upload the model
model_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
# Get the model (or model version)
model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])
# Batch prediction
batch_predict_task = ModelBatchPredictOp(
project= project,
job_display_name= batch_prediction_job_display_name,
model= model_resource,
location= location,
instances_format= batch_predictions_input_format,
predictions_format= batch_predictions_output_format,
gcs_source_uris= test_data_gcs_uri,
gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
machine_type= 'n1-standard-2'
)
# Evaluation task
evaluation_task = ModelEvaluationRegressionOp(
project= project,
target_field_name= target_field_name,
location= location,
# model= model_resource,
predictions_format= batch_predictions_output_format,
predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
ground_truth_format= ground_truth_format,
ground_truth_gcs_source= ground_truth_gcs_source
)
return
O pipeline consiste em um gráfico de tarefas que usam os seguintes componentes de pipeline do Google Cloud:
CustomTrainingJobOp
: executa jobs de treinamento personalizados na Vertex AI.ModelUploadOp
: faz upload do modelo de machine learning treinado no Model Registry.ModelBatchPredictOp
: cria um job de previsão em lote.ModelEvaluationRegressionOp
: avalia um job de lote de regressão.VertexNotificationEmailOp
: envia notificações por e-mail.
Compile o pipeline.
Compile o pipeline usando o compilador do Kubeflow Pipelines (KFP) em um arquivo YAML contendo uma representação hermética do pipeline.
from kfp import dsl
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=custom_model_training_evaluation_pipeline,
package_path="{}.yaml".format(PIPELINE_NAME),
)
Você verá um arquivo YAML chamado vertex-pipeline-datatrigger-tutorial.yaml
no
diretório de trabalho.
Fazer upload do pipeline como um modelo
Crie um repositório do tipo
KFP
no Artifact Registry.REPO_NAME = "mlops" # Create a repo in the artifact registry ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
Faça upload do pipeline compilado para o repositório.
from kfp.registry import RegistryClient host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}" client = RegistryClient(host=host) TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline( file_name=PIPELINE_FILE, tags=["v1", "latest"], extra_headers={"description":"This is an example pipeline template."}) TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
No console do Google Cloud, verifique se o modelo aparece em Modelos de pipeline.
Executar o pipeline manualmente
Para garantir que o pipeline funcione, execute-o manualmente.
No notebook, especifique os parâmetros necessários para executar o pipeline como um job.
DATASET_NAME = "mlops" TABLE_NAME = "chicago" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False }
Criar e executar um job de pipeline.
# Create a pipeline job job = aiplatform.PipelineJob( display_name="triggered_custom_regression_evaluation", template_path=TEMPLATE_URI , parameter_values=parameters, pipeline_root=BUCKET_URI, enable_caching=False ) # Run the pipeline job job.run()
O job leva cerca de 30 minutos para ser concluído.
No console, um novo pipeline deve ser executado na página Pipelines:
Depois que a execução do pipeline for concluída, será exibido um novo modelo chamado
taxifare-prediction-model
ou uma nova versão de modelo no Vertex AI Model Registry:Também deve aparecer um novo job de previsão em lote:
Executar o pipeline automaticamente
Há duas maneiras de executar automaticamente o pipeline: com base em uma programação ou quando novos dados são inseridos no conjunto de dados.
Executar o pipeline em uma programação
No notebook, chame
PipelineJob.create_schedule
.job_schedule = job.create_schedule( display_name="mlops tutorial schedule", cron="0 0 1 * *", # max_concurrent_run_count=1, max_run_count=12, )
A expressão
cron
programa o job para que seja executado no primeiro dia de cada mês às 12h UTC.Para este tutorial, não queremos que vários jobs sejam executados simultaneamente. Portanto, defina
max_concurrent_run_count
como 1.No console do Google Cloud, verifique se o
schedule
aparece em Programações de pipelines.
Executar o pipeline quando houver novos dados
Criar função com o gatilho do Eventarc
Crie uma função do Cloud (2ª geração) que executa o pipeline sempre que novos dados são inseridos na tabela do BigQuery.
Especificamente, usamos um Eventarc para acionar a função sempre que ocorre um
evento google.cloud.bigquery.v2.JobService.InsertJob
. Em seguida, a função executa o modelo de pipeline.
Para mais informações, consulte Acionadores do Eventarc e Tipos de evento compatíveis.
No console do Google Cloud, acesse a página do Cloud Run functions.
Clique no botão Criar função. Na página Configuração:
Selecione 2a geração como seu ambiente.
Em Nome da função, use mlops.
Em Região, selecione a mesma região do bucket do Cloud Storage e do repositório do Artifact Registry.
Em Acionador, selecione Outro acionador. O painel Gatilho do Eventarc é aberto.
Em Tipo de acionador, escolha Fontes do Google.
Em Provedor de eventos, escolha BigQuery.
Em Tipo de evento, escolha
google.cloud.bigquery.v2.JobService.InsertJob
.Em Recurso, escolha Recurso específico e especifique a tabela do BigQuery
projects/PROJECT_ID/datasets/mlops/tables/chicago
No campo Região, selecione um local para o acionador do Eventarc, se aplicável. Consulte Local do acionador para mais informações.
Clique em Salvar acionador.
Se for solicitado que você conceda papéis a contas de serviço, clique em Conceder todos.
Clique em Avançar para acessar a página Código. Na página Código:
Defina o Ambiente de execução como python 3.12.
Defina o Ponto de entrada como
mlops_entrypoint
.Com o editor in-line, abra o arquivo
main.py
e substitua o conteúdo pelo seguinte:Substitua
PROJECT_ID
,REGION
,BUCKET_NAME
pelos valores usados anteriormente.import json import functions_framework import requests import google.auth import google.auth.transport.requests # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers! @functions_framework.cloud_event def mlops_entrypoint(cloudevent): # Print out the CloudEvent's (required) `type` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type print(f"Event type: {cloudevent['type']}") # Print out the CloudEvent's (optional) `subject` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject if 'subject' in cloudevent: # CloudEvent objects don't support `get` operations. # Use the `in` operator to verify `subject` is present. print(f"Subject: {cloudevent['subject']}") # Print out details from the `protoPayload` # This field encapsulates a Cloud Audit Logging entry # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure payload = cloudevent.data.get("protoPayload") if payload: print(f"API method: {payload.get('methodName')}") print(f"Resource name: {payload.get('resourceName')}") print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}") row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount') print(f"No. of rows: {row_count} !!") if row_count: if int(row_count) > 0: print ("Pipeline trigger Condition met !!") submit_pipeline_job() else: print ("No pipeline triggered !!!") def submit_pipeline_job(): PROJECT_ID = 'PROJECT_ID' REGION = 'REGION' BUCKET_NAME = "BUCKET_NAME" DATASET_NAME = "mlops" TABLE_NAME = "chicago" base_output_dir = BUCKET_NAME BUCKET_URI = "gs://{}".format(BUCKET_NAME) PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-mlops-pipeline-tutorial" EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" REPO_NAME ="mlops" TEMPLATE_NAME="custom-model-training-evaluation-pipeline" TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False } TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest" print("TEMPLATE URI: ", TEMPLATE_URI) request_body = { "name": PIPELINE_NAME, "displayName": PIPELINE_NAME, "runtimeConfig":{ "gcsOutputDirectory": PIPELINE_ROOT, "parameterValues": parameters, }, "templateUri": TEMPLATE_URI } pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION) creds, project = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) headers = { 'Authorization': 'Bearer {}'.format(creds.token), 'Content-Type': 'application/json; charset=utf-8' } response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body)) print(response.text)
Abra o arquivo
requirements.txt
e substitua o conteúdo pelo seguinte:requests==2.31.0 google-auth==2.25.1
Clique em implantar para implantar a função.
Inserir dados para acionar o pipeline
No console do Google Cloud, acesse o BigQuery Studio.
Clique em Criar consulta SQL e execute a consulta SQL a seguir clicando em
Executar.INSERT INTO `PROJECT_ID.mlops.chicago` ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
Esta consulta SQL para inserir novas linhas na tabela.
Para verificar se o evento foi acionado, pesquise
pipeline trigger condition met
no registro da função.Se a função for acionada corretamente, um novo pipeline será executado no Vertex AI Pipelines. O job do pipeline leva cerca de 30 minutos para ser concluído.
Limpar
Para limpar todos os recursos do Google Cloud usados neste projeto, exclua o projeto do Google Cloud usado no tutorial.
Caso contrário, exclua os recursos individuais criados para este tutorial.
Exclua o modelo da seguinte maneira:
Na seção "Vertex AI", acesse a página Model Registry.
Ao lado do nome do modelo, clique no menu
Ações e escolha Excluir modelo.
Exclua as execuções do pipeline:
Acesse a página Execuções de pipeline.
Ao lado do nome de cada execução de pipeline, clique no
menu Ações e escolha Excluir execução do pipeline.
Exclua os jobs de treinamento personalizados:
Ao lado do nome de cada job de treinamento personalizado, clique no
menu Ações e selecione Excluir job de treinamento personalizado.
Exclua o job de previsão em lote da seguinte maneira:
Ao lado do nome do job de previsão em lote, clique em
Ações e escolha Excluir job de previsão em lote.