Vertex AI 上の Ray クラスタでモデルをトレーニングした後、次の手順でオンライン予測リクエスト用のモデルをデプロイできます。
Ray チェックポイントからモデルをエクスポートする
Vertex AI Model Registry にモデルをアップロードする
モデルをエンドポイントにデプロイする
予測リクエストを行う
このセクションの手順では、インタラクティブな Python 環境で Ray on Vertex AI SDK を使用していることを前提としています。
Vertex AI オンライン予測と Ray 推論の比較
機能 | Vertex AI オンライン予測(推奨) | Ray 推論(Ray Serve) |
---|---|---|
スケーラビリティ | トラフィックに基づく自動スケーリング(LLM モデルでも高いスケーラビリティ) | 分散バックエンドとカスタム リソース管理による高いスケーラビリティ |
インフラストラクチャ管理 | Google Cloud によるフルマネージドで、運用上のオーバーヘッドが少ない | インフラストラクチャまたは Kubernetes クラスタの手動設定と管理が必要 |
API / サポートされている機能 | REST API と gRPC API、オンライン予測とバッチ予測、説明可能性機能、バッチ処理、キャッシュ、ストリーミング | REST API と gRPC API、リアルタイム推論とバッチ推論、モデルの合成、バッチ処理、キャッシュ、ストリーミング |
モデルの形式 | ビルド済みコンテナまたは任意のカスタム コンテナを使用して、TensorFlow、PyTorch、scikit-learn、XGBoost などのさまざまなフレームワークをサポート | TensorFlow、PyTorch、scikit-learn などのさまざまなフレームワークをサポート |
使いやすさ | 設定と管理が簡単で、他の Vertex AI 機能と統合されている | より柔軟でカスタマイズ性に優れているが、Ray に関する深い知識が必要 |
費用 | マシンタイプ、アクセラレータ、レプリカの数によって費用が異なる | 選択したインフラストラクチャによって費用が異なる |
専門的な機能 | モデル モニタリング、A/B テスト、トラフィック分割、Vertex AI Model Registry と Vertex AI Pipelines の統合 | 高度なモデルの構成、アンサンブル モデル、カスタム推論ロジック、Ray エコシステムとの統合 |
Ray on Vertex AI クライアントをインポートして初期化する
Vertex AI 上の Ray クラスタにすでに接続している場合は、カーネルを再起動して次のコードを実行します。runtime_env
変数は、接続時にオンライン予測コマンドを実行するために必要です。
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
ここで
CLUSTER_RESOURCE_NAME: プロジェクト全体で一意である必要がある、Vertex AI クラスタ上の Ray の完全なリソース名。
BUCKET_URI は、モデル アーティファクトを保存する Cloud Storage バケットです。
モデルをトレーニングして Vertex AI Model Registry にエクスポートする
Ray チェックポイントから Vertex AI モデルをエクスポートし、モデルを Vertex AI Model Registry にアップロードします。
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.33.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Ray チェックポイントをモデルに変換します。
model.mar
をビルドします。model.mar
を使用して LocalModel を作成します。Vertex AI Model Registry にアップロードします。
オンライン予測用のモデルをデプロイする
オンライン エンドポイントにモデルをデプロイします。詳細については、エンドポイントにモデルをデプロイするをご覧ください。
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
ここで
(省略可)DEPLOYED_NAME: デプロイされたモデルの表示名。作成時に指定されていない場合は、モデルの
display_name
が使用されます。(省略可)TRAFFIC_SPLIT: デプロイ済みモデルの ID から、そのデプロイ済みモデルに転送する必要があるこのエンドポイントのトラフィックの割合へのマップ。デプロイされたモデルの ID がこのマップに一覧表示されていない場合、トラフィックは受信されていません。トラフィックの割合の値は合計で 100 となる必要があります。また、エンドポイントがその時点で一切のトラフィックを受け付けない場合はマップを空白にする必要があります。デプロイされるモデルのキーは
"0"
です。例:{"0": 100}
(省略可)MACHINE_TYPE: コンピューティング リソースを指定します。
予測リクエストを行う
予測リクエストをエンドポイントに送信します。詳細については、カスタム トレーニング済みモデルからオンライン予測を取得するをご覧ください。
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
出力は次のようになります。
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)