Vertex AI 上の Ray クラスタでモデルをトレーニングした後、次の手順でオンライン予測リクエスト用のモデルをデプロイできます。
Ray チェックポイントからモデルをエクスポートします。
Vertex AI Model Registry にモデルをアップロードする
モデルをエンドポイントにデプロイする
予測リクエストを行います。
このセクションの手順では、インタラクティブな Python 環境で Ray on Vertex AI SDK を使用していることを前提としています。
Ray on Vertex AI クライアントをインポートして初期化する
Vertex AI 上の Ray クラスタにすでに接続している場合は、カーネルを再起動して次のコードを実行します。runtime_env
変数は、接続時にオンライン予測コマンドを実行するために必要です。
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. aiplatform.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
ここで
CLUSTER_RESOURCE_NAME: プロジェクト全体で一意である必要がある、Vertex AI クラスタ上の Ray の完全なリソース名。
BUCKET_URI は、モデル アーティファクトを保存する Cloud Storage バケットです。
モデルをトレーニングして Vertex AI Model Registry にエクスポートする
Ray チェックポイントから Vertex AI モデルをエクスポートし、モデルを Vertex AI Model Registry にアップロードします。
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray.tune.syncer import SyncConfig import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.4.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) model.save("temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") session.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( upload_dir=f'{BUCKET_URI}/ray_results/tensorflow', ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( SklearnCheckpoint.from_checkpoint(result.checkpoint) )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostCheckpoint vertex_model = xgboost.register_xgboost( XGBoostCheckpoint.from_checkpoint(result.checkpoint) )
モデルをデプロイする
オンライン エンドポイントにモデルをデプロイします。詳細については、エンドポイントにモデルをデプロイするをご覧ください。
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
ここで
(省略可)DEPLOYED_NAME: デプロイされたモデルの表示名。作成時に指定されていない場合は、モデルの
display_name
が使用されます。(省略可)TRAFFIC_SPLIT: デプロイ済みモデルの ID から、そのデプロイ済みモデルに転送する必要があるこのエンドポイントのトラフィックの割合へのマップ。デプロイされたモデルの ID がこのマップに一覧表示されていない場合、トラフィックは受信されていません。トラフィックの割合の値は合計で 100 となる必要があります。また、エンドポイントがその時点で一切のトラフィックを受け付けない場合はマップを空白にする必要があります。デプロイされるモデルのキーは
"0"
です。例:{"0": 100}
(省略可)MACHINE_TYPE: コンピューティング リソースを指定します。
予測リクエストを行う
予測リクエストをエンドポイントに送信します。詳細については、カスタム トレーニング済みモデルからオンライン予測を取得するをご覧ください。
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
出力は次のようになります。
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)