Vertex AI의 Ray 클러스터에서 모델을 학습시킨 후 다음 프로세스를 사용하여 온라인 예측 요청 모델을 배포할 수 있습니다.
Ray 체크포인트에서 모델을 내보냅니다.
모델을 Vertex AI Model Registry에 업로드합니다.
엔드포인트에 모델을 배포합니다.
예측 요청을 수행합니다.
이 섹션의 단계에서는 대화형 Python 환경에서 Vertex AI SDK의 Ray를 사용한다고 가정합니다.
Vertex AI 온라인 예측과 Ray 추론 비교
기능 | Vertex AI 온라인 예측(권장) | Ray 추론(Ray 제공) |
---|---|---|
확장성 | 트래픽 기반 자동 확장(LLM 모델의 경우에도 확장성 높음) | 분산 백엔드 및 커스텀 리소스 관리로 확장성 높음 |
인프라 관리 | Google Cloud에서 완전 관리, 운영 오버헤드 감소 | 인프라 또는 Kubernetes 클러스터에서 수동 설정 및 관리 필요 |
API/지원 기능 | REST 및 gRPC API, 온라인 및 일괄 예측, 설명 기능, 일괄 처리, 캐시, 스트리밍 | REST 및 gRPC API, 실시간 및 일괄 추론, 모델 구성, 일괄 처리, 캐시, 스트리밍 |
모델 형식 | 사전 빌드된 컨테이너 또는 커스텀 컨테이너를 사용하여 TensorFlow, PyTorch, scikit-learn, XGBoost와 같은 다양한 프레임워크 지원 | TensorFlow, PyTorch, scikit-learn과 같은 다양한 프레임워크를 지원합니다. |
사용 편의성 | 설정 및 관리가 쉽고 다른 Vertex AI 기능과 통합 | 유연성 및 맞춤설정 수준이 높지만 Ray에 대한 심층 지식 필요 |
비용 | 비용은 머신 유형, 액셀러레이터, 복제본 수에 따라 다름 | 비용은 인프라 옵션에 따라 다름 |
전문 기능 | 모델 모니터링, A/B 테스트, 트래픽 분할, Vertex AI Model Registry 및 Vertex AI Pipelines 통합 | 고급 모델 작성, 앙상블 모델, 커스텀 추론 논리, Ray 에코시스템 통합 |
Vertex AI 클라이언트에서 Ray 가져오기 및 초기화
Vertex AI의 Ray 클러스터에 이미 연결되어 있으면 커널을 다시 시작하고 다음 코드를 실행합니다. runtime_env
변수는 연결 시 예측 명령어를 실행하기 위해 필요합니다.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
각 항목의 의미는 다음과 같습니다.
CLUSTER_RESOURCE_NAME: 프로젝트 전체에서 고유해야 하는 Vertex AI 클러스터의 Ray에 대한 전체 리소스 이름입니다.
BUCKET_URI는 모델 아티팩트를 저장할 Cloud Storage 버킷입니다.
모델을 학습시켜 Vertex AI Model Registry로 내보내기
Ray 체크포인트에서 Vertex AI 모델을 내보내고 Vertex AI Model Registry에 업로드합니다.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.33.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Ray 체크포인트를 모델로 변환합니다.
model.mar
를 빌드합니다.model.mar
를 사용하여 LocalModel을 만듭니다.Vertex AI Model Registry에 업로드합니다.
온라인 예측을 위한 모델 배포
모델을 온라인 엔드포인트에 배포합니다. 자세한 내용은 엔드포인트에 모델 배포를 참조하세요.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
각 항목의 의미는 다음과 같습니다.
(선택사항) DEPLOYED_NAME: 배포된 모델의 표시 이름입니다. 만들 때 제공하지 않으면 모델의
display_name
이 사용됩니다.(선택사항) TRAFFIC_SPLIT: 배포된 모델의 ID에서 배포된 모델로 전달되어야 하는 이 엔드포인트 트래픽 비율에 대한 지도입니다. 배포된 모델의 ID가 이 지도에 나열되지 않으면 트래픽을 수신하지 않습니다. 트래픽 비율 값은 최대 100까지 추가되거나 엔드포인트에서 현재 트래픽을 허용하지 않는 경우에는 지도가 비어 있어야 합니다. 배포 중인 모델의 키는
"0"
입니다. 예를 들면{"0": 100}
입니다.(선택사항) MACHINE_TYPE: 컴퓨팅 리소스를 지정합니다.
예측 요청을 수행합니다.
엔드포인트에 예측 요청을 보냅니다. 자세한 내용은 커스텀 학습 모델에서 온라인 예측 가져오기를 참조하세요.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
다음과 같은 출력이 표시됩니다.
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)