在 Vertex AI 上的 Ray 集群中训练模型后,您可以使用以下流程为在线预测请求部署模型:
从 Ray 检查点导出模型。
将模型上传到 Vertex AI Model Registry。
本部分中的步骤假定您在交互式 Python 环境中使用 Ray on Vertex AI SDK。
Vertex AI Online Prediction 与 Ray 推理对比
功能 | Vertex AI Online Prediction(推荐) | Ray 推理 (Ray Serve) |
可伸缩性 | 基于流量的自动扩缩(即使对于 LLM 模型也具有很高的可扩缩性) | 具有分布式后端和自定义资源管理功能,可实现很高的可扩缩性 |
基础架构管理 | 完全由 Google Cloud 托管,运营开销更低 | 需要在基础架构或 Kubernetes 集群上进行更多手动设置和管理 |
API/支持的功能 | REST 和 gRPC API、在线预测和批量预测、可解释性功能、批处理、缓存、流式传输 | REST 和 gRPC API、实时和批量推理、模型组合、批处理、缓存、流式传输 |
模型格式 | 支持使用预构建容器或任何自定义容器的各种框架,例如 TensorFlow、PyTorch、scikit-learn、XGBoost | 支持各种框架,例如 TensorFlow、PyTorch、scikit-learn。 |
易用性 | 更易于设置和管理,并与其他 Vertex AI 功能集成 | 具有更高的灵活性和可自定义性,但需要对 Ray 有更深入的了解 |
费用 | 费用取决于机器类型、加速器和副本数量 | 费用取决于您的基础架构选择 |
专用功能 | 模型监控、A/B 测试、流量分配、Vertex AI Model Registry 和 Vertex AI Pipelines 集成 | 高级模型组合、集成学习模型、自定义推理逻辑、与 Ray 生态系统的集成 |
导入并初始化 Ray on Vertex AI 客户端
如果您已连接到 Vertex AI 上的 Ray 集群,请重启内核并运行以下代码。连接时必须采用 runtime_env
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
CLUSTER_RESOURCE_NAME:Ray on Vertex AI 集群的完整资源名称,该名称在整个项目中必须是唯一的。
BUCKET_URI 是用于存储模型工件的 Cloud Storage 存储桶。
训练模型并将其导出到 Vertex AI Model Registry
从 Ray 检查点导出 Vertex AI 模型并将模型上传到 Vertex AI Model Registry。
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.33.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
将 Ray 检查点转换为模型。
创建 LocalModel。上传到 Vertex AI Model Registry。
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
。(可选)TRAFFIC_SPLIT:从所部署模型的 ID 到此端点流量中应转发到所部署模型的百分比的映射。如果所部署模型的 ID 未在此映射中列出,则它不会收到流量。流量百分比值的总和必须等于 100,或者如果端点当前不接受任何流量,则映射必须为空。所部署的模型的键是
。例如{"0": 100}
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)