BigQuery 데이터로 TensorFlow 모델 학습

Colab 로고 Colab에서 실행 GitHub 로고 GitHub에서 보기 Vertex AI 로고 Vertex AI Workbench에서 열기

개요

이 튜토리얼에서는 Python용 Vertex AI SDK를 사용하여 온라인 예측을 위한 커스텀 테이블 형식 분류 모델을 학습시키고 배포하는 방법을 보여줍니다.

목표

이 노트북에서는 Python용 Vertex AI SDK를 사용하여 Docker 컨테이너의 Python 스크립트에서 커스텀 학습 모델을 만든 다음 데이터를 전송하여 배포된 모델에서 예측을 수행하는 방법을 알아봅니다. 또는 gcloud 명령줄 도구를 사용하거나 Google Cloud 콘솔을 사용해서 온라인으로 커스텀 학습 모델을 만들 수 있습니다.

이 튜토리얼에는 다음 Google Cloud ML 서비스 및 리소스가 사용됩니다.

  • BigQuery
  • Cloud Storage
  • Vertex AI 관리형 데이터 세트
  • Vertex AI Training
  • Vertex AI 엔드포인트

구체적인 단계는 다음과 같습니다.

  • 모델 학습을 위한 Vertex AI 커스텀 TrainingPipeline 만들기
  • TensorFlow 모델 학습
  • 제공 Endpoint 리소스에 Model 리소스 배포
  • 예측
  • Model 리소스의 배포 취소

데이터 세트

이 튜토리얼에 사용된 데이터 세트는 BigQuery 공개 데이터 세트의 펭귄 데이터 세트입니다. 이 튜토리얼에서는 데이터 세트의 culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g 필드만 사용하여 펭귄 종(species)을 예측합니다.

비용

이 튜토리얼에서는 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.

  • Vertex AI
  • Cloud Storage
  • BigQuery

Vertex AI 가격 책정, Cloud Storage 가격 책정, BigQuery 가격 책정에 대해 알아보고 가격 계산기를 사용하여 예상 사용량 기반의 비용 예측을 생성합니다.

설치

Python용 Cloud Storage, BigQuery, Vertex AI SDK의 최신 버전을 설치합니다.

# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        google-cloud-bigquery \
                        pyarrow

Colab에만 해당: 커널을 다시 시작하려면 다음 셀의 주석 처리 삭제

# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

시작하기 전에

프로젝트 ID 설정

프로젝트 ID를 모르는 경우 다음을 시도해 보세요.

  • gcloud config list를 실행합니다.
  • gcloud projects list를 실행합니다.
  • 지원 페이지 참조: 프로젝트 ID 찾기
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

리전

Vertex AI에서 사용하는 REGION 변수를 변경할 수도 있습니다. Vertex AI 리전에 대해 자세히 알아보기

REGION = "us-central1"  # @param {type: "string"}

Google Cloud 계정 인증

Jupyter 환경에 따라 수동으로 인증해야 할 수도 있습니다. 아래의 관련 안내를 따르세요.

1. Vertex AI Workbench

  • 이미 인증된 상태이므로 아무것도 수행하지 않습니다.

2. 로컬 JupyterLab 인스턴스, 주석 처리 삭제 및 실행:

# ! gcloud auth login

3. Colab, 주석 처리 삭제 및 실행:

# from google.colab import auth
# auth.authenticate_user()

4. 서비스 계정 또는 기타

Cloud Storage 버킷 만들기

데이터 세트와 같은 중간 아티팩트를 저장할 스토리지 버킷을 만듭니다.

BUCKET_URI = "gs://your-bucket-name-unique"  # @param {type:"string"}

버킷이 없는 경우에만 다음 셀을 실행하여 Cloud Storage 버킷을 만듭니다.

 gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Python용 Vertex AI SDK 초기화

프로젝트 및 해당 버킷에 Python용 Vertex AI SDK를 초기화합니다.

from google.cloud import aiplatform

