构建用于持续模型训练的流水线

本文档将引导您使用 Vertex AI Pipelines 和 Cloud Run functions 构建一个流水线,以便定期或者在有新数据插入数据集时自动训练自定义模型。

目标

此流程包含以下步骤:

  1. 在 BigQuery 中获取和准备数据集。

  2. 创建并上传自定义训练软件包。执行时,它会从数据集中读取数据并训练模型。

  3. 构建 Vertex AI Pipelines 流水线。此流水线执行自定义训练软件包,将模型上传到 Vertex AI Model Registry,运行评估作业,并发送电子邮件通知。

  4. 手动运行流水线。

  5. 使用 Eventarc 触发器创建一个 Cloud Functions 函数,该函数会在有新数据插入 BigQuery 数据集时运行流水线。

准备工作

设置项目和笔记本。

项目设置

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Google Cloud project.

创建笔记本

我们使用 Colab Enterprise 笔记本来执行本教程中的部分代码。

  1. 如果您不是项目所有者,请让项目所有者为您授予 roles/resourcemanager.projectIamAdmin 和 roles/aiplatform.colabEnterpriseUser IAM 角色。

    您需要具有这些角色,才能使用 Colab Enterprise 并为您自己和服务账号授予 IAM 角色和权限。

    进入 IAM

  2. 在 Google Cloud 控制台中,进入 Colab Enterprise Notebooks 页面。

    Colab Enterprise 会要求您启用以下必需的 API(如果尚未启用)。

    • Vertex AI API
    • Dataform API
    • Compute Engine API

    进入 Colab Enterprise

  3. 区域菜单中,选择要创建笔记本的区域。如果您不确定,请使用 us-central1 区域。

    请对本教程中的所有资源使用同一区域。

  4. 点击创建新的笔记本

您的新笔记本会显示在我的笔记本标签页上。如需在笔记本中运行代码,请添加代码单元,然后点击运行单元按钮。

设置开发环境

  1. 在笔记本中,安装以下 Python3 软件包。

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. 通过运行以下命令设置 Google Cloud CLI 项目:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    将 PROJECT_ID 替换为您的项目 ID。如有必要,您可以在 Google Cloud 控制台中查找项目 ID。

  3. 向您的 Google 账号授予角色:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. 启用以下 API

    • Artifact Registry API
    • BigQuery API
    • Cloud Build API
    • Cloud Functions API
    • Cloud Logging API
    • Pub/Sub API
    • Cloud Run Admin API
    • Cloud Storage API
    • Eventarc API
    • Service Usage API
    • Vertex AI API
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. 向项目的服务账号授予角色:

    1. 查看服务账号的名称

      ! gcloud iam service-accounts list
      

      记下您的 Compute Engine 默认服务账号的名称,其格式应为 xxxxxxxx-compute@developer.gserviceaccount.com。

    2. 向该服务账号授予所需角色。

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SA_ID-compute@developer.gserviceaccount.com" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SA_ID-compute@developer.gserviceaccount.com" --role=roles/eventarc.eventReceiver

      将 SA_ID 替换为服务账号名称中 -compute 之前的数字部分(即项目编号)。
      

获取和准备数据集

在本教程中,您将构建一个模型,该模型根据行程时间、位置和距离等特征预测出租车行程费用。我们将使用公开的芝加哥出租车行程数据集中的数据,该数据集包含 2013 年至今向芝加哥市(即监管机构)报告的出租车行程。为保护司机和乘客的隐私,同时支持汇总分析:同一出租车牌照号在数据集中始终对应同一个出租车 ID,但不会显示牌照号本身;某些情况下不会显示普查区;时间均四舍五入到最接近的 15 分钟。

如需了解详情,请查看 Marketplace 中的芝加哥出租车行程

创建 BigQuery 数据集

  1. 在 Google Cloud 控制台中,进入 BigQuery Studio。

    转到 BigQuery

  2. 探索器面板中,找到您的项目,点击 操作,然后点击创建数据集

  3. 创建数据集页面中执行以下操作:

    • 数据集 ID 部分,输入 mlops。 如需了解详情,请参阅命名数据集

    • 对于位置类型,选择包含您所用区域的多区域位置。例如,如果您使用的是 us-central1,请选择 US(美国的多个区域)。创建数据集后,就无法再更改此位置。

    • 点击创建数据集

如需了解详情,请参阅如何创建数据集

创建并填充 BigQuery 表

在本部分中,您将创建表并将公共数据集中一年的数据导入到项目的数据集中。

  1. 进入 BigQuery Studio

    转到 BigQuery

  2. 点击创建 SQL 查询,然后点击运行以运行以下 SQL 查询。

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    此查询会创建 PROJECT_ID.mlops.chicago 表,并使用公共 bigquery-public-data.chicago_taxi_trips.taxi_trips 表中的数据填充它。

  3. 如需查看表的架构,请点击转到表,然后点击架构标签页。

  4. 如需查看表内容,请点击预览标签页。
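
您也可以在笔记本中用 BigQuery 客户端确认数据已导入。以下代码段仅作演示,假设您已按前面的步骤设置了 PROJECT_ID,且环境中已安装 google-cloud-bigquery(Colab Enterprise 通常默认提供):

    from google.cloud import bigquery

    # 统计 mlops.chicago 表中的行数,确认数据已导入
    bq_client = bigquery.Client(project=PROJECT_ID)
    query = f"SELECT COUNT(*) AS row_count FROM `{PROJECT_ID}.mlops.chicago`"
    for row in bq_client.query(query).result():
        print("Imported rows:", row.row_count)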

创建并上传自定义训练软件包

在本部分中,您将创建一个 Python 软件包,其中包含读取数据集、将数据拆分为训练集和测试集以及训练自定义模型的代码。该软件包将作为流水线中的一项任务运行。如需了解详情,请参阅为预构建容器创建一个 Python 训练应用

创建自定义训练软件包

  1. 在 Colab 笔记本中,为训练应用创建父级文件夹:

    !mkdir -p training_package/trainer
    
  2. 使用以下命令在每个文件夹中创建一个 __init__.py 文件,使其成为软件包:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    您可以在文件面板中查看新建的文件和文件夹。

  3. 文件面板中,在 training_package/trainer 文件夹中创建一个名为 task.py 的文件,其中包含以下内容。

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    代码完成以下任务:

    1. 特征选择。
    2. 将上客和下客数据时间从世界协调时间 (UTC) 转换为芝加哥当地时间。
    3. 从上客日期时间中提取日期、小时、星期几、月份和年份。
    4. 使用开始时间和结束时间计算行程的时长。
    5. 根据社区区域识别和标记以机场为起点或终点的行程。
    6. 使用 scikit-learn 框架训练随机森林回归模型,以预测出租车行程的费用。
    7. 将经过训练的模型保存到 pickle 文件 model.pkl 中。

      所选方法和特征工程基于预测芝加哥出租车费用的数据探索和分析。

  4. 文件面板中,在 training_package 文件夹中创建一个名为 setup.py 的文件,其中包含以下内容。

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. 在笔记本中,运行 setup.py 为训练应用创建源分发:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

在本部分结束时,文件面板应在 training_package 下包含以下文件和文件夹。

dist/
  trainer-0.1.tar.gz
trainer/
  __init__.py
  task.py
trainer.egg-info/
__init__.py
setup.py
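
如果您想确认打包结果,可以用下面的代码段列出源分发包中的文件。这只是一个可选的检查示例:

    import tarfile

    # 列出 trainer-0.1.tar.gz 中的文件,确认 trainer/task.py 已包含在内
    with tarfile.open("training_package/dist/trainer-0.1.tar.gz", "r:gz") as tar:
        for member in tar.getnames():
            print(member)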

将自定义训练软件包上传到 Cloud Storage

  1. 创建 Cloud Storage 存储桶。

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gcloud storage buckets create $BUCKET_URI --location=$REGION --project=$PROJECT_ID
    

    替换以下参数值:

    • REGION:您在创建 Colab 笔记本时选择的区域。

    • BUCKET_NAME:存储桶名称。

  2. 将训练软件包上传到 Cloud Storage 存储桶。

    # Copy the training package to the bucket
    ! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
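    
    # (可选)确认软件包已成功上传;此检查仅作演示
    ! gcloud storage ls $BUCKET_URI/trainer-0.1.tar.gz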
    

构建流水线

流水线以步骤图的形式描述 MLOps 工作流,这些步骤称为流水线任务。

在本部分中,您将定义流水线任务,将其编译为 YAML,并在 Artifact Registry 中注册流水线,以便单个用户或多个用户对该流水线进行版本控制并多次运行。

下图直观呈现了流水线中的任务,包括模型训练、模型上传、模型评估和电子邮件通知:

流水线可视化

如需了解详情,请参阅创建流水线模板

定义常量并初始化客户端

  1. 在笔记本中,定义将在后续步骤中使用的常量:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    将 NOTIFY_EMAIL 替换为您的电子邮件地址。流水线作业完成后,无论成功还是失败,系统都会向该电子邮件地址发送电子邮件。

  2. 使用项目、暂存存储桶、位置和实验初始化 Vertex AI SDK。

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

定义流水线任务

在笔记本中,定义流水线 custom_model_training_evaluation_pipeline

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

您的流水线由使用以下 Google Cloud 流水线组件和 KFP 组件的任务组成:

• CustomTrainingJobOp:运行自定义训练作业,执行上文创建的训练软件包。
• importer(KFP 组件):将训练输出的非托管模型工件(或 Model Registry 中的现有父模型)导入流水线。
• ModelUploadOp:将模型(或模型版本)上传到 Vertex AI Model Registry。
• ModelBatchPredictOp:使用测试数据运行批量预测作业。
• ModelEvaluationRegressionOp:根据批量预测结果和标准答案运行回归评估。
• VertexNotificationEmailOp:在流水线退出时发送电子邮件通知。

编译流水线

使用 Kubeflow Pipelines (KFP) 编译器将流水线编译为包含流水线封闭表示形式的 YAML 文件。

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

您应该会在工作目录中看到名为 vertex-pipeline-datatrigger-tutorial.yaml 的 YAML 文件。
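
如果您想在上传之前快速检查编译结果,可以参考下面的示例加载该文件并打印流水线名称和任务列表。此代码段仅作演示,IR YAML 的字段结构可能因 KFP 版本而异:

import yaml

# 加载编译后的流水线定义,查看流水线名称和任务
with open(PIPELINE_FILE) as f:
    pipeline_spec = yaml.safe_load(f)

print(pipeline_spec["pipelineInfo"]["name"])
print(list(pipeline_spec["root"]["dag"]["tasks"].keys()))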

将流水线上传为模板

  1. 在 Artifact Registry 中创建 KFP 类型的存储库。

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. 将已编译的流水线上传到存储库。

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
        file_name=PIPELINE_FILE,
        tags=["v1", "latest"],
        extra_headers={"description": "This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. 在 Google Cloud 控制台中,验证您的模板显示在流水线模板中。

    转到“流水线模板”

手动运行流水线

如需确保流水线可用,请手动运行流水线。

  1. 在笔记本中,指定将流水线作为作业运行所需的参数。

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. 创建并运行流水线作业。

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    作业大约需要 30 分钟才能完成。如果您不希望 job.run() 阻塞笔记本,可以参考本节末尾的示例以非阻塞方式提交作业并检查其状态。

  3. 在控制台中,您应该会在流水线页面中看到新的流水线运行作业:

    前往“流水线运行作业”

  4. 流水线运行作业完成后,您应该会在 Vertex AI Model Registry 中看到名为 taxifare-prediction-model 的新模型或新模型版本:

    前往 Model Registry

  5. 您还应该看到新的批量预测作业:

    前往“批量预测”
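
如果您不希望 job.run() 阻塞笔记本,可以改用 submit() 提交作业,然后再等待或轮询其状态。以下是一个示意性代码段,复用上文定义的 TEMPLATE_URI 和 parameters:

    from google.cloud import aiplatform

    # 创建流水线作业
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False,
    )

    # submit() 仅提交作业,不会阻塞笔记本
    job.submit()
    print("State after submit:", job.state)

    # 需要时再阻塞等待作业结束,并查看最终状态
    job.wait()
    print("Final state:", job.state)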

自动运行流水线

有两种方法来自动运行流水线:按计划运行或在有新数据插入数据集时运行。

按计划运行流水线

  1. 在笔记本中,调用 PipelineJob.create_schedule

    job_schedule = job.create_schedule(
      display_name="mlops tutorial schedule",
      cron="0 0 1 * *", #
      max_concurrent_run_count=1,
      max_run_count=12,
    )
    

    cron 表达式会安排作业在每月 1 日 00:00(世界协调时间 [UTC])运行。

    在本教程中,我们不希望多个作业并发运行,因此将 max_concurrent_run_count 设置为 1。如需暂停或恢复该时间表,请参阅本小节末尾的示例。

  2. 在 Google Cloud 控制台中,验证您的 schedule 是否显示在流水线时间表中。

    前往“流水线时间表”
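
如需暂停按计划运行(例如在维护期间),可以参考以下示意代码操作 create_schedule 返回的 job_schedule 对象;pause() 和 resume() 的可用性取决于您安装的 Vertex AI SDK 版本:

    # 暂停时间表;暂停期间不会创建新的流水线运行
    job_schedule.pause()

    # 需要时恢复时间表
    job_schedule.resume()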

在有新数据时运行流水线

使用 Eventarc 触发器创建函数

创建 Cloud Functions (第 2 代) 函数,该函数将在有新数据插入 BigQuery 表时运行流水线。

具体来说,我们使用 Eventarc 在 google.cloud.bigquery.v2.JobService.InsertJob 事件发生时触发函数。然后,该函数会运行流水线模板。

如需了解详情,请参阅 Eventarc 触发器支持的事件类型

  1. 在 Google Cloud 控制台中,前往 Cloud Run functions。

    前往 Cloud Run 函数

  2. 点击创建函数按钮。在配置页面中:

    1. 选择第 2 代作为您的环境。

    2. 函数名称字段中,使用 mlops

    3. 区域字段中,选择与您的 Cloud Storage 存储桶和 Artifact Registry 代码库相同的区域。

    4. 触发器字段中,选择其他触发器。此时会打开 Eventarc 触发器窗格。

      1. 触发器类型字段中,选择 Google 来源

      2. 事件提供方字段中,选择 BigQuery

      3. 事件类型字段中,选择 google.cloud.bigquery.v2.JobService.InsertJob

      4. 资源字段中,选择特定资源并指定 BigQuery 表

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. 区域字段中,选择 Eventarc 触发器的位置(如果适用)。如需了解详情,请参阅触发器位置

      6. 点击保存触发器

    5. 如果系统要求您向服务账号授予角色,请点击全部授予

  3. 点击下一步以进入代码页面。在代码页面中:

    1. 将运行时设置为 Python 3.12。

    2. 将入口点设置为 mlops_entrypoint。

    3. 使用内嵌编辑器打开文件 main.py,并将其内容替换为以下内容:

      将 PROJECT_ID、REGION 和 BUCKET_NAME 替换为您之前使用的值。

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "https://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. 打开文件 requirements.txt 并将其内容替换为以下内容:

      requests==2.31.0
      google-auth==2.25.1
      
  4. 点击部署以部署该函数。
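
如果您想在本地验证 main.py 中的行数解析逻辑(例如在修改代码或排查问题时),可以构造一个合成的 CloudEvent 来调用入口函数。以下示例仅作演示,假设本地环境已安装 functions-framework 和 cloudevents,并且当前目录中包含上述 main.py;示例事件的插入行数为 0,因此不会真正触发流水线:

    from cloudevents.http import CloudEvent
    from main import mlops_entrypoint

    # 构造一个合成的 BigQuery 审计日志事件(insertedRowsCount 为 0)
    attributes = {
        "type": "google.cloud.audit.log.v1.written",
        "source": "//cloudaudit.googleapis.com/projects/PROJECT_ID/logs/data_access",
    }
    data = {
        "protoPayload": {
            "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
            "resourceName": "projects/PROJECT_ID/datasets/mlops/tables/chicago",
            "authenticationInfo": {"principalEmail": "user@example.com"},
            "metadata": {"tableDataChange": {"insertedRowsCount": "0"}},
        },
    }

    event = CloudEvent(attributes, data)
    mlops_entrypoint(event)  # 应打印 "No pipeline triggered !!!"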

插入数据以触发流水线

  1. 在 Google Cloud 控制台中,进入 BigQuery Studio。

    转到 BigQuery

  2. 点击创建 SQL 查询,然后点击运行以运行以下 SQL 查询。

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    此 SQL 查询会在表中插入新行。

  3. 如需验证事件已触发,请在函数日志中搜索 Pipeline trigger Condition met。

    前往 Cloud Run 函数

  4. 如果函数成功触发,您应该会在 Vertex AI Pipelines 中看到新的流水线运行。流水线作业大约需要 30 分钟才能完成。

    前往 Vertex AI Pipelines
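
除了在控制台中查看以外,您也可以在笔记本中列出最近的流水线运行并检查其状态。以下代码段仅作演示:

    from google.cloud import aiplatform

    aiplatform.init(project=PROJECT_ID, location=REGION)

    # 按创建时间倒序列出最近的流水线运行
    for run in aiplatform.PipelineJob.list(order_by="create_time desc")[:5]:
        print(run.display_name, run.state, run.create_time)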

清理

如需清理此项目使用的所有 Google Cloud 资源,您可以删除用于本教程的 Google Cloud 项目

或者,您也可以逐个删除为本教程创建的资源。

  1. 删除 Colab Enterprise 笔记本

    进入 Colab Enterprise

  2. 删除 BigQuery 中的数据集

    转到 BigQuery

  3. 删除 Cloud Storage 存储桶

    转到 Cloud Storage

  4. 按如下方式删除模型:

    1. 在 Vertex AI 部分,进入 Model Registry 页面。

      进入 Model Registry 页面

    2. 点击模型名称旁边的操作菜单,然后选择删除模型

  5. 删除流水线运行作业:

    1. 前往流水线运行作业页面。

      前往“流水线运行作业”

    2. 点击各流水线运行作业名称旁边的操作菜单,然后选择删除流水线运行作业

  6. 删除自定义训练作业:

    1. 前往“自定义训练作业”

    2. 点击各自定义训练作业名称旁边的操作菜单,然后选择删除自定义训练作业

  7. 按如下方式删除批量预测作业:

    1. 前往“批量预测”页面

    2. 点击各批量预测作业名称旁边的操作菜单,然后选择删除批量预测作业

  8. 从 Artifact Registry 中删除仓库

    前往 Artifact Registry

  9. 删除 Cloud Functions 函数

    前往 Cloud Run 函数