Comparer des modèles de machine learning pour réaliser des prédictions dans les pipelines Cloud Dataflow

Last reviewed 2018-09-07 UTC

Cette solution décrit et compare les différentes approches de conception permettant d'appeler un modèle de machine learning (ML) dans le cadre d'un pipeline Dataflow, et examine les compromis nécessaires selon l'approche choisie. Nous présentons les résultats d'une série d'expériences que nous avons menées pour explorer ces différentes approches et illustrer ces compromis, tant dans les pipelines de traitement par lot que par flux. Cette solution est conçue pour les personnes qui intègrent des modèles entraînés dans des pipelines de traitement de données, plutôt que pour les data scientists qui souhaitent créer des modèles de machine learning.

Présentation

En tant que responsable de l'intégration de ce modèle de ML dans le pipeline Dataflow, vous cherchez probablement à comprendre quelles sont les différentes approches et quelle est celle qui convient le mieux à la configuration système requise. Pour cela, tenez compte de divers critères, tels que :

  • le débit ;
  • la latence ;
  • le coût ;
  • la mise en œuvre ;
  • la maintenance.

Il n'est pas toujours facile d'équilibrer ces critères, mais cette solution peut vous aider à naviguer dans le processus de prise de décision en fonction de vos priorités. La solution compare trois approches de la réalisation de prédictions avec un modèle de machine learning entraîné par TensorFlow dans des pipelines de données par lot et par flux :

  • Utiliser un modèle déployé en tant qu'API REST/HTTP pour les pipelines de traitement par flux
  • Utiliser des tâches de prédiction par lot d'AI Platform pour les pipelines par lot
  • Utiliser la fonctionnalité de prédiction par modèle direct de Dataflow pour les pipelines de traitement par lot et par flux

Toutes les expériences utilisent un modèle entraîné existant, appelé "ensemble de données sur la natalité", qui prédit le poids des bébés en fonction de diverses données entrées. Dans la mesure où l'objectif de cette solution n'est pas de créer un modèle, elle ne décrit pas comment le modèle a été conçu ou entraîné. Consultez la section Étapes suivantes pour plus de détails concernant l'ensemble de données sur la natalité.

Plate-forme

Il existe différentes manières d'exécuter un pipeline de données et d'appeler un modèle de ML entraîné. Cependant, les exigences fonctionnelles sont toujours les mêmes :

  1. Ingérer les données depuis une source limitée (lot) ou illimitée (flux). Les données de capteur, les interactions sur un site Web et les transactions financières sont des exemples de sources.
  2. Transformer et enrichir les données d'entrée en appelant des modèles de ML pour les prédictions. Vous pouvez par exemple analyser un fichier JSON afin d'extraire les champs pertinents pour prédire une date de maintenance, émettre une recommandation produit, ou détecter une fraude.
  3. Stocker les données et les prédictions transformées pour des raisons d'analyse ou de sauvegarde ou pour un transfert vers un système de file d'attente afin de déclencher un nouvel événement ou des pipelines supplémentaires. La détection d'une fraude potentielle en temps réel ou le stockage d'informations de planification de maintenance dans un magasin accessible à partir d'un tableau de bord sont des exemples de stockage de données.

Lorsque vous transformez et enrichissez des données avec des prédictions dans un processus ETL (Extract, Transform, Load) par lot, vous cherchez à maximiser les débits afin de réduire le temps total nécessaire au traitement de l'ensemble du lot de données. D'autre part, lorsque vous traitez des flux de données pour la prédiction en ligne, vous essayez de minimiser la latence afin de recevoir chaque prédiction en temps (quasi) réel. Ainsi, appeler le modèle peut créer un goulot d'étranglement.

Composants principaux

Dans cette solution, les expériences de traitement par lot et par flux font appel à trois technologies principales :

  • Apache Beam, qui s'exécute sur Dataflow pour traiter les données ;
  • TensorFlow afin de mettre en œuvre et d'entraîner le modèle ML ;
  • et pour certaines expériences, AI Platform comme plate-forme d'hébergement pour les modèles ML entraînés afin d'effectuer des prédictions par lot ou en ligne.