# Initialize the Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

BigQuery 클라이언트 초기화

프로젝트의 BigQuery Python 클라이언트를 초기화합니다.

BigQuery를 사용하려면 계정에 'BigQuery 사용자' 역할이 있어야 합니다.

from google.cloud import bigquery

# Set up BigQuery client
bq_client = bigquery.Client(project=PROJECT_ID)

데이터 사전 처리 및 데이터 분할

먼저 학습 및 테스트를 위해 데이터를 다운로드하고 사전 처리해야 합니다.

  • 카테고리형 특성을 숫자로 변환
  • 사용하지 않는 열 제거
  • 사용할 수 없는 행 삭제
  • 학습 및 테스트 데이터 분할
import numpy as np
import pandas as pd

LABEL_COLUMN = "species"

# Define the BigQuery source dataset
BQ_SOURCE = "bigquery-public-data.ml_datasets.penguins"

# Define NA values
NA_VALUES = ["NA", "."]

# Download a table
table = bq_client.get_table(BQ_SOURCE)
df = bq_client.list_rows(table).to_dataframe()

# Drop unusable rows
df = df.replace(to_replace=NA_VALUES, value=np.NaN).dropna()

# Convert categorical columns to numeric
df["island"], _ = pd.factorize(df["island"])
df["species"], _ = pd.factorize(df["species"])
df["sex"], _ = pd.factorize(df["sex"])

# Split into a training and holdout dataset
df_train = df.sample(frac=0.8, random_state=100)
df_holdout = df[~df.index.isin(df_train.index)]

BigQuery 데이터 세트에서 Vertex AI 테이블 형식 데이터 세트 만들기

BigQuery 학습 데이터에서 Vertex AI 테이블 형식 데이터 세트 리소스를 만듭니다.

자세한 내용은 https://cloud.google.com/vertex-ai/docs/training/using-managed-datasets를 참조하세요.

# Create BigQuery dataset
bq_dataset_id = f"{PROJECT_ID}.dataset_id_unique"
bq_dataset = bigquery.Dataset(bq_dataset_id)
bq_client.create_dataset(bq_dataset, exists_ok=True)
dataset = aiplatform.TabularDataset.create_from_dataframe(
    df_source=df_train,
    staging_path=f"bq://{bq_dataset_id}.table-unique",
    display_name="sample-penguins",
)

모델 학습

컨테이너 이미지를 사용하여 모델을 학습시키는 방법은 두 가지입니다.

  • Vertex AI 사전 빌드된 컨테이너 사용. 사전 빌드된 학습 컨테이너를 사용하는 경우 컨테이너 이미지에 설치할 Python 패키지를 추가로 지정해야 합니다. 이 Python 패키지에는 학습 코드가 포함됩니다.

  • 자체 커스텀 컨테이너 이미지 사용. 자체 컨테이너를 사용하는 경우 컨테이너 이미지에 학습 코드가 포함되어 있어야 합니다.

이 데모에서는 사전 빌드된 컨테이너를 사용합니다.

학습 스크립트의 명령어 인수 정의

학습 스크립트에 전달할 명령줄 인수를 준비합니다.

  • args: 해당 Python 모듈에 전달할 명령줄 인수입니다. 이 예시에서는 다음과 같습니다.
    • label_column: 예측할 데이터의 라벨 열입니다.
    • epochs: 학습의 세대 수
    • batch_size: 학습의 배치 크기
JOB_NAME = "custom_job_unique"

EPOCHS = 20
BATCH_SIZE = 10

CMDARGS = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]

학습 스크립트

다음 셀에서 학습 스크립트 task.py의 콘텐츠를 작성합니다. 요약하자면 스크립트가 다음을 수행합니다.

  • BigQuery Python 클라이언트 라이브러리를 사용하여 BigQuery 테이블에서 데이터를 로드합니다.
  • TF.Keras 모델 API를 사용하여 모델을 빌드합니다.
  • 모델(compile())을 컴파일합니다.
  • args.distribute 인수에 따라 학습 배포 전략을 설정합니다.
  • args.epochsargs.batch_size 인수에 따라 모델(fit())을 세대 및 배치 크기로 학습시킵니다.
  • 환경 변수 AIP_MODEL_DIR에서 모델 아티팩트를 저장할 디렉터리를 가져옵니다. 이 변수는 학습 서비스로 설정됩니다.
  • 학습된 모델을 모델 디렉터리에 저장합니다.

참고: 모델 성능을 개선하려면 학습 전에 모델에 대한 입력을 정규화하는 것이 좋습니다. 자세한 내용은 https://www.tensorflow.org/tutorials/structured_data/preprocessing_layers#numerical_columns 튜토리얼을 참조하세요.

참고: 다음 학습 코드를 사용하려면 학습 계정에 'BigQuery 읽기 세션 사용자' 역할을 부여해야 합니다. 이 계정을 찾는 방법에 대한 자세한 내용은 'https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents'를 참조하세요.

%%writefile task.py

import argparse
import numpy as np
import os

import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.cloud import storage

# Read environmental variables
training_data_uri = os.getenv("AIP_TRAINING_DATA_URI")
validation_data_uri = os.getenv("AIP_VALIDATION_DATA_URI")
test_data_uri = os.getenv("AIP_TEST_DATA_URI")

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--label_column', required=True, type=str)
parser.add_argument('--epochs', default=10, type=int)
parser.add_argument('--batch_size', default=10, type=int)
args = parser.parse_args()

# Set up training variables
LABEL_COLUMN = args.label_column

# See https://cloud.google.com/vertex-ai/docs/workbench/managed/executor#explicit-project-selection for issues regarding permissions.
PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]
bq_client = bigquery.Client(project=PROJECT_NUMBER)

# Download a table
def download_table(bq_table_uri: str):
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]

    # Download the BigQuery table as a dataframe
    # This requires the "BigQuery Read Session User" role on the custom training service account.
    table = bq_client.get_table(bq_table_uri)
    return bq_client.list_rows(table).to_dataframe()

# Download dataset splits
df_train = download_table(training_data_uri)
df_validation = download_table(validation_data_uri)
df_test = download_table(test_data_uri)

def convert_dataframe_to_dataset(
    df_train: pd.DataFrame,
    df_validation: pd.DataFrame,
):
    df_train_x, df_train_y = df_train, df_train.pop(LABEL_COLUMN)
    df_validation_x, df_validation_y = df_validation, df_validation.pop(LABEL_COLUMN)

    y_train = np.asarray(df_train_y).astype("float32")
    y_validation = np.asarray(df_validation_y).astype("float32")

    # Convert to numpy representation
    x_train = np.asarray(df_train_x)
    x_test = np.asarray(df_validation_x)

    # Convert to one-hot representation
    num_species = len(df_train_y.unique())
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_species)
    y_validation = tf.keras.utils.to_categorical(y_validation, num_classes=num_species)

    dataset_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset_validation = tf.data.Dataset.from_tensor_slices((x_test, y_validation))
    return (dataset_train, dataset_validation)

# Create datasets
dataset_train, dataset_validation = convert_dataframe_to_dataset(df_train, df_validation)

# Shuffle train set
dataset_train = dataset_train.shuffle(len(df_train))

def create_model(num_features):
    # Create model
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(
                100,
                activation=tf.nn.relu,
                kernel_initializer="uniform",
                input_dim=num_features,
            ),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            Dense(3, activation=tf.nn.softmax),
        ]
    )

    # Compile Keras model
    optimizer = tf.keras.optimizers.RMSprop(lr=0.001)
    model.compile(
        loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer
    )

    return model

# Create the model
model = create_model(num_features=dataset_train._flat_shapes[0].dims[0].value)

# Set up datasets
dataset_train = dataset_train.batch(args.batch_size)
dataset_validation = dataset_validation.batch(args.batch_size)

# Train the model
model.fit(dataset_train, epochs=args.epochs, validation_data=dataset_validation)

tf.saved_model.save(model, os.getenv("AIP_MODEL_DIR"))

모델 학습

Vertex AI에서 커스텀 TrainingPipeline을 정의하세요.

CustomTrainingJob 클래스를 사용하여 TrainingPipeline을 정의합니다. 이 클래스는 다음 매개변수를 사용합니다.

  • display_name: 이 학습 파이프라인의 사용자 정의 이름
  • script_path: 학습 스크립트의 로컬 경로
  • container_uri: 학습 컨테이너 이미지의 URI
  • requirements: 스크립트의 Python 패키지 종속 항목 목록
  • model_serving_container_image_uri: 모델에 대한 예측을 제공할 수 있는 컨테이너의 URI(사전 빌드된 컨테이너 또는 커스텀 컨테이너)

run 함수를 사용하여 학습을 시작합니다. 이 함수는 다음과 같은 매개변수가 필요합니다.

  • args: Python 스크립트에 전달되는 명령줄 인수
  • replica_count: 작업자 복제본 수
  • model_display_name: 스크립트가 관리형 Model을 생성하는 경우 Model의 표시 이름
  • machine_type: 학습에 사용할 머신 유형
  • accelerator_type: 하드웨어 가속기 유형
  • accelerator_count: 작업자 복제본에 연결할 가속기 수

run 함수는 Model 객체를 학습시키고 만드는 학습 파이프라인을 만듭니다. 학습 파이프라인이 완료되면 run 함수가 Model 객체를 반환합니다.

job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest",
    requirements=["google-cloud-bigquery>=2.20.0", "db-dtypes"],
    model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)

MODEL_DISPLAY_NAME = "penguins_model_unique"

# Start the training
model = job.run(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
)

모델 배포

모델을 사용하여 예측을 수행하려면 먼저 Endpoint에 배포해야 합니다. 이렇게 하려면 Model 리소스에서 deploy 함수를 호출하면 됩니다. 다음 두 가지 작업이 수행됩니다.

  1. Model 리소스를 배포할 Endpoint 리소스를 만듭니다.
  2. Model 리소스를 Endpoint 리소스에 배포합니다.

이 함수는 다음과 같은 매개변수가 필요합니다.

  • deployed_model_display_name: 배포된 모델의 사람이 읽을 수 있는 이름
  • traffic_split: 이 모델로 이동하는 엔드포인트의 트래픽 비율로, 하나 이상의 키-값 쌍의 사전으로 지정됩니다.
    • 모델이 하나뿐인 경우 { "0": 100 }를 지정합니다. 여기서 '0'은 업로드되는 모델을 나타내며 100은 트래픽의 100%를 의미합니다.
    • 엔드포인트에 트래픽을 분할할 기존 모델이 있는 경우 model_id를 사용하여 { "0": percent, model_id: percent, ... }를 지정합니다. 여기서 model_id는 엔드포인트에 있는 기존 DeployedModel의 ID입니다. 백분율을 합치면 100이 되어야 합니다.
  • machine_type: 학습에 사용할 머신 유형
  • accelerator_type: 하드웨어 가속기 유형
  • accelerator_count: 작업자 복제본에 연결할 가속기 수
  • starting_replica_count: 처음 프로비저닝할 컴퓨팅 인스턴스의 수
  • max_replica_count: 확장할 최대 컴퓨팅 인스턴스 수입니다. 이 튜토리얼에서는 인스턴스 하나만 프로비저닝됩니다.

트래픽 분할

traffic_split 매개변수는 Python 사전으로 지정됩니다. 엔드포인트에 모델 인스턴스를 2개 이상 배포한 후 각 인스턴스로 가는 트래픽의 백분율을 설정할 수 있습니다.

트래픽 분할을 사용하면 점진적으로 새 모델을 프로덕션에 도입할 수 있습니다. 예를 들어 프로덕션에 트래픽의 100%를 가지는 기존 모델이 하나 존재하는 경우 새 모델을 동일한 엔드포인트에 배포하고 트래픽 10%를 이 모델에 전달하여 원래 모델의 트래픽을 90%로 줄일 수 있습니다. 이렇게 하면 대부분의 사용자에게 방해가 되지 않으면서 새 모델의 성능을 모니터링할 수 있습니다.

컴퓨팅 인스턴스 확장

온라인 예측 요청을 처리하도록 단일 인스턴스(또는 노드)를 지정할 수 있습니다. 이 튜토리얼에서는 단일 노드가 사용됩니다. 따라서 MIN_NODESMAX_NODES 변수 모두 1로 설정됩니다.

여러 노드를 사용하여 온라인 예측 요청을 처리하려는 경우에는 MAX_NODES를 사용하려는 최대 노드 개수로 설정합니다. Vertex AI는 설정한 최대 개수까지, 예측 처리에 사용되는 노드 수를 자동 확장합니다. 여러 노드를 사용한 자동 확장 처리 비용을 이해하려면 가격 책정 페이지를 참조하세요.

엔드포인트

deploy 메서드는 모델이 배포될 때까지 기다렸다가 최종적으로 Endpoint 객체를 반환합니다. 모델을 엔드포인트에 처음 배포하는 경우 리소스 프로비저닝을 완료하기 위해 시간이 몇 분 더 걸릴 수 있습니다.

DEPLOYED_NAME = "penguins_deployed_unique"

endpoint = model.deploy(deployed_model_display_name=DEPLOYED_NAME)

온라인 예측 요청 수행

배포된 모델에 온라인 예측 요청을 보냅니다.

테스트 데이터 준비

테스트 데이터를 Python 목록으로 변환하여 준비하기

df_holdout_y = df_holdout.pop(LABEL_COLUMN)
df_holdout_x = df_holdout

# Convert to list representation
holdout_x = np.array(df_holdout_x).tolist()
holdout_y = np.array(df_holdout_y).astype("float32").tolist()

예측 요청 전송

테스트 데이터가 준비되었으면 이를 사용하여 예측 요청을 전송할 수 있습니다. Endpoint 객체의 predict 함수를 사용하여 다음 매개변수를 가져옵니다.

  • instances: 펭귄 측정 인스턴스 목록입니다. 커스텀 모델에 따라 각 인스턴스는 숫자 배열입니다. 이 목록은 이전 단계에서 준비되었습니다.

predict 함수는 목록을 반환합니다. 여기서 목록의 각 요소는 요청의 인스턴스에 해당합니다. 각 예측의 출력에는 다음이 표시됩니다.

  • 10개 클래스 각각에 대한 0에서 1 사이의 예측 신뢰도 수준(predictions)입니다.

그러면 예측 결과에 대한 간단한 평가를 실행할 수 있습니다.

  1. np.argmax: 각 신뢰도 수준의 목록을 라벨로 변환
  2. 예측 인쇄
predictions = endpoint.predict(instances=holdout_x)
y_predicted = np.argmax(predictions.predictions, axis=1)

y_predicted

모델 배포 취소

제공 Endpoint 리소스에서 모든 Model 리소스를 배포 취소하려면 엔드포인트의 undeploy_all 메서드를 사용합니다.

endpoint.undeploy_all()

삭제

이 프로젝트에 사용된 모든 Google Cloud 리소스를 삭제하려면 이 튜토리얼에서 사용한 Google Cloud 프로젝트를 삭제하면 됩니다.

또는 이 튜토리얼에서 만든 리소스를 개별적으로 삭제할 수도 있습니다.

  • 학습 작업
  • 모델
  • 엔드포인트
  • Cloud Storage 버킷
import os

# Delete the training job
job.delete()

# Delete the model
model.delete()

# Delete the endpoint
endpoint.delete()

# Warning: Setting this to true deletes everything in your bucket
delete_bucket = False

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI