本文档将引导您完成构建流水线所需的步骤,该流水线将使用 Vertex AI Pipelines 和 Cloud Run functions 定期或者在有新数据插入数据集时自动训练自定义模型。
目标
此流程包含以下步骤:
在 BigQuery 中获取和准备数据集。
创建并上传自定义训练软件包。执行时,它会从数据集中读取数据并训练模型。
构建 Vertex AI Pipelines 流水线。此流水线执行自定义训练软件包,将模型上传到 Vertex AI Model Registry,运行评估作业,并发送电子邮件通知。
手动运行流水线。
使用 Eventarc 触发器创建一个 Cloud Functions 函数,该函数会在有新数据插入 BigQuery 数据集时运行流水线。
准备工作
设置项目和笔记本。
项目设置
-
In the Google Cloud console, go to the project selector page.
-
Select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
创建笔记本
我们使用 Colab Enterprise 笔记本来执行本教程中的部分代码。
如果您不是项目所有者,请让项目所有者为您授予
roles/resourcemanager.projectIamAdmin
和roles/aiplatform.colabEnterpriseUser
IAM 角色。您需要具有这些角色,才能使用 Colab Enterprise 并为您自己和服务账号授予 IAM 角色和权限。
在 Google Cloud 控制台中,进入 Colab Enterprise Notebooks 页面。
Colab Enterprise 会要求您启用以下必需的 API(如果尚未启用)。
- Vertex AI API
- Dataform API
- Compute Engine API
在区域菜单中,选择要创建笔记本的区域。如果您不确定,请使用 us-central1 区域。
请对本教程中的所有资源使用同一区域。
点击创建新的笔记本。
您的新笔记本会显示在我的笔记本标签页上。如需在笔记本中运行代码,请添加代码单元,然后点击 运行单元按钮。
设置开发环境
在笔记本中,安装以下 Python3 软件包。
! pip3 install google-cloud-aiplatform==1.34.0 \ google-cloud-pipeline-components==2.6.0 \ kfp==2.4.0 \ scikit-learn==1.0.2 \ mlflow==2.10.0
通过运行以下命令设置 Google Cloud CLI 项目:
PROJECT_ID = "PROJECT_ID" # Set the project id ! gcloud config set project {PROJECT_ID}
将 PROJECT_ID 替换为您的项目 ID。如有必要,您可以在 Google Cloud 控制台中查找项目 ID。
向您的 Google 账号授予角色:
! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
启用以下 API
- Artifact Registry API
- BigQuery API
- Cloud Build API
- Cloud Functions API
- Cloud Logging API
- Pub/Sub API
- Cloud Run Admin API
- Cloud Storage API
- Eventarc API
- Service Usage API
- Vertex AI API
! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
向项目的服务账号授予角色:
查看服务账号的名称
! gcloud iam service-accounts list
记下您的计算服务代理的名称。其格式应为
xxxxxxxx-compute@developer.gserviceaccount.com
。向服务代理授予所需角色。
! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/aiplatform.serviceAgent ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID-compute@developer.gserviceaccount.com"" --role=roles/eventarc.eventReceiver
获取和准备数据集
在本教程中您将构建一个模型,该模型根据行程时间、位置和距离等特征预测出租车行程费用。我们将使用公开的芝加哥出租车行程数据集中的数据。该数据集包含 2013 年至今,向芝加哥市(即监管机构)报告的出租车行程。为保护出租车司机和乘客的隐私并允许汇总程序分析数据,任何给定出租车牌照号对应的出租车 ID 都是一致的,而不会显示具体号码;在某些情况下也不会显示普查区;时间均会四舍五入到最接近的 15 分钟。
如需了解详情,请查看 Marketplace 中的芝加哥出租车行程。
创建 BigQuery 数据集
在 Google Cloud 控制台中,进入 BigQuery Studio。
在探索器面板中,找到您的项目,点击
操作,然后点击创建数据集。在创建数据集页面中执行以下操作:
在数据集 ID 部分,输入
mlops
。 如需了解详情,请参阅命名数据集。对于位置类型,选择您的多区域。例如,如果您使用的是
us-central1
,请选择 US(美国的多个区域)。创建数据集后,就无法再更改此位置。点击创建数据集。
如需了解详情,请参阅如何创建数据集。
创建并填充 BigQuery 表
在本部分中,您将创建表并将公共数据集中一年的数据导入到项目的数据集中。
进入 BigQuery Studio
点击创建 SQL 查询,然后点击
运行以运行以下 SQL 查询。CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago` AS ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
此查询会创建
<PROJECT_ID>.mlops.chicago
表,并使用公共bigquery-public-data.chicago_taxi_trips.taxi_trips
表中的数据填充它。如需查看表的架构,请点击转到表,然后点击架构标签页。
如需查看表内容,请点击预览标签页。
创建并上传自定义训练软件包
在本部分中,您将创建一个 Python 软件包,其中包含读取数据集、将数据拆分为训练集和测试集以及训练自定义模型的代码。该软件包将作为流水线中的一项任务运行。如需了解详情,请参阅为预构建容器创建一个 Python 训练应用。
创建自定义训练软件包
在 Colab 笔记本中,为训练应用创建父级文件夹:
!mkdir -p training_package/trainer
使用以下命令在每个文件夹中创建一个
__init__.py
文件,使其成为软件包:! touch training_package/__init__.py ! touch training_package/trainer/__init__.py
您可以在文件 folder 面板中查看新文件和文件夹。
在文件面板中,在 training_package/trainer 文件夹中创建一个名为
task.py
的文件,其中包含以下内容。# Import the libraries from sklearn.model_selection import train_test_split, cross_val_score from sklearn.preprocessing import OneHotEncoder, StandardScaler from google.cloud import bigquery, bigquery_storage from sklearn.ensemble import RandomForestRegressor from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from google import auth from scipy import stats import numpy as np import argparse import joblib import pickle import csv import os # add parser arguments parser = argparse.ArgumentParser() parser.add_argument('--project-id', dest='project_id', type=str, help='Project ID.') parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"), type=str, help='Dir to save the data and the trained model.') parser.add_argument('--bq-source', dest='bq_source', type=str, help='BigQuery data source for training data.') args = parser.parse_args() # data preparation code BQ_QUERY = """ with tmp_table as ( SELECT trip_seconds, trip_miles, fare, tolls, company, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp, DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp, CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport, FROM `{}` WHERE dropoff_latitude IS NOT NULL and dropoff_longitude IS NOT NULL and pickup_latitude IS NOT NULL and pickup_longitude IS NOT NULL and fare > 0 and trip_miles > 0 and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99 ORDER BY RAND() LIMIT 10000) SELECT *, EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year, EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month, EXTRACT(DAY FROM trip_start_timestamp) trip_start_day, EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour, FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week FROM tmp_table """.format(args.bq_source) # Get default credentials credentials, project = auth.default() bqclient = bigquery.Client(credentials=credentials, project=args.project_id) bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials) df = ( bqclient.query(BQ_QUERY) .result() .to_dataframe(bqstorage_client=bqstorageclient) ) # Add 'N/A' for missing 'Company' df.fillna(value={'company':'N/A','tolls':0}, inplace=True) # Drop rows containing null data. df.dropna(how='any', axis='rows', inplace=True) # Pickup and dropoff locations distance df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100 # Remove extremes, outliers possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance'] df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy() # Reduce location accuracy df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3}) # Drop the timestamp col X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1) # Split the data into train and test X_train, X_test = train_test_split(X, test_size=0.10, random_state=123) ## Format the data for batch predictions # select string cols string_cols = X_test.select_dtypes(include='object').columns # Add quotes around string fields X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"') # Add quotes around column names X_test.columns = ['\"' + col + '\"' for col in X_test.columns] # Save DataFrame to csv X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Save test data without the target for batch predictions X_test.drop('\"fare\"',axis=1,inplace=True) X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ') # Separate the target column y_train=X_train.pop('fare') # Get the column indexes col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)} # Create a column transformer pipeline ct_pipe = ColumnTransformer(transformers=[ ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]), ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]), ('std_scaler', StandardScaler(), [ col_index_dict['trip_start_year'], col_index_dict['abs_distance'], col_index_dict['pickup_longitude'], col_index_dict['pickup_latitude'], col_index_dict['dropoff_longitude'], col_index_dict['dropoff_latitude'], col_index_dict['trip_miles'], col_index_dict['trip_seconds']]) ]) # Add the random-forest estimator to the pipeline rfr_pipe = Pipeline([ ('ct', ct_pipe), ('forest_reg', RandomForestRegressor( n_estimators = 20, max_features = 1.0, n_jobs = -1, random_state = 3, max_depth=None, max_leaf_nodes=None, )) ]) # train the model rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5) rfr_rmse = np.sqrt(-rfr_score) print ("Crossvalidation RMSE:",rfr_rmse.mean()) final_model=rfr_pipe.fit(X_train, y_train) # Save the model pipeline with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file: pickle.dump(final_model, model_file)
代码完成以下任务:
- 特征选择。
- 将上客和下客数据时间从世界协调时间 (UTC) 转换为芝加哥当地时间。
- 从上客日期时间中提取日期、小时、星期几、月份和年份。
- 使用开始时间和结束时间计算行程的时长。
- 根据社区区域识别和标记以机场为起点或终点的行程。
- 使用 scikit-learn 框架训练随机森林回归模型,以预测出租车行程的费用。
将经过训练的模型将保存到 pickle 文件
model.pkl
中。所选方法和特征工程基于预测芝加哥出租车费用的数据探索和分析。
在文件面板中,在 training_package 文件夹中创建一个名为
setup.py
的文件,其中包含以下内容。from setuptools import find_packages from setuptools import setup REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"] setup( name='trainer', version='0.1', install_requires=REQUIRED_PACKAGES, packages=find_packages(), include_package_data=True, description='Training application package for chicago taxi trip fare prediction.' )
在笔记本中,运行
setup.py
为训练应用创建源分发:! cd training_package && python setup.py sdist --formats=gztar && cd ..
在本部分结束时,文件面板应在 training-package
下包含以下文件和文件夹。
dist
trainer-0.1.tar.gz
trainer
__init__.py
task.py
trainer.egg-info
__init__.py
setup.py
将自定义训练软件包上传到 Cloud Storage
创建 Cloud Storage 存储桶。
REGION="REGION" BUCKET_NAME = "BUCKET_NAME" BUCKET_URI = f"gs://{BUCKET_NAME}" ! gcloud storage buckets create gs://$BUCKET_URI --location=$REGION --project=$PROJECT_ID
替换以下参数值:
REGION
:选择您在创建 Colab 笔记本时选择的区域。BUCKET_NAME
:存储桶名称。
将训练软件包上传到 Cloud Storage 存储桶。
# Copy the training package to the bucket ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
构建流水线
流水线以步骤图表的形式描述 MLOps 工作流,步骤被称为流水线任务。
在本部分中,您将定义流水线任务,将其编译为 YAML,并在 Artifact Registry 中注册流水线,以便单个用户或多个用户对该流水线进行版本控制并多次运行。
下图直观呈现了流水线中的任务,包括模型训练、模型上传、模型评估和电子邮件通知:
如需了解详情,请参阅创建流水线模板。
定义常量并初始化客户端
在笔记本中,定义将在后续步骤中使用的常量:
import os EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ] PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial" WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial" os.environ['AIP_MODEL_DIR'] = WORKING_DIR EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" PIPELINE_FILE = PIPELINE_NAME + ".yaml"
将
NOTIFY_EMAIL
替换为电子邮件地址。流水线作业完成后,无论成功还是失败,系统都会向该电子邮件地址发送电子邮件。使用项目、暂存存储桶、位置和实验初始化 Vertex AI SDK。
from google.cloud import aiplatform aiplatform.init( project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION, experiment=EXPERIMENT_NAME) aiplatform.autolog()
定义流水线任务
在笔记本中,定义流水线 custom_model_training_evaluation_pipeline
:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform
# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
project: str,
location: str,
training_job_display_name: str,
worker_pool_specs: list,
base_output_dir: str,
prediction_container_uri: str,
model_display_name: str,
batch_prediction_job_display_name: str,
target_field_name: str,
test_data_gcs_uri: list,
ground_truth_gcs_source: list,
batch_predictions_gcs_prefix: str,
batch_predictions_input_format: str="csv",
batch_predictions_output_format: str="jsonl",
ground_truth_format: str="csv",
parent_model_resource_name: str=None,
parent_model_artifact_uri: str=None,
existing_model: bool=False
):
# Notification task
notify_task = VertexNotificationEmailOp(
recipients= EMAIL_RECIPIENTS
)
with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
# Train the model
custom_job_task = CustomTrainingJobOp(
project=project,
display_name=training_job_display_name,
worker_pool_specs=worker_pool_specs,
base_output_directory=base_output_dir,
location=location
)
# Import the unmanaged model
import_unmanaged_model_task = importer(
artifact_uri=base_output_dir,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": prediction_container_uri,
},
},
).after(custom_job_task)
with dsl.If(existing_model == True):
# Import the parent model to upload as a version
import_registry_model_task = importer(
artifact_uri=parent_model_artifact_uri,
artifact_class=artifact_types.VertexModel,
metadata={
"resourceName": parent_model_resource_name
},
).after(import_unmanaged_model_task)
# Upload the model as a version
model_version_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
parent_model=import_registry_model_task.outputs["artifact"],
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
with dsl.Else():
# Upload the model
model_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
# Get the model (or model version)
model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])
# Batch prediction
batch_predict_task = ModelBatchPredictOp(
project= project,
job_display_name= batch_prediction_job_display_name,
model= model_resource,
location= location,
instances_format= batch_predictions_input_format,
predictions_format= batch_predictions_output_format,
gcs_source_uris= test_data_gcs_uri,
gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
machine_type= 'n1-standard-2'
)
# Evaluation task
evaluation_task = ModelEvaluationRegressionOp(
project= project,
target_field_name= target_field_name,
location= location,
# model= model_resource,
predictions_format= batch_predictions_output_format,
predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
ground_truth_format= ground_truth_format,
ground_truth_gcs_source= ground_truth_gcs_source
)
return
您的流水线由使用以下 Google Cloud 流水线组件的任务组成:
CustomTrainingJobOp
:在 Vertex AI 中运行自定义训练作业。ModelUploadOp
:将经过训练的机器学习模型上传到 Model Registry。ModelBatchPredictOp
:创建批量预测作业。ModelEvaluationRegressionOp
:评估回归批量作业。VertexNotificationEmailOp
:发送电子邮件通知。
编译流水线
使用 Kubeflow Pipelines (KFP) 编译器将流水线编译为包含流水线封闭表示形式的 YAML 文件。
from kfp import dsl
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=custom_model_training_evaluation_pipeline,
package_path="{}.yaml".format(PIPELINE_NAME),
)
您应该会在工作目录中看到名为 vertex-pipeline-datatrigger-tutorial.yaml
的 YAML 文件。
将流水线上传为模板
在 Artifact Registry 中创建
KFP
类型的存储库。REPO_NAME = "mlops" # Create a repo in the artifact registry ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
将已编译的流水线上传到存储库。
from kfp.registry import RegistryClient host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}" client = RegistryClient(host=host) TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline( file_name=PIPELINE_FILE, tags=["v1", "latest"], extra_headers={"description":"This is an example pipeline template."}) TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
在 Google Cloud 控制台中,验证您的模板显示在流水线模板中。
手动运行流水线
如需确保流水线可用,请手动运行流水线。
在笔记本中,指定将流水线作为作业运行所需的参数。
DATASET_NAME = "mlops" TABLE_NAME = "chicago" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False }
创建并运行流水线作业。
# Create a pipeline job job = aiplatform.PipelineJob( display_name="triggered_custom_regression_evaluation", template_path=TEMPLATE_URI , parameter_values=parameters, pipeline_root=BUCKET_URI, enable_caching=False ) # Run the pipeline job job.run()
作业大约需要 30 分钟才能完成。
在控制台中,您应该会在流水线页面中看到新的流水线运行作业:
流水线运行作业完成后,您应该会在 Vertex AI Model Registry 中看到名为
taxifare-prediction-model
的新模型或新模型版本:您还应该看到新的批量预测作业:
自动运行流水线
有两种方法来自动运行流水线:按计划运行或在有新数据插入数据集时运行。
按计划运行流水线
在笔记本中,调用
PipelineJob.create_schedule
。job_schedule = job.create_schedule( display_name="mlops tutorial schedule", cron="0 0 1 * *", # max_concurrent_run_count=1, max_run_count=12, )
cron
表达式会安排作业在每月 1 日午夜 12:00(世界协调时间 [UTC])运行。在本教程中,我们不希望多个作业并发运行,因此我们将
max_concurrent_run_count
设置为 1。在 Google Cloud 控制台中,验证您的
schedule
是否显示在流水线时间表中。
在有新数据时运行流水线
使用 Eventarc 触发器创建函数
创建 Cloud Functions (第 2 代) 函数,该函数将在有新数据插入 BigQuery 表时运行流水线。
具体来说,我们使用 Eventarc 在 google.cloud.bigquery.v2.JobService.InsertJob
事件发生时触发函数。然后,该函数会运行流水线模板。
如需了解详情,请参阅 Eventarc 触发器和支持的事件类型。
在 Google Cloud 控制台中,前往 Cloud Run functions。
点击创建函数按钮。在配置页面中:
选择第 2 代作为您的环境。
在函数名称字段中,使用 mlops。
在区域字段中,选择与您的 Cloud Storage 存储桶和 Artifact Registry 代码库相同的区域。
在触发器字段中,选择其他触发器。此时会打开 Eventarc 触发器窗格。
在触发器类型字段中,选择 Google 来源。
在事件提供方字段中,选择 BigQuery。
在事件类型字段中,选择
google.cloud.bigquery.v2.JobService.InsertJob
。在资源字段中,选择特定资源并指定 BigQuery 表
projects/PROJECT_ID/datasets/mlops/tables/chicago
在区域字段中,选择 Eventarc 触发器的位置(如果适用)。如需了解详情,请参阅触发器位置。
点击保存触发器。
如果系统要求您向服务账号授予角色,请点击全部授予。
点击下一步以进入代码页面。在代码页面中:
将运行时设置为 python 3.12。
将入口点设置为
mlops_entrypoint
。使用内嵌编辑器打开文件
main.py
,并将其内容替换为以下内容:将
PROJECT_ID
、REGION
、BUCKET_NAME
替换为您之前使用的值。import json import functions_framework import requests import google.auth import google.auth.transport.requests # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers! @functions_framework.cloud_event def mlops_entrypoint(cloudevent): # Print out the CloudEvent's (required) `type` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type print(f"Event type: {cloudevent['type']}") # Print out the CloudEvent's (optional) `subject` property # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject if 'subject' in cloudevent: # CloudEvent objects don't support `get` operations. # Use the `in` operator to verify `subject` is present. print(f"Subject: {cloudevent['subject']}") # Print out details from the `protoPayload` # This field encapsulates a Cloud Audit Logging entry # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure payload = cloudevent.data.get("protoPayload") if payload: print(f"API method: {payload.get('methodName')}") print(f"Resource name: {payload.get('resourceName')}") print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}") row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount') print(f"No. of rows: {row_count} !!") if row_count: if int(row_count) > 0: print ("Pipeline trigger Condition met !!") submit_pipeline_job() else: print ("No pipeline triggered !!!") def submit_pipeline_job(): PROJECT_ID = 'PROJECT_ID' REGION = 'REGION' BUCKET_NAME = "BUCKET_NAME" DATASET_NAME = "mlops" TABLE_NAME = "chicago" base_output_dir = BUCKET_NAME BUCKET_URI = "gs://{}".format(BUCKET_NAME) PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI) PIPELINE_NAME = "vertex-mlops-pipeline-tutorial" EXPERIMENT_NAME = PIPELINE_NAME + "-experiment" REPO_NAME ="mlops" TEMPLATE_NAME="custom-model-training-evaluation-pipeline" TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job" worker_pool_specs = [{ "machine_spec": {"machine_type": "e2-highmem-2"}, "replica_count": 1, "python_package_spec":{ "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest", "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"], "python_module": "trainer.task", "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"] }, }] parameters = { "project": PROJECT_ID, "location": REGION, "training_job_display_name": "taxifare-prediction-training-job", "worker_pool_specs": worker_pool_specs, "base_output_dir": BUCKET_URI, "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest", "model_display_name": "taxifare-prediction-model", "batch_prediction_job_display_name": "taxifare-prediction-batch-job", "target_field_name": "fare", "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"], "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"], "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output", "existing_model": False } TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest" print("TEMPLATE URI: ", TEMPLATE_URI) request_body = { "name": PIPELINE_NAME, "displayName": PIPELINE_NAME, "runtimeConfig":{ "gcsOutputDirectory": PIPELINE_ROOT, "parameterValues": parameters, }, "templateUri": TEMPLATE_URI } pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION) creds, project = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) headers = { 'Authorization': 'Bearer {}'.format(creds.token), 'Content-Type': 'application/json; charset=utf-8' } response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body)) print(response.text)
打开文件
requirements.txt
并将其内容替换为以下内容:requests==2.31.0 google-auth==2.25.1
点击部署以部署该函数。
插入数据以触发流水线
在 Google Cloud 控制台中,进入 BigQuery Studio。
点击创建 SQL 查询,然后点击
运行以运行以下 SQL 查询。INSERT INTO `PROJECT_ID.mlops.chicago` ( WITH taxitrips AS ( SELECT trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, payment_type, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, tips, tolls, fare, pickup_community_area, dropoff_community_area, company, unique_key FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE pickup_longitude IS NOT NULL AND pickup_latitude IS NOT NULL AND dropoff_longitude IS NOT NULL AND dropoff_latitude IS NOT NULL AND trip_miles > 0 AND trip_seconds > 0 AND fare > 0 AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022 ) SELECT trip_start_timestamp, EXTRACT(MONTH from trip_start_timestamp) as trip_month, EXTRACT(DAY from trip_start_timestamp) as trip_day, EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week, EXTRACT(HOUR from trip_start_timestamp) as trip_hour, trip_seconds, trip_miles, payment_type, ST_AsText( ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1) ) AS pickup_grid, ST_AsText( ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1) ) AS dropoff_grid, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude) ) AS euclidean, CONCAT( ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)), ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)) ) AS loc_cross, IF((tips/fare >= 0.2), 1, 0) AS tip_bin, tips, tolls, fare, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_community_area, dropoff_community_area, company, unique_key, trip_end_timestamp FROM taxitrips LIMIT 1000000 )
此 SQL 查询会在表中插入新行。
如需验证事件已触发,请在函数日志中搜索
pipeline trigger condition met
。如果函数成功触发,您应该会在 Vertex AI Pipelines 中看到新的流水线运行。流水线作业大约需要 30 分钟才能完成。
清理
如需清理此项目使用的所有 Google Cloud 资源,您可以删除用于本教程的 Google Cloud 项目。
或者,您也可以逐个删除为本教程创建的资源。
按如下方式删除模型:
在 Vertex AI 部分,进入 Model Registry 页面。
点击模型名称旁边的
操作菜单,然后选择删除模型。
删除流水线运行作业:
前往流水线运行作业页面。
点击各流水线运行作业名称旁边的
操作菜单,然后选择删除流水线运行作业。
删除自定义训练作业:
点击各自定义训练作业名称旁边的
操作菜单,然后选择删除自定义训练作业。
按如下方式删除批量预测作业:
点击各批量预测作业名称旁边的
操作菜单,然后选择删除批量预测作业。