Nous avons choisi Apache Beam sous Dataflow pour exécuter des pipelines de données dans cette solution pour les raisons suivantes :

  • Apache Beam est un modèle de programmation unifiée Open Souce qui exécute des tâches de traitement de données par flux ou par lot.
  • Dataflow est un produit Google Cloud qui peut exécuter des tâches Apache Beam sans serveur.

TensorFlow est une bibliothèque mathématique Open Source de Google, utilisée comme framework de machine learning. TensorFlow permet de créer, d'entraîner et de diffuser des modèles sur une seule machine ou dans des environnements distribués. Les modèles sont portables vers différents appareils et peuvent également exploiter les ressources de type processeur, GPU ou TPU disponibles pour l'entraînement et la diffusion.

AI Platform est une plate-forme sans serveur qui peut entraîner, ajuster (à l'aide de la fonctionnalité de réglage des hyperparamètres) et diffuser des modèles TensorFlow à grande échelle, n'exigeant qu'une gestion minimale de la part des équipes DevOps. AI Platform est compatible avec le déploiement des modèles entraînés comme API REST pour les prédictions en ligne, ainsi que l'envoi de tâches de prédiction par lot. AI Platform est l'une des nombreuses options permettant de diffuser un modèle comme microservice.

Les approches détaillées dans cette solution utilisent Dataflow pour le pipeline de traitement des données et AI Platform pour héberger les modèles en tant que points de terminaison HTTP. Cependant, il est possible d'utiliser d'autres technologies. Les comparaisons de performances entre HTTP et un modèle TensorFlow direct ne révèlent pas une énorme différence.

Traiter des données par lot et par flux

Les expériences de cette solution incluent des cas d'utilisation par lot et par flux. Chaque expérience exploite différents produits Google Cloud pour les entrées et les sorties, car les sources illimitées et limitées présentent des exigences opérationnelles différentes.

Traitement par lot d'un ensemble de données limité

La figure 1 montre que, dans les pipelines de traitement par lot classiques, les données d'entrée brutes sont stockées dans un stockage d'objets, tel que Cloud Storage. Les formats utilisés pour le stockage des données structurées incluent CSV (Comma-Separated Values), ORC (Optimized Row Columnar), Parquet et Avro. Ces formats sont souvent utilisés lorsque les données proviennent de bases de données ou de journaux.

Architecture des pipelines de traitement par lot types
Figure 1. Architecture de traitement par lot

Certaines plates-formes d'analyse, telles que BigQuery, offrent un espace de stockage en plus des fonctionnalités de requête. BigQuery utilise Capacitor pour le stockage. Apache Beam sous Dataflow peut lire et écrire dans BigQuery et Cloud Storage, en plus d'autres options de stockage pour les pipelines de traitement par lot.

Traitement par flux d'un flux de données illimité

Dans le cadre du traitement par flux, les entrées dans un pipeline de traitement des données se font généralement sous forme d'un système de messagerie, comme l'illustre la figure 2. Des technologies telles que Pub/Sub ou Kafka sont généralement utilisées pour ingérer des points de données individuels au format JSON, CSV ou Protobuf.

Architecture des pipelines de traitement par flux types
Figure 2. Architecture de traitement par flux

Les points de données peuvent être traités individuellement ou sous forme de groupes de micro-lots en utilisant des fonctions de fenêtrage pour effectuer le traitement des événements temporels. Les données traitées peuvent être acheminées vers plusieurs destinations, y compris :

  1. BigQuery pour les analyses ad hoc, via les API de diffusion
  2. Bigtable pour diffuser des informations en temps réel
  3. Un sujet Pub/Sub pour le déclenchement de processus/pipelines ultérieurs

La liste complète des connecteurs source (entrées) et des récepteurs (sorties) destinés aux récepteurs de sources de données limitées et illimitées se trouve sur la page qui décrit les E/S Apache Beam.

Appeler un modèle TensorFlow

Un modèle entraîné par TensorFlow peut être appelé de trois manières :

  1. via un point de terminaison HTTP pour la prédiction en ligne ;
  2. directement, avec le fichier de modèle enregistré pour les prédictions par lot et en ligne ;
  3. via une tâche de prédiction par lot AI Platform pour une prédiction par lot.

Points de terminaison HTTP pour la prédiction en ligne

Les modèles TensorFlow sont déployés en tant que points de terminaison HTTP à appeler et fournissent des prédictions en temps réel, via un pipeline de traitement de données par flux ou via des applications clientes.

Vous pouvez déployer un modèle TensorFlow comme point de terminaison HTTP pour les prédictions en ligne à l'aide de TensorFlow Serving ou de tout autre service d'hébergement comme Seldon. Comme le montre la figure 3, vous pouvez choisir l'une des options suivantes :

  1. Déployer le modèle vous-même sur une ou plusieurs instances Compute Engine
  2. Utiliser une image Docker sur Compute Engine ou Google Kubernetes Engine
  3. Utiliser Kubeflow pour faciliter le déploiement sur Kubernetes ou Google Kubernetes Engine
  4. Utiliser App Engine avec Endpoints pour héberger le modèle dans une application Web
  5. Utiliser AI Platform, le service de diffusion et d'entraînement de ML entièrement géré sur Google Cloud
Options de Dataflow permettant de diffuser un modèle en tant que point de terminaison HTTP
Figure 3. Les différentes options de Dataflow permettant de diffuser un modèle en tant que point de terminaison HTTP

Service entièrement géré, AI Platform est plus facile à mettre en œuvre que les autres options. Par conséquent, dans nos expériences, nous l'utilisons comme option permettant de servir le modèle en tant que point de terminaison HTTP. Nous pouvons ensuite nous concentrer sur les performances d'un modèle direct par rapport à un point de terminaison HTTP dans AI Platform, plutôt que de comparer les différentes options de diffusion de modèle HTTP.

Diffuser les prédictions en ligne avec AI Platform Prediction

Deux tâches sont nécessaires afin de diffuser les prédictions en ligne :

  1. Déployer un modèle
  2. Interagir avec le modèle déployé pour une interférence (c'est-à-dire faire des prédictions)

Pour déployer un modèle comme point de terminaison HTTP à l'aide d'AI Platform Prediction, procédez comme suit :

  1. Assurez-vous que les fichiers du modèle entraîné sont disponibles dans Cloud Storage.
  2. Créez un modèle à l'aide de la commande gcloud ml-engine models create.
  3. Déployez une version de modèle à l'aide de la commande gcloud ml-engine versions create, les fichiers du modèle étant dans Cloud Storage.

Vous pouvez déployer un modèle via les commandes suivantes :


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

Ce code crée un modèle AI Platform Prediction appelé "babyweight_estimator" dans le projet Google Cloud, avec la version de modèle v1.

Une fois le modèle déployé, vous pouvez l'appeler. Le code Python suivant montre comment appeler une version du modèle dans AI Platform Prediction en tant qu'API REST :

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

Si vous disposez d'un ensemble de données volumineux disponible dans BigQuery ou dans Cloud Storage, et que vous souhaitez optimiser le débit du processus global, il n'est pas recommandé de diffuser votre modèle de ML comme point de terminaison HTTP pour la prédiction par lot. Cette manière de procéder génère en effet une requête HTTP pour chaque point de données, ce qui entraîne un volume considérable de requêtes HTTP. La section suivante présente des options plus adaptées pour la prédiction par lot.

Modèle direct pour les prédictions par lot et en ligne

La technique de prédiction par modèle direct s'appuie sur un SavedModel TensorFlow local sur les instances Dataflow. Le modèle enregistré est une copie des fichiers de sortie créés après la conception et l'entraînement du modèle TensorFlow. Le SavedModel TensorFlow peut être :

  • une partie du code source du pipeline envoyée en tant que tâche Dataflow ;
  • téléchargé à partir de Cloud Storage, comme l'illustre la figure 4.
Prédiction par modèle direct dans Dataflow
Figure 4. Prédiction par modèle direct dans Dataflow

Dans cette solution, nous utilisons un SavedModel qui fait partie du code source sur GitHub. Pour charger un modèle sur les instances, procédez comme suit :

  1. Lorsque vous créez la tâche Dataflow, spécifiez les dépendances de pipeline à charger, y compris le fichier de modèle. Le code Python suivant montre le fichier setup.py contenant les fichiers de modèle à envoyer avec la tâche Dataflow.

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. Appelez les fichiers de modèle locaux pendant le pipeline. Cette action produit la prédiction pour les instances données. Le code Python suivant montre comment procéder.

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

Consultez la page sur les dépendances de plusieurs fichiers Apache Beam pour obtenir plus de détails.

Tâche de prédiction par lot AI Platform Prediction

Outre le déploiement du modèle comme point de terminaison HTTP, AI Platform permet d'exécuter une tâche de prédiction par lot à l'aide d'une version du modèle déployé ou d'un SavedModel TensorFlow dans Cloud Storage.

Une tâche de prédiction par lot AI Platform se sert de l'emplacement Cloud Storage des fichiers de données d'entrée comme paramètre. Elle utilise le modèle pour obtenir des prédictions pour ces données, puis stocke les résultats dans un autre emplacement de sortie Cloud Storage, également défini en tant que paramètre. L'exemple suivant montre les commandes gcloud qui envoient une tâche de prédiction par lot AI Platform.

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

Traitement point par point comparé au microtraitement par lot pour la prédiction en ligne

Dans les pipelines de prédiction en temps réel, que vous utilisiez le modèle en tant que point de terminaison HTTP ou directement à partir des nœuds de calcul, vous avez deux manières d'obtenir des prédictions pour les points de données entrants :

  • Point individuel. L'option logique consiste à envoyer chaque point de données au modèle de façon individuelle et à obtenir une prédiction.
  • Micro-lots. Une option plus optimisée consiste à utiliser une fonction de fenêtrage pour créer des micro-lots, regroupant des points de données sur une période prédéfinie, par exemple toutes les 5 secondes. Le micro-lot est ensuite envoyé au modèle pour obtenir des prédictions de façon simultanée pour toutes les instances.

Le code Python suivant montre comment créer des micro-lots temporels à l'aide d'une fonction de fenêtrage dans un pipeline Apache Beam.

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

L'approche "microtraitement" utilise des modèles déployés en tant que points de terminaison HTTP, ce qui réduit considérablement le nombre de requêtes HTTP et la latence. Même lorsque la technique de microtraitement est utilisée avec le modèle direct, l'envoi d'un Tensor avec N instances de prédiction au modèle est plus efficace que d'envoyer un Tensor de longueur 1 en raison des opérations vectorisées.

Expériences de traitement par lot

Dans les expériences de traitement par lot, notre but est d'estimer le poids des bébés dans l'ensemble de données sur la natalité de BigQuery à l'aide d'un modèle de régression TensorFlow. Nous allons ensuite enregistrer les résultats de la prédiction dans Cloud Storage sous forme de fichiers CSV à l'aide d'un pipeline de traitement par lot Dataflow. La section suivante décrit les différentes expériences que nous avons menées pour accomplir cette tâche.

Approche 1 : Dataflow avec prédiction par modèle direct

Dans cette approche, les nœuds de calcul Dataflow hébergent le SavedModel TensorFlow appelé directement pour la prédiction pendant le pipeline de traitement par lot de chaque enregistrement. La figure 5 montre l'architecture de haut niveau de cette approche.

Approche "traitement par lot" 1 : Dataflow avec prédiction par modèle direct
Figure 5. Approche de traitement par lot 1 : Dataflow avec prédiction par modèle direct

Le pipeline Dataflow effectue les étapes suivantes :

  1. Lecture des données issues de BigQuery
  2. Préparation de l'enregistrement BigQuery pour la prédiction
  3. Appel du SavedModel TensorFlow local pour obtenir une prédiction pour chaque enregistrement
  4. Conversion du résultat (enregistrement d'entrée et poids estimés des bébés) en fichier CSV
  5. Écriture du fichier CSV dans Cloud Storage

Dans cette approche, il n'y a pas d'appels aux services distants, comme par exemple le modèle déployé sur AI Platform en tant que point de terminaison HTTP. La prédiction est effectuée localement au sein de chaque nœud de calcul Dataflow à l'aide du SavedModel TensorFlow.

Approche 2 : Dataflow avec prédiction par lot AI Platform

Dans cette approche, le SavedModel TensorFlow est stocké dans Cloud Storage et utilisé par AI Platform pour la prédiction. Toutefois, au lieu d'effectuer un appel API au modèle déployé pour chaque enregistrement comme avec l'approche précédente, les données sont préparées pour la prédiction et envoyées en tant que lot.

Cette approche comporte deux phases :

  1. Dataflow prépare les données issues de BigQuery pour la prédiction, puis les stocke dans Cloud Storage.
  2. La tâche de prédiction par lot AI Platform est envoyée avec les données préparées et les résultats de prédiction sont stockés dans Cloud Storage.

La figure 6 illustre l'architecture globale de cette approche en deux phases.

Approche "traitement par lot" 2 : Dataflow avec prédiction par lot AI Platform
Figure 6. Approche de traitement par lot 2 : Dataflow avec prédiction par lot AI Platform

Les étapes du workflow, qui inclut le pipeline Dataflow, sont les suivantes :

  1. Lecture des données issues de BigQuery
  2. Préparation de l'enregistrement BigQuery pour la prédiction
  3. Écriture des données JSON dans Cloud Storage. La fonction serving_fn dans le modèle s'attend à des instances JSON comme entrées.
  4. Envoi d'une tâche de prédiction par lot AI Platform avec les données préparées dans Cloud Storage. Cette tâche écrit aussi les résultats de prédiction dans Cloud Storage.

La tâche Dataflow prépare les données pour la prédiction plutôt que d'envoyer la tâche de prédiction AI Platform. En d'autres termes, la tâche de préparation des données et la tâche de prédiction par lot ne sont pas étroitement liées. Cloud Functions, Airflow ou tout autre programmeur peut orchestrer le workflow en exécutant la tâche Dataflow, puis en envoyant la tâche AI Platform pour la prédiction par lot.

La prédiction par lot AI Platform est recommandée en termes de performance et de facilité d'utilisation si vos données répondent aux critères suivants :

  • Vos données sont disponibles dans Cloud Storage, au format attendu pour la prédiction, à partir d'un processus précédent d'ingestion de données.
  • Vous ne contrôlez pas la première phase du workflow, comme par exemple le pipeline Dataflow qui prépare les données dans Cloud Storage pour la prédiction.

Configurations expérimentales

Nous avons utilisé les configurations suivantes dans trois expériences :

  • Taille des données : 10K, 100K, 1M et 10M lignes
  • Classe Cloud Storage : Regional Storage
  • Emplacement Cloud Storage : europe-west1-b
  • Région Dataflow : europe-west1-b
  • Type de machine de nœud de calcul Dataflow : n1-standard-1
  • Autoscaling Dataflow pour les données par lot jusqu'à un million d'enregistrements
  • Paramètre num_worker de Dataflow : 20 pour les données par lot jusqu'à 10 millions d'enregistrements
  • Paramètre max-worker-count de prédiction par lot AI Platform : 20

L'emplacement Cloud Storage et la région Dataflow doivent être identiques. Cette solution utilise la région europe-west1-b comme valeur arbitraire.

Résultats

Le tableau suivant récapitule les résultats (durées) de l'exécution des prédictions par lot et des prédictions par modèle direct avec des ensembles de données de différentes tailles.

Taille des données par lot Métrique Dataflow puis prédiction par lot AI Platform Dataflow avec prédiction par modèle direct
10 000 lignes Temps d'exécution 15 min 30 s

(Dataflow : 7 min 47 s + 
AI Platform : 7 min 43 s)
8 min 24 s
Temps total des processeurs virtuels 0,301 h

(Dataflow : 0,151 h + 
AI Platform : 0,15 h)
0,173 h
100 000 lignes Temps d'exécution 16 min 37 s

(Dataflow : 8 min 39 s + 
AI Platform : 7 min 58 s)
10 min 31 s
Temps total des processeurs virtuels 0,334 h

(Dataflow : 0,184 h + 
AI Platform : 0,15 h)
0,243 h
1 million de lignes Temps d'exécution 21 min 11 s
(Dataflow : 11 min 07 s + 
AI Platform : 10 min 04 s)
17 min 12 s
Temps total des processeurs virtuels 0,446 h

(Dataflow : 0,256 h + 
AI Platform : 0,19 h)
1,115 h
10 millions de lignes Temps d'exécution 33 min 08 s
(Dataflow : 12 min 15 s + 
AI Platform : 20 min 53 s)
25 min 2 s
Temps total des processeurs virtuels 5,251 h

(Dataflow : 3,581 h + 
AI Platform : 1,67 h)
7,878 h

La figure 7 illustre ces résultats sous forme de graphique.

Graphique représentant la durée des trois approches pour quatre tailles différentes d'ensembles de données
Figure 7. Graphique représentant la durée des trois approches pour quatre tailles différentes d'ensembles de données

Comme le montrent les résultats, la tâche de prédiction par lot AI Platform prend à elle seule moins de temps à produire des prédictions pour les données d'entrée, étant donné que les données sont déjà dans Cloud Storage au format utilisé pour la prédiction. Toutefois, lorsque la tâche de prédiction par lot est associée à une étape de prétraitement (extraction et préparation des données de BigQuery vers Cloud Storage pour la prédiction) et à une étape de post-traitement (stockage des données dans BigQuery), l'approche "modèle direct" produit des temps d'exécution plus intéressants de bout en bout. En outre, les performances de l'approche "prédiction par modèle direct" peuvent être optimisées à l'aide du microtraitement (dont nous discuterons plus loin dans le cadre des expériences de traitement par flux).

Expériences de traitement par flux

Dans les expériences de traitement par flux, le pipeline Dataflow lit les points de données d'un sujet Pub/Sub et écrit les données dans BigQuery à l'aide des API de diffusion. Le pipeline de traitement par flux Dataflow traite les données et obtient les prédictions à l'aide du modèle TensorFlow d'estimation du poids des bébés.

Le sujet reçoit des données d'un simulateur de flux, qui génère des points de données, c'est-à-dire des instances permettant d'estimer le poids des bébés, à un taux d'événements prédéfini par seconde. Cette expérience simule un exemple réel de source de données illimitée. Le code Python suivant simule le flux de données envoyé à un sujet Pub/Sub.

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

Approche 1 : Dataflow avec prédiction en ligne AI Platform

Dans cette approche, le modèle TensorFlow est déployé et hébergé dans AI Platform en tant qu'API REST. Le pipeline de traitement par flux Dataflow appelle l'API pour chaque message consommé à partir de Pub/Sub pour obtenir des prédictions. L'architecture de haut niveau de cette approche est illustrée à la figure 8.

Approche "traitement par flux" 1 : Dataflow avec prédiction en ligne AI Platform
Figure 8. Approche de traitement par flux 1 : Dataflow avec prédiction en ligne AI Platform La requête HTTP peut inclure un seul point de données ou un groupe de points de données dans un micro-lot.

Avec cette approche, le pipeline Dataflow effectue les étapes suivantes :

  1. Lecture des messages d'un sujet Pub/Sub
  2. Envoi d'une requête HTTP à l'API du modèle AI Platform pour obtenir des prédictions pour chaque message
  3. Écriture des résultats dans BigQuery à l'aide des API de diffusion

Le microtraitement par lot est une approche plus efficace. En d'autres termes, au lieu d'envoyer une requête HTTP à l'API REST du modèle pour chaque message lu à partir de Pub/Sub, Dataflow regroupe les messages reçus au cours d'un intervalle d'une seconde. Il envoie ensuite ce groupe de messages sous forme de micro-lot dans une requête HTTP unique à l'API du modèle. Avec cette approche, le pipeline Dataflow effectue les étapes suivantes :

  1. Lecture des messages à partir du sujet Pub/Sub
  2. Mise en œuvre d'une opération de fenêtrage de 1 seconde pour créer un micro-lot de messages
  3. Envoi d'une requête HTTP avec le micro-lot à l'API du modèle AI Platform pour obtenir des prédictions pour les messages
  4. Écriture des résultats dans BigQuery à l'aide des API de diffusion

Cette approche présente différents avantages :

  1. Réduction du nombre d'appels au service distant, comme par exemple le modèle AI Platform
  2. Réduction de la latence moyenne de la diffusion de chaque message
  3. Réduction du temps de traitement global du pipeline

Dans cette expérience, la durée de la fenêtre a été définie sur 1 seconde. Cependant, la taille du micro-lot, qui correspond au nombre de messages envoyés sous forme de lot au modèle AI Platform, varie. La taille du micro-lot dépend de la fréquence de génération des messages, à savoir le nombre de messages par seconde.

La section suivante décrit des expériences menées avec trois fréquences différentes : 50, 100 et 500 messages par seconde. Autrement dit, la taille du micro-lot est de 50, de 100 et de 500.

Approche 2 : Dataflow avec prédiction par modèle direct

Cette approche est identique à celle utilisée dans les expériences de traitement par lot. Le SavedModel TensorFlow est hébergé sur des nœuds de calcul Dataflow et est appelé pour réaliser des prédictions pendant le pipeline de traitement par flux pour chaque enregistrement. La figure 9 montre l'architecture de haut niveau de cette approche.

Approche "traitement par flux" 2 : Dataflow avec prédiction par modèle direct
Figure 9. Approche de traitement par flux 2 : Dataflow avec prédiction par modèle direct

Avec cette approche, le pipeline Dataflow effectue les étapes suivantes :

  1. Lecture des messages à partir du sujet Pub/Sub
  2. Appel du SavedModel TensorFlow local pour obtenir des prédictions pour chaque enregistrement
  3. Écriture des résultats dans BigQuery à l'aide des API de diffusion

La technique de microtraitement peut également être utilisée dans le pipeline de traitement par flux avec l'approche "prédiction par modèle direct". Au lieu d'envoyer le Tensor d'une instance de données au modèle, nous pouvons envoyer au modèle un Tensor de N instances de données (où N est égal aux messages reçus dans la fenêtre Dataflow). Cette technique utilise les opérations vectorisées du modèle TensorFlow et obtient plusieurs prédictions en parallèle.

Configurations expérimentales

Nous avons utilisé les configurations suivantes pour ces expériences :

  • Taille des données par flux : 10K records (messages)
  • Messages simulés par seconde (MPS) : 50, 100 et 500
  • Taille de la fenêtre (dans les expériences avec micro-lots) : 1 second
  • Région Dataflow : europe-west1-b
  • Type de machine de nœud de calcul Dataflow : n1-standard-1
  • Paramètre num_worker de Dataflow : 5 (pas d'autoscaling)
  • Nœuds d'API de modèle AI Platform : 3 (manualScale)

Résultats

Le tableau suivant récapitule les résultats des expériences de traitement par flux avec différents volumes de données (messages par seconde). La fréquence des messages fait référence au nombre de messages envoyés par seconde et le temps de simulation fait référence au temps d'envoi de tous les messages.

Fréquence des messages de flux Métrique Dataflow avec prédiction en ligne AI Platform   Dataflow avec prédiction par modèle direct  
    Message unique Microtraitement par lot Message unique Microtraitement par lot
50 messages/s

(Temps de simulation : 3 min 20 s)
Durée totale 9 min 34 s 7 min 44 s 3 min 43 s 3 min 22 s
100 messages/s

(Temps de simulation : 1 min 40 s)
Durée totale 6 min 03 s 4 min 34 s 1 min 51 s 1 min 41 s
500 messages/s

(Temps de simulation : 20 s)
Durée totale NA : quota de prédiction en ligne AI Platform par défaut 2 min 47 s 1 min 23 s 48 s

La figure 10 illustre ces résultats sous forme de graphique.

Graphique représentant les durées selon différentes approches et fréquences
Figure 10. Graphique représentant les durées selon différentes approches et fréquences

Comme le montrent les résultats, la technique de microtraitement par lot améliore les performances d'exécution tant dans la prédiction en ligne AI Platform que dans la prédiction par modèle direct. En outre, l'utilisation du modèle direct avec le pipeline de traitement par flux offre une amélioration des performances de 2 à 4 fois supérieure à celle de l'appel d'une API REST/HTTP externe pour la prédiction en ligne.

Conclusions

En fonction des approches décrites et des résultats des expériences menées, nous recommandons les approches suivantes.

Traitement par lot

  • Si vous créez votre pipeline de traitement de données par lot et que vous souhaitez effectuer une prédiction dans le pipeline, utilisez l'approche "modèle direct" pour obtenir de meilleures performances.
  • Améliorez les performances de l'approche "modèle direct" en créant des micro-lots de points de données avant d'appeler le modèle local de prédiction afin de tirer parti du chargement en parallèle des opérations vectorisées.
  • Si vos données sont stockées dans Cloud Storage au format attendu pour la prédiction, utilisez la prédiction par lot AI Platform afin d'optimiser les performances.
  • Utilisez AI Platform si vous souhaitez utiliser la puissance des GPU pour la prédiction par lot.
  • N'utilisez pas la prédiction en ligne AI Platform pour la prédiction par lot.

Traitement par flux

  • Utilisez le modèle direct dans le pipeline de traitement par flux pour optimiser les performances et réduire le temps de latence moyen. Les prédictions sont effectuées localement, sans appel HTTP aux services distants.
  • Dissociez le modèle de vos pipelines de traitement des données pour améliorer la gestion des modèles utilisés dans les prédictions en ligne. La meilleure approche consiste à diffuser votre modèle en tant que microservice indépendant via AI Platform ou tout autre service d'hébergement Web.
  • Déployez votre modèle en tant que service Web indépendant pour permettre à plusieurs pipelines de traitement des données et applications en ligne de consommer le service de modèle en tant que point de terminaison. En outre, les modifications apportées au modèle sont transparentes pour les applications et les pipelines qui le consomment.
  • Déployez plusieurs instances du service avec un équilibrage de charge pour améliorer l'évolutivité et la disponibilité du service Web du modèle. Avec AI Platform, il suffit de spécifier le nombre de nœuds (manualScaling) ou minNodes (autoScaling) dans le fichier de configuration .yaml lorsque vous déployez une version du modèle.
  • Si vous déployez le modèle dans un microservice distinct, cela peut entraîner des coûts supplémentaires selon l'infrastructure de service sous-jacente. Consultez les questions fréquentes sur les tarifs pour la prédiction en ligne AI Platform.
  • Utilisez le microtraitement dans votre pipeline de traitement de données par flux pour optimiser les meilleures performances avec le service par modèle direct et par modèle HTTP. Le microtraitement réduit le nombre de requêtes HTTP adressées au service de modèle et utilise les opérations vectorisées du modèle TensorFlow pour obtenir des prédictions.

Étapes suivantes