Dopo aver addestrato un modello su un cluster Ray su Vertex AI, puoi eseguire il deployment il modello per le richieste di previsione online utilizzando il seguente processo:
Esporta il modello dal checkpoint Ray.
Carica il modello in Vertex AI Model Registry.
Eseguire il deployment del modello su un endpoint.
Effettuare richieste di previsione.
I passaggi in questa sezione presuppongono l'utilizzo di Ray su Vertex AI l'SDK in un ambiente Python interattivo.
Confronto tra previsione online di Vertex AI e inferenza di Ray
Funzionalità | Previsione online di Vertex AI (consigliata) | Inferenza dei raggi (pubblicazione di raggi) |
---|---|---|
Scalabilità | Scalabilità automatica basata sul traffico (elevata scalabilità anche per i modelli LLM) | Elevata scalabilità con backend distribuiti e gestione personalizzata delle risorse |
Gestione dell'infrastruttura | Completamente gestito da Google Cloud, meno overhead operativo | Richiede una configurazione e una gestione più manuali sull'infrastruttura o sul cluster Kubernetes |
API/Funzionalità supportate | API REST e gRPC, previsioni online e batch, caratteristiche di spiegabilità, batch, memorizzazione nella cache, flussi di dati | API REST e gRPC, inferenza in tempo reale e batch, composizione di modelli, raggruppamento in batch, memorizzazione nella cache, inserimento di flussi |
Formato del modello | Supporta vari framework come TensorFlow, PyTorch, scikit-learn, XGBoost utilizzando container predefiniti o qualsiasi container personalizzato | Supporta vari framework come TensorFlow, PyTorch e scikit-learn. |
Facilità di utilizzo | Più facili da configurare e gestire, grazie all'integrazione con altre funzionalità di Vertex AI | Più flessibile e personalizzabile, ma richiede una conoscenza più approfondita di Ray |
Costo | Il costo dipende da tipi di macchina, acceleratori e numero di repliche | Il costo dipende dalle scelte dell'infrastruttura |
Funzionalità specializzate | Monitoraggio dei modelli, test A/B, suddivisione del traffico, integrazione di Vertex AI Model Registry e Vertex AI Pipelines | Composizione avanzata dei modelli, modelli di insieme, logica di inferenza personalizzata, integrazione con l'ecosistema Ray |
Importa e inizializza Ray sul client Vertex AI
Se è già connesso al tuo cluster Ray su Vertex AI, riavvia il
ed eseguire il codice riportato di seguito. La variabile runtime_env
è necessaria in
tempo di connessione per eseguire i comandi di previsione online.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
Dove:
CLUSTER_RESOURCE_NAME: il nome completo della risorsa per Ray on Vertex AI che deve essere univoco all'interno del progetto.
BUCKET_URI è il bucket Cloud Storage in cui archiviare il modello artefatti.
Addestra ed esporta il modello in Vertex AI Model Registry
Esporta il modello Vertex AI dal checkpoint Ray e carica il modello in Vertex AI Model Registry.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.9.3", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Converti i checkpoint Ray in un modello.
Crea
model.mar
.Crea un LocalModel utilizzando
model.mar
.Carica in Vertex AI Model Registry.
Esegui il deployment del modello per le previsioni online
Eseguire il deployment del modello nell'endpoint online. Per ulteriori informazioni, vedi Esegui il deployment del modello in un endpoint.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
Dove:
(Facoltativo) DEPLOYED_NAME: il nome visualizzato del modello di cui è stato eseguito il deployment. Se non fornito al momento della creazione, viene utilizzato il valore
display_name
del modello.(Facoltativo) TRAFFIC_SPLIT: una mappa dall'ID di un modello di cui è stato eseguito il deployment alla percentuale del traffico di questo endpoint che deve essere inoltrato al modello di cui è stato eseguito il deployment. Se l'ID di un modello di cui è stato eseguito il deployment non è elencato in questa mappa, quindi non riceve traffico. La somma dei valori della percentuale di traffico deve essere pari a 100 oppure la mappa deve essere vuota se che non accetti traffico al momento. La chiave per il modello di cui si esegue il deployment è
"0"
. Ad esempio:{"0": 100}
.(Facoltativo) MACHINE_TYPE: specifica le risorse di computing.
Fai una richiesta di previsione
Invia una richiesta di previsione all'endpoint. Per ulteriori informazioni, consulta Generare previsioni online da un modello addestrato personalizzato.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
L'output visualizzato dovrebbe essere simile al seguente:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)