Entraînement d'un modèle TensorFlow sur des données BigQuery

Logo Colab Exécuter dans Colab Logo GitHub Afficher sur GitHub Logo Vertex AIOuvrir dans Vertex AI Workbench

Présentation

Ce tutoriel explique comment utiliser le SDK Vertex AI pour Python afin d'entraîner et de déployer un modèle de classification tabulaire personnalisé pour la prédiction en ligne.

Objectif

Dans ce notebook, vous allez apprendre à créer un modèle avec entraînement personnalisé à partir d'un script Python dans un conteneur Docker à l'aide du SDK Vertex AI pour Python, puis à obtenir une prédiction à partir du modèle déployé en envoyant des données. Vous pouvez également créer des modèles avec entraînement personnalisé à l'aide de l'outil de ligne de commande gcloud ou en ligne avec la console Google Cloud.

Ce tutoriel utilise les services et ressources de ML Google Cloud suivants :

  • BigQuery
  • Cloud Storage
  • Ensembles de données gérés Vertex AI
  • Vertex AI Training
  • Points de terminaison Vertex AI

La procédure comprend les étapes suivantes :

  • Créer un TrainingPipeline personnalisé Vertex AI pour entraîner un modèle.
  • Entraîner un modèle TensorFlow
  • Déployer la ressource Model sur une ressource Endpoint de diffusion.
  • Effectuer une prédiction.
  • Annuler le déploiement de la ressource Model.

Dataset

L'ensemble de données utilisé pour ce tutoriel est l'ensemble de données consacré aux pingouins issu des ensembles de données publics BigQuery. Dans ce tutoriel, vous n'utilisez que les champs culmen_length_mm, culmen_depth_mm, flipper_length_mm et body_mass_g de l'ensemble de données pour prédire la espèces de pingouins (species).

Coûts

Ce tutoriel utilise des composants facturables de Google Cloud :

  • Vertex AI
  • Cloud Storage
  • BigQuery

Consultez les tarifs de Vertex AI, les tarifs de Cloud Storage et les tarifs de BigQuery, et utilisez le simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue.

Installation

Installez la dernière version des SDK Cloud Storage, BigQuery et Vertex AI pour Python.

# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        google-cloud-bigquery \
                        pyarrow

Colab uniquement : Annulez la mise en commentaire de la cellule suivante pour redémarrer le noyau.

# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

Avant de commencer

Définir votre ID de projet

Si vous ne connaissez pas votre ID de projet, essayez les opérations suivantes :

  • Exécutez gcloud config list.
  • Exécutez gcloud projects list.
  • Consultez la page d'assistance : Localiser l'ID du projet.
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Région

Vous pouvez également modifier la variable REGION utilisée par Vertex AI. Apprenez-en plus sur les régions Vertex AI.

REGION = "us-central1"  # @param {type: "string"}

Authentifier votre compte Google Cloud

Suivant votre environnement Jupyter, vous devrez peut-être vous authentifier manuellement. Suivez les instructions correspondantes ci-dessous.

1. Vertex AI Workbench

  • Vous n'avez rien à faire, car vous êtes déjà authentifié.

2. Dans l'instance JupyterLab locale, annulez la mise en commentaire de la commande suivante et exécutez-la :

# ! gcloud auth login

3. Dans Colab, annulez la mise en commentaire de la commande suivante et exécutez-la :

# from google.colab import auth
# auth.authenticate_user()

4. Compte de service ou autre

Créer un bucket Cloud Storage

Créez un bucket de stockage pour stocker les artefacts intermédiaires, tels que les ensembles de données.

BUCKET_URI = "gs://your-bucket-name-unique"  # @param {type:"string"}

Uniquement si votre bucket n'existe pas : exécutez la cellule suivante pour créer votre bucket Cloud Storage.

 gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Initialiser le SDK Vertex AI pour Python

Initialisez le SDK Vertex AI pour Python dans votre projet et le bucket correspondant.

from google.cloud import aiplatform

# Initialize the Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

Initialiser le client BigQuery

Initialisez le client BigQuery Python pour votre projet.

Pour utiliser BigQuery, assurez-vous que votre compte dispose du rôle "Utilisateur BigQuery".

from google.cloud import bigquery

# Set up BigQuery client
bq_client = bigquery.Client(project=PROJECT_ID)

Prétraiter et diviser les données

Vous devez commencer par télécharger et prétraiter vos données pour l'entraînement et les tests.

  • Convertir les caractéristiques catégorielles en valeurs numériques
  • Supprimer les colonnes inutilisées
  • Supprimer les lignes inutilisables
  • Diviser les données d'entraînement et de test
import numpy as np
import pandas as pd

LABEL_COLUMN = "species"

# Define the BigQuery source dataset
BQ_SOURCE = "bigquery-public-data.ml_datasets.penguins"

