Après avoir entraîné un modèle sur un cluster Ray sur Vertex AI, vous pouvez déployer le modèle pour les requêtes de prédiction en ligne en procédant comme suit :
Exportez le modèle à partir du point de contrôle Ray.
Importer le modèle dans Vertex AI Model Registry.
déployer le modèle sur un point de terminaison ;
Effectuez des requêtes de prédiction.
Les étapes de cette section supposent que vous utilisez le SDK Ray sur Vertex AI dans un environnement Python interactif.
Comparaison entre la prédiction en ligne Vertex AI et l'inférence Ray
Caractéristique | Prédiction en ligne Vertex AI (recommandé) | Inférence Ray (Diffusion Ray) |
---|---|---|
Évolutivité | Autoscaling basé sur le trafic (hautement évolutif, même pour les modèles LLM) | Haute évolutivité avec des backends distribués et une gestion des ressources personnalisées |
Gestion des infrastructures | Entièrement géré par Google Cloud, coûts opérationnels réduits | Nécessite une configuration et une gestion manuelles supplémentaires de votre infrastructure ou de votre cluster Kubernetes |
API/Fonctionnalités compatibles | API REST et gRPC, prédictions en ligne et par lots, fonctionnalités d'explicabilité, traitement par lot, mise en cache et diffusion | API REST et gRPC, inférence en temps réel et par lot, composition de modèles, traitement par lot, mise en cache et diffusion |
Format du modèle | Compatible avec différents frameworks tels que TensorFlow, PyTorch, scikit-learn et XGBoost avec des conteneurs prédéfinis ou tout conteneur personnalisé | Compatible avec différents frameworks, tels que TensorFlow, PyTorch et scikit-learn. |
Simplicité d'utilisation | Plus facile à configurer et à gérer, intégré à d'autres fonctionnalités Vertex AI | Plus flexible et personnalisable, mais nécessite une connaissance plus approfondie de Ray |
Coût | Le coût dépend des types de machines, des accélérateurs et du nombre d'instances répliquées | Le coût dépend de vos choix liés à l'infrastructure |
Fonctionnalités spécialisées | Surveillance des modèles, tests A/B, répartition du trafic, intégration de Vertex AI Model Registry et de Vertex AI Pipelines | Composition de modèles avancés, modèles d'ensemble, logique d'inférence personnalisée, intégration à l'écosystème Ray |
Importer et initialiser le client Ray sur Vertex AI
Si vous êtes déjà connecté à votre cluster Ray sur Vertex AI, redémarrez votre kernel et exécutez le code suivant. La variable runtime_env
est nécessaire au moment de la connexion pour exécuter des commandes de prédiction en ligne.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
Où :
CLUSTER_RESOURCE_NAME : nom complet de la ressource Ray sur un cluster Vertex AI, qui doit être unique dans votre projet.
BUCKET_URI : bucket Cloud Storage pour stocker les artefacts de modèle.
Entraîner et exporter le modèle vers Vertex AI Model Registry
Exportez le modèle Vertex AI à partir du point de contrôle Ray et importez-le dans Vertex AI Model Registry.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.33.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Convertissez les points de contrôle Ray en modèle.
Créez
model.mar
.Créez un objet LocalModel à l'aide de
model.mar
.Importer dans Vertex AI Model Registry
Déployer le modèle pour les prédictions en ligne
Déployez le modèle sur le point de terminaison. Pour en savoir plus, consultez Déployer un modèle sur un point de terminaison.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
Où :
(Facultatif) DEPLOYED_NAME : nom à afficher du modèle déployé. S'il n'est pas fourni lors de la création, la valeur
display_name
du modèle est utilisée.(Facultatif) TRAFFIC_SPLIT : mappage de l'ID d'un modèle déployé au pourcentage du trafic de ce point de terminaison à transférer vers ce modèle déployé. Si l'ID d'un modèle déployé ne figure pas dans ce mappage, il ne reçoit aucun trafic. La somme des valeurs de pourcentage du trafic doit être égale à 100, ou le mappage doit être vide si le point de terminaison n'accepte aucun trafic pour le moment. La clé du modèle déployé est
"0"
. Exemple :{"0": 100}
.(Facultatif) MACHINE_TYPE : spécifiez les ressources de calcul.
Envoyer une requête de prédiction
Envoyez une requête de prédiction au point de terminaison. Pour en savoir plus, consultez la section Obtenir des prédictions en ligne à partir d'un modèle entraîné personnalisé.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
Vous devriez obtenir un résultat semblable à celui-ci :
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)