Implantar um modelo na Vertex AI e receber previsões

Depois de treinar um modelo em um cluster do Ray na Vertex AI, é possível implantar o modelo para solicitações de previsão on-line usando o seguinte processo:

Before you begin, make sure to read the Ray on Vertex AI overview and set up all the prerequisite tools you need.

As etapas desta seção pressupõem que você esteja usando o SDK Ray na Vertex AI em um ambiente Python interativo.

Comparação entre a previsão on-line da Vertex AI e a inferência de Ray

Recurso Previsão on-line da Vertex AI (recomendado) Inferência de raios (disponibilização de raio)
Escalonabilidade Escalonamento automático com base no tráfego (altamente escalonável até mesmo para modelos LLM) Altamente escalonável com back-ends distribuídos e gerenciamento personalizado de recursos
Gerenciamento de infraestrutura Totalmente gerenciado pelo Google Cloud, menos overhead operacional Requer mais configuração e gerenciamento manuais na sua infraestrutura ou no cluster do Kubernetes
API/Recursos suportados APIs REST e gRPC, previsões on-line e em lote, recursos de explicabilidade, lotes, armazenamento em cache, streaming APIs REST e gRPC, inferência em tempo real e em lote, composição de modelos, lotes, armazenamento em cache, streaming
Formato de modelo Dá suporte a vários frameworks como TensorFlow, PyTorch, scikit-learn e XGBoost usando contêineres pré-criados ou qualquer contêiner personalizado Dá suporte a vários frameworks como TensorFlow, PyTorch, scikit-learn.
Fácil de usar Mais fácil de configurar e gerenciar, integrado a outros recursos da Vertex AI Mais flexível e personalizável, mas requer conhecimento mais profundo do Ray
Custo O custo depende de tipos de máquina, aceleradores e número de réplicas O custo depende das opções de infraestrutura
Recursos especializados Monitoramento de modelos, teste A/B, divisão de tráfego, Vertex AI Model Registry e integração do Vertex AI Pipelines Composição de modelos avançados, modelos de ensemble, lógica de inferência personalizada e integração com o ecossistema Ray

Importar e inicializar o Ray no cliente da Vertex AI

Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o kernel e execute o código a seguir. A variável runtime_env é necessária no momento da conexão, para executar comandos de previsão on-line.

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Em que:

  • CLUSTER_RESOURCE_NAME: o nome completo do recurso do cluster Ray na Vertex AI que precisa ser exclusivo em todo o projeto.

  • BUCKET_URI é o bucket do Cloud Storage para armazenar os artefatos do modelo.

Treinar e exportar o modelo para o Vertex AI Model Registry

Exportar o modelo da Vertex AI do checkpoint do Ray e fazer o upload dele para o Vertex AI Model Registry.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.9.3", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints. 
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Converta os checkpoints do Ray em um modelo.

  • Crie o model.mar.

  • Crie o LocalModel usando model.mar.

  • Faça upload para o Vertex AI Model Registry.

Implantar o modelo para previsões on-line

Implante o modelo no endpoint on-line. Para mais informações, consulte Implantar o modelo em um endpoint.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Em que:

  • (Opcional) DEPLOYED_NAME: o nome de exibição do modelo implantado. Se não for fornecido na criação, o display_name do modelo será usado.

  • (Opcional) TRAFFIC_SPLIT: um mapa do ID de um modelo implantado para a porcentagem do tráfego desse endpoint que precisa ser encaminhado para esse modelo implantado. Se o ID de um modelo implantado não estiver listado nesse mapa, ele não receberá tráfego. Os valores de porcentagem de tráfego precisam totalizar 100 ou o mapa precisa estar vazio se o endpoint não aceitar tráfego no momento. A chave do modelo que está sendo implantado é "0". Por exemplo, {"0": 100}.

  • (Opcional) MACHINE_TYPE: especifique os recursos de computação.

Fazer uma solicitação de previsão

Envie uma solicitação de previsão para o endpoint. Para mais informações, consulte Receber previsões on-line de um modelo treinado personalizado.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

O resultado será semelhante a este:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

A seguir