# Define NA values
NA_VALUES = ["NA", "."]

# Download a table
table = bq_client.get_table(BQ_SOURCE)
df = bq_client.list_rows(table).to_dataframe()

# Drop unusable rows
df = df.replace(to_replace=NA_VALUES, value=np.NaN).dropna()

# Convert categorical columns to numeric
df["island"], _ = pd.factorize(df["island"])
df["species"], _ = pd.factorize(df["species"])
df["sex"], _ = pd.factorize(df["sex"])

# Split into a training and holdout dataset
df_train = df.sample(frac=0.8, random_state=100)
df_holdout = df[~df.index.isin(df_train.index)]

Créer un ensemble de données tabulaire Vertex AI à partir d'un ensemble de données BigQuery

Créez une ressource d'ensemble de données tabulaires Vertex AI à partir de vos données d'entraînement BigQuery.

Pour en savoir plus, consultez la page suivante : https://cloud.google.com/vertex-ai/docs/training/using-managed-datasets

# Create BigQuery dataset
bq_dataset_id = f"{PROJECT_ID}.dataset_id_unique"
bq_dataset = bigquery.Dataset(bq_dataset_id)
bq_client.create_dataset(bq_dataset, exists_ok=True)
dataset = aiplatform.TabularDataset.create_from_dataframe(
    df_source=df_train,
    staging_path=f"bq://{bq_dataset_id}.table-unique",
    display_name="sample-penguins",
)

Entraîner un modèle

Il existe deux façons d'entraîner un modèle à l'aide d'une image de conteneur :

  • Utiliser un conteneur prédéfini Vertex AI. Si vous utilisez un conteneur d'entraînement prédéfini, vous devez également spécifier un package Python à installer dans l'image du conteneur. Ce package Python contient le code d'entraînement.

  • Utiliser votre propre image de conteneur personnalisée. Si vous utilisez votre propre conteneur, l'image de conteneur doit contenir votre code d'entraînement.

Vous allez utiliser un conteneur prédéfini pour cette démonstration.

Définir les arguments de commande du script d'entraînement

Préparez les arguments de ligne de commande à transmettre à votre script d'entraînement.

  • args : arguments de ligne de commande à transmettre au module Python correspondant. Dans cet exemple, il s'agit des éléments suivants :
    • label_column : colonne d'étiquette de vos données à prédire.
    • epochs : nombre d'époques pour l'entraînement.
    • batch_size : nombre de tailles de lots pour l'entraînement.
JOB_NAME = "custom_job_unique"

EPOCHS = 20
BATCH_SIZE = 10

CMDARGS = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]

Script d'entraînement

Dans la cellule suivante, écrivez le contenu du script d'entraînement, task.py. En résumé, le script effectue les opérations suivantes :

  • Il charge les données de la table BigQuery à l'aide de la bibliothèque cliente BigQuery pour Python.
  • Il crée un modèle à l'aide de l'API de modèle TF.Keras.
  • Il compile le modèle (compile()).
  • Définit une stratégie de distribution de l'entraînement en fonction de l'argument args.distribute.
  • Il entraîne le modèle (fit()) avec des époques et une taille de lot en fonction des arguments args.epochs et args.batch_size.
  • Il récupère le répertoire dans lequel enregistrer les artefacts de modèle à partir de la variable d'environnement AIP_MODEL_DIR. Cette variable est définie par le service d'entraînement.
  • Il enregistre le modèle entraîné dans le répertoire de modèle.

REMARQUE : Pour améliorer les performances du modèle, il est recommandé de normaliser les entrées de votre modèle avant l'entraînement. Pour en savoir plus, consultez le tutoriel TensorFlow de la page https://www.tensorflow.org/tutorials/structured_data/preprocessing_layers#numerical_columns.

REMARQUE : Le code d'entraînement suivant nécessite que vous accordiez le rôle "Utilisateur de sessions de lecture BigQuery" au compte d'entraînement. Pour savoir comment trouver ce compte, consultez la page "https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents".

%%writefile task.py

import argparse
import numpy as np
import os

import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.cloud import storage

# Read environmental variables
training_data_uri = os.getenv("AIP_TRAINING_DATA_URI")
validation_data_uri = os.getenv("AIP_VALIDATION_DATA_URI")
test_data_uri = os.getenv("AIP_TEST_DATA_URI")

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--label_column', required=True, type=str)
parser.add_argument('--epochs', default=10, type=int)
parser.add_argument('--batch_size', default=10, type=int)
args = parser.parse_args()

# Set up training variables
LABEL_COLUMN = args.label_column

# See https://cloud.google.com/vertex-ai/docs/workbench/managed/executor#explicit-project-selection for issues regarding permissions.
PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]
bq_client = bigquery.Client(project=PROJECT_NUMBER)

# Download a table
def download_table(bq_table_uri: str):
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]

    # Download the BigQuery table as a dataframe
    # This requires the "BigQuery Read Session User" role on the custom training service account.
    table = bq_client.get_table(bq_table_uri)
    return bq_client.list_rows(table).to_dataframe()

# Download dataset splits
df_train = download_table(training_data_uri)
df_validation = download_table(validation_data_uri)
df_test = download_table(test_data_uri)

def convert_dataframe_to_dataset(
    df_train: pd.DataFrame,
    df_validation: pd.DataFrame,
):
    df_train_x, df_train_y = df_train, df_train.pop(LABEL_COLUMN)
    df_validation_x, df_validation_y = df_validation, df_validation.pop(LABEL_COLUMN)

    y_train = np.asarray(df_train_y).astype("float32")
    y_validation = np.asarray(df_validation_y).astype("float32")

    # Convert to numpy representation
    x_train = np.asarray(df_train_x)
    x_test = np.asarray(df_validation_x)

    # Convert to one-hot representation
    num_species = len(df_train_y.unique())
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_species)
    y_validation = tf.keras.utils.to_categorical(y_validation, num_classes=num_species)

    dataset_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset_validation = tf.data.Dataset.from_tensor_slices((x_test, y_validation))
    return (dataset_train, dataset_validation)

# Create datasets
dataset_train, dataset_validation = convert_dataframe_to_dataset(df_train, df_validation)

# Shuffle train set
dataset_train = dataset_train.shuffle(len(df_train))

def create_model(num_features):
    # Create model
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(
                100,
                activation=tf.nn.relu,
                kernel_initializer="uniform",
                input_dim=num_features,
            ),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            Dense(3, activation=tf.nn.softmax),
        ]
    )

    # Compile Keras model
    optimizer = tf.keras.optimizers.RMSprop(lr=0.001)
    model.compile(
        loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer
    )

    return model

# Create the model
model = create_model(num_features=dataset_train._flat_shapes[0].dims[0].value)

# Set up datasets
dataset_train = dataset_train.batch(args.batch_size)
dataset_validation = dataset_validation.batch(args.batch_size)

# Train the model
model.fit(dataset_train, epochs=args.epochs, validation_data=dataset_validation)

tf.saved_model.save(model, os.getenv("AIP_MODEL_DIR"))

Entraîner le modèle

Définissez votre fichier TrainingPipeline personnalisé dans Vertex AI.

Utilisez la classe CustomTrainingJob pour définir la classe TrainingPipeline. La classe utilise les paramètres suivants :

  • display_name : nom défini par l'utilisateur de ce pipeline d'entraînement.
  • script_path : chemin d'accès local au script d'entraînement.
  • container_uri : URI de l'image du conteneur d'entraînement.
  • requirements : liste des dépendances de packages Python du script.
  • model_serving_container_image_uri : URI d'un conteneur pouvant diffuser des prédictions pour votre modèle (soit un conteneur prédéfini, soit un conteneur personnalisé).

Utilisez la fonction run pour commencer l'entraînement. La fonction inclut les paramètres suivants :

  • args : arguments de ligne de commande à transmettre au script Python.
  • replica_count: nombre d'instances dupliquées de nœuds de calcul.
  • model_display_name : nom à afficher du Model si le script produit un Model géré.
  • machine_type : type de machine à utiliser pour l'entraînement.
  • accelerator_type : type d'accélérateur matériel.
  • accelerator_count : nombre d'accélérateurs à associer à une instance dupliquée de nœud de calcul.

La fonction run crée un pipeline d'entraînement qui crée et entraîne un objet Model. Une fois le pipeline d'entraînement terminé, la fonction run renvoie l'objet Model.

job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest",
    requirements=["google-cloud-bigquery>=2.20.0", "db-dtypes"],
    model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)

MODEL_DISPLAY_NAME = "penguins_model_unique"

# Start the training
model = job.run(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
)

Déployer le modèle

Avant d'utiliser votre modèle pour effectuer des prédictions, vous devez le déployer sur un point de terminaison Endpoint. Pour ce faire, appelez la fonction deploy sur la ressource Model. Cette opération a deux effets :

  1. La création d'une ressource Endpoint pour déployer la ressource Model.
  2. Le déploiement de la ressource Model sur la ressource Endpoint.

La fonction inclut les paramètres suivants :

  • deployed_model_display_name : nom lisible du modèle déployé.
  • traffic_split : pourcentage de trafic au niveau du point de terminaison menant à ce modèle, spécifié en tant que dictionnaire d'une ou plusieurs paires clé/valeur.
    • Si un seul modèle est utilisé, spécifiez { "0": 100 }, où "0" fait référence au modèle importé et "100" signifie 100 % du trafic.
    • S'il existe sur le point de terminaison des modèles pour lesquels le trafic est réparti, utilisez model_id pour spécifier { "0": percent, model_id: percent, ... }, où model_id est l'ID d'un élément DeployedModel existant sur le point de terminaison. La somme des pourcentages doit être égale à 100.
  • machine_type : type de machine à utiliser pour l'entraînement.
  • accelerator_type : type d'accélérateur matériel.
  • accelerator_count : nombre d'accélérateurs à associer à une instance dupliquée de nœud de calcul.
  • starting_replica_count : nombre d'instances de calcul à configurer initialement.
  • max_replica_count : nombre maximal d'instances de calcul pour le scaling. Dans ce tutoriel, une seule instance est provisionnée.

Répartition du trafic

Le paramètre traffic_split est spécifié en tant que dictionnaire Python. Vous pouvez déployer plusieurs instances de votre modèle sur un point de terminaison, puis définir le pourcentage de trafic envoyé à chaque instance.

Vous pouvez utiliser une répartition du trafic pour introduire progressivement un nouveau modèle en production. Par exemple, si vous avez un modèle existant en production qui reçoit 100 % du trafic, vous pouvez déployer un nouveau modèle sur le même point de terminaison, lui attribuer 10 % du trafic et réduire le trafic du modèle d'origine à 90 %. Cela vous permet de surveiller les performances du nouveau modèle tout en minimisant les perturbations pour la majorité des utilisateurs.

Scaling d'instances de calcul

Vous pouvez spécifier une seule instance (ou un seul nœud) pour diffuser vos requêtes de prédiction en ligne. Dans ce tutoriel, nous utilisons un seul nœud. Les variables MIN_NODES et MAX_NODES sont donc toutes deux définies sur 1.

Si vous souhaitez utiliser plusieurs nœuds pour répondre à vos requêtes de prédiction en ligne, définissez MAX_NODES sur le nombre maximal de nœuds à utiliser. Vertex AI adapte le nombre de nœuds utilisés pour diffuser vos prédictions, dans la limite du nombre maximal que vous avez défini. Consultez la page des tarifs pour comprendre le coût de l'autoscaling avec plusieurs nœuds.

Endpoint

La méthode deploy attend que le modèle soit déployé et renvoie un objet Endpoint. Si c'est la première fois qu'un modèle est déployé sur le point de terminaison, le provisionnement des ressources peut prendre quelques minutes de plus.

DEPLOYED_NAME = "penguins_deployed_unique"

endpoint = model.deploy(deployed_model_display_name=DEPLOYED_NAME)

Envoyer une requête de prédiction en ligne

Envoyez une requête de prédiction en ligne au modèle déployé.

Préparer les données de test

Préparez les données de test en les convertissant en liste Python.

df_holdout_y = df_holdout.pop(LABEL_COLUMN)
df_holdout_x = df_holdout

# Convert to list representation
holdout_x = np.array(df_holdout_x).tolist()
holdout_y = np.array(df_holdout_y).astype("float32").tolist()

Envoyer la requête de prédiction

Maintenant que vous avez des données de test, vous pouvez les utiliser pour envoyer une requête de prédiction. Utilisez la fonction predict de l'objet Endpoint, qui accepte les paramètres suivants :

  • instances : liste des instances de mesure des pingouins. Selon votre modèle personnalisé, chaque instance doit être un tableau de nombres. Vous avez préparé cette liste à l'étape précédente.

La fonction predict renvoie une liste, où chaque élément de la liste correspond à une instance de la requête. Pour chaque prédiction, le résultat suivant s'affiche :

  • Niveau de confiance de la prédiction (predictions), entre 0 et 1 et pour chacune des dix classes.

Vous pouvez ensuite exécuter une évaluation rapide sur les résultats de la prédiction :

  1. np.argmax: convertir chaque liste de niveaux de confiance en une étiquette
  2. Sortie des prédictions
predictions = endpoint.predict(instances=holdout_x)
y_predicted = np.argmax(predictions.predictions, axis=1)

y_predicted

Annuler le déploiement de modèles

Pour annuler le déploiement de toutes les ressources Model de la ressource Endpoint de diffusion, utilisez la méthode undeploy_all du point de terminaison.

endpoint.undeploy_all()

Effectuer un nettoyage

Pour nettoyer toutes les ressources Google Cloud utilisées dans ce projet, vous pouvez supprimer le projet Google Cloud que vous avez utilisé dans le cadre de ce tutoriel.

Sinon, vous pouvez supprimer les ressources individuelles que vous avez créées dans ce tutoriel :

  • Tâche d'entraînement
  • Model
  • Point de terminaison
  • Bucket Cloud Storage
import os

# Delete the training job
job.delete()

# Delete the model
model.delete()

# Delete the endpoint
endpoint.delete()

# Warning: Setting this to true deletes everything in your bucket
delete_bucket = False

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI