Vergleich von Modellen für maschinelles Lernen für Vorhersagen in Cloud Dataflow-Pipelines

Last reviewed 2018-09-07 UTC

In dieser Lösung werden die verschiedenen Designansätze zum Aufrufen eines Modells für maschinelles Lernen während einer Dataflow-Pipeline beschrieben und verglichen. Dabei wird auch betrachtet, welche Kompromisse bei dem jeweiligen Ansatz in Kauf genommen werden müssen. Wir stellen die Ergebnisse einer Reihe von Experimenten vor, mit denen wir verschiedene Ansätze untersucht haben, und illustrieren die besagten Kompromisse an Batch- und Stream-Verarbeitungspipelines. Diese Lösung ist eher für Nutzer gedacht, die trainierte Modelle in Datenverarbeitungspipelines integrieren, als für Data Scientists, die Modelle für maschinelles Lernen erstellen möchten.

Einführung

Wenn Sie für die Einbindung dieses ML-Modells in die Dataflow-Pipeline verantwortlich sind, möchten Sie vielleicht wissen, welche Ansätze es gibt und welcher den Systemanforderungen am besten entspricht. Dabei müssen Sie verschiedene Faktoren berücksichtigen, zum Beispiel:

  • Durchsatz
  • Latenz
  • Kosten
  • Implementierung
  • Wartung

Es ist nicht immer leicht, diese Faktoren unter einen Hut zu bringen. Mithilfe dieser Lösung können Sie jedoch den Entscheidungsprozess anhand Ihrer Prioritäten steuern. In der Lösung werden drei Ansätze verglichen, in Batch- und Stream-Datenpipelines mit einem TensorFlow-trainierten Modell für maschinelles Lernen (ML) Vorhersagen zu treffen:

  • Ein erstelltes Modell als REST/HTTP API für Streamingpipelines verwenden
  • Batchvorhersagejobs in AI Platform für Batchpipelines verwenden
  • Direkte Modellvorhersagen in Dataflow für Batch- und Streamingpipelines verwenden

Bei allen Experimenten wird ein vorhandenes trainiertes Modell eingesetzt, das sogenannte Natality-Dataset. Damit wird anhand verschiedener Eingaben das Gewicht von Babys prognostiziert. Da es bei dieser Lösung nicht darum geht, ein Modell zu erstellen, gehen wir auf das Erstellen und Trainieren dieses Modells nicht weiter ein. Nähere Angaben zum Natality-Dataset finden Sie im Abschnitt Weitere Informationen.

Plattform

Es gibt verschiedene Möglichkeiten, eine Datenpipeline auszuführen und ein trainiertes ML-Modell aufzurufen. Die funktionalen Anforderungen sind jedoch immer gleich:

  1. Datenaufnahme aus einer begrenzten (Batch) oder unbegrenzten Quelle (Streaming). Beispiele für Quellen, aus denen Daten aufgenommen werden können, sind Sensordaten, Website-Interaktionen und Finanztransaktionen.
  2. Transformation und Anreicherung der Eingabedaten. Dazu werden ML-Modelle für Vorhersagen aufgerufen. Ein Beispiel ist das Parsen einer JSON-Datei zu dem Zweck, relevante Felder zu extrahieren und damit ein Wartungsdatum vorherzusagen, ein Produkt zu empfehlen oder Betrug zu erkennen.
  3. Speichern der transformierten Daten und Vorhersagen für Analysen, für Sicherungen oder zur Übergabe an ein Warteschlangensystem, um ein neues Ereignis oder weitere Pipelines auszulösen. Beispiele dafür sind das Erkennen potenzieller Betrugsversuche in Echtzeit oder das Speichern von Informationen zu Wartungsplänen in einem Speicher, auf den über ein Dashboard zugegriffen werden kann.

Wenn Sie Daten in einem Batch-ETL-Prozess transformieren und mit Vorhersagen anreichern, geht es darum, den Durchsatz zu maximieren und damit die Gesamtverarbeitungszeit für den Datenbatch zu reduzieren. Wenn Sie dagegen Streaming-Daten für Online-Vorhersagen verarbeiten, geht es darum, die Latenz zu minimieren, um jede Vorhersage (nahezu) in Echtzeit zu erhalten. Daher kann das Aufrufen des Modells zu einem Engpass werden.

Kernkomponenten

Bei den Batch- und Streamingexperimenten dieser Lösung werden hauptsächlich drei Technologien eingesetzt:

  • Apache Beam wird auf Dataflow ausgeführt, um die Daten zu verarbeiten.
  • TensorFlow wird zur Implementierung und zum Training des ML-Modells eingesetzt.
  • Bei einigen Experimenten wird AI Platform als Hostingplattform für die trainierten ML-Modelle eingesetzt, um Vorhersagen im Batch und online durchzuführen.

Wir haben bei dieser Lösung Apache Beam, ausgeführt auf Dataflow, für die Datenpipelines ausgewählt. Dafür gibt es folgende Gründe:

  • Apache Beam ist ein einheitliches Open-Source-Programmiermodell, mit dem sowohl Streaming- als auch Batchdatenverarbeitungsjobs ausgeführt werden können.
  • Dataflow ist ein Google Cloud-Produkt, das Apache Beam-Jobs ohne Server ausführen kann.

TensorFlow ist eine mathematische Open-Source-Bibliothek von Google, die als Framework für maschinelles Lernen verwendet wird. Mit TensorFlow können Modelle auf einer einzelnen Maschine oder in verteilten Umgebungen erstellt, trainiert und bereitgestellt werden. Die Modelle können auf verschiedene Geräte portiert werden und verfügbare CPU-, GPU- oder TPU-Ressourcen für Training und Bereitstellung nutzen.

AI Platform ist eine serverlose Plattform, auf der Sie TensorFlow-Modelle im großen Maßstab trainieren, mithilfe von Hyperparametern abstimmen und anschließend bereitstellen können. Dabei ist seitens DevOps nur minimaler Verwaltungsaufwand erforderlich. Mit AI Platform lassen sich trainierte Modelle als REST APIs für Onlinevorhersagen bereitstellen und Batchvorhersagejobs senden. AI Platform ist eine von mehreren Mikrodienst-Optionen für Ihr Modell.

Bei den in dieser Lösung beschriebenen Ansätzen wird Dataflow für die Datenverarbeitungspipeline verwendet. In AI Platform werden die Modelle als HTTP-Endpunkte gehostet. Für beide Ansätze könnten jedoch auch andere Technologien genutzt werden. Beim Leistungsvergleich zwischen HTTP und einem direkten TensorFlow-Modell ergäbe sich dadurch kein großer Unterschied.

Batch- und Streamingdaten verarbeiten

In den Experimenten zu dieser Lösung geht es sowohl um Batch- als auch um Streaming-Anwendungsfälle. Bei jedem Experiment werden andere Google Cloud-Produkte für die Eingabe und Ausgabe eingesetzt, da die operativen Anforderungen für unbegrenzte und begrenzte Quellen unterschiedlich sind.

Batchverarbeitung eines begrenzten Datasets

In Abbildung 1 ist zu sehen, dass die Eingangs-Rohdaten in typischen Batchverarbeitungspipelines in einem Objektspeicher wie Cloud Storage gespeichert werden. Zu den strukturierten Datenspeicherformaten zählen durch Kommas getrennte Werte (CSV), das ORC-Format (Optimized Row Columnar, spaltenoptimierte Zeilen), Parquet oder Avro. Diese Formate werden häufig eingesetzt, wenn die Daten aus Datenbanken oder Logs stammen.

Grafik: Architektur typischer Batchverarbeitungspipelines
Abbildung 1: Batchverarbeitungsarchitektur

Einige Analyseplattformen wie BigQuery bieten zusätzlich zu den Abfragefunktionen auch Speichermöglichkeiten. BigQuery verwendet zum Speichern Capacitor. Apache Beam in Dataflow kann sowohl in BigQuery als auch Cloud Storage Lese- und Schreibvorgänge ausführen und außerdem weitere Speicheroptionen in Batchverarbeitungspipelines nutzen.

Streamverarbeitung eines unbegrenzten Datenstroms

Beim Streaming stammen die Eingabedaten für eine Datenverarbeitungspipeline in der Regel aus einem Messaging-System, wie in Abbildung 2 dargestellt. Meistens werden Technologien wie Pub/Sub oder Kafka eingesetzt, um einzelne Datenpunkte im JSON-, CSV- oder Protobuf-Format aufzunehmen.

Grafik: Architektur typischer Streamverarbeitungspipelines
Abbildung 2: Streamverarbeitungsarchitektur

Datenpunkte können einzeln oder als Gruppen in Mikro-Batches verarbeitet werden. Zur zeitlichen Ereignisverarbeitung werden dabei Fensterfunktionen eingesetzt. Die verarbeiteten Daten können an mehrere Ziele übertragen werden, zum Beispiel:

  1. BigQuery für Ad-hoc-Analysen über die Streaming-APIs.
  2. Bigtable zum Bereitstellen von Echtzeitinformationen.
  3. Ein Pub/Sub-Thema zum Auslösen nachfolgender Prozesse/Pipelines.

Auf der E/A-Seite zu Apache Beam finden Sie eine vollständige Liste der Quell-Connectors (Eingabe) und Senk-Connectors (Ausgabe) für Senken mit begrenzten und unbegrenzten Datenquellen.

Ein TensorFlow-Modell aufrufen

Ein in TensorFlow trainiertes Modell kann auf drei Arten aufgerufen werden:

  1. Über einen HTTP-Endpunkt für die Onlinevorhersage.
  2. Direkt durch Verwenden der gespeicherten Modelldatei für Batch- und Onlinevorhersagen.
  3. Über einen AI Platform-Batchvorhersagejob für Batchvorhersagen.

HTTP-Endpunkte für Onlinevorhersagen

TensorFlow-Modelle werden als HTTP-Endpunkte bereitgestellt, die aufgerufen werden können. Sie liefern Vorhersagen in Echtzeit, entweder über eine Stream-Datenverarbeitungspipeline oder über Client-Anwendungen.

Zum Bereitstellen eines TensorFlow-Modells als HTTP-Endpunkt für Onlinevorhersagen können Sie TensorFlow Serving oder einen beliebigen anderen Hostingdienst wie Seldon einsetzen. Wie in Abbildung 3 dargestellt, haben Sie folgende Auswahlmöglichkeiten:

  1. Das Modell selbst auf einer oder mehreren Compute Engine-Instanzen erstellen.
  2. Ein Docker-Image in Compute Engine oder Google Kubernetes Engine verwenden.
  3. Mit Kubeflow Bereitstellungen in Kubernetes oder Google Kubernetes Engine vornehmen.
  4. App Engine mit Endpoints verwenden, um das Modell in einer Web-Anwendung zu hosten.
  5. AI Platform einsetzen, den vollständig verwalteten ML-Trainings- und Bereitstellungsdienst für Google Cloud.
Grafik: Die Möglichkeiten, ein Modell in Dataflow als HTTP-Endpunkt bereitzustellen
Abbildung 3: Verschiedene Möglichkeiten, ein Modell in Dataflow als HTTP-Endpunkt bereitzustellen

AI Platform ist ein vollständig verwalteter Dienst, der einfacher zu implementieren ist als die anderen Varianten. Daher verwenden wir bei unseren Experimenten diese Option, um das Modell als HTTP-Endpunkt bereitzustellen. Wir können uns dann darauf konzentrieren, in AI Platform die Leistung eines direkten Modells mit einem HTTP-Endpunkt zu vergleichen, anstatt die verschiedenen Möglichkeiten der HTTP-Modellbereitstellung einander gegenüberzustellen.

Onlinevorhersagen mit AI Platform-Vorhersage bereitstellen

Für Online-Vorhersagen müssen zwei Aufgaben erfüllt werden:

  1. Ein Modell erstellen.
  2. Mit dem erstellten Modell Schlussfolgerungen ziehen (also Vorhersagen treffen).

Das Bereitstellen eines Modells als HTTP-Endpunkt mithilfe von AI Platform erfordert folgende Schritte:

  1. Die Dateien des trainierten Modells auf Cloud Storage verfügbar machen.
  2. Mit dem Befehl gcloud ml-engine models create ein Modell erstellen.
  3. Mit dem Befehl gcloud ml-engine versions create und den Modelldateien auf Cloud Storage eine Modellversion erstellen.

Mit folgenden Befehlen können Sie ein Modell erstellen:


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

Der Code erstellt ein AI Platform Prediction-Modell mit dem Namen "babyweight_estimator" und der Modellversion v1 im Google Cloud-Projekt.

Nachdem das Modell erstellt wurde, können Sie es aufrufen. Im folgenden Python-Code wird eine Methode vorgestellt, eine Modellversion in der AI Platform Prediction als REST API aufzurufen:

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

Wenn in einem Speicher wie BigQuery oder Cloud Storage ein großes Dataset vorhanden ist und Sie den Durchsatz des gesamten Prozesses maximieren möchten, wird es nicht empfohlen, das ML-Modell für die Batchvorhersage als HTTP-Endpunkt bereitzustellen. Dadurch wird für jeden Datenpunkt eine HTTP-Anfrage erzeugt, die Anzahl der HTTP-Anfragen ist also sehr hoch. Im folgenden Abschnitt werden bessere Möglichkeiten für Batchvorhersagen vorgestellt.

Direkter Aufruf des Modells für Batch- und Onlinevorhersagen

Bei der Vorhersagetechnik mit einem direkt aufgerufenen Modell wird ein lokales SavedModel (gespeichertes Modell) aus TensorFlow auf den Dataflow-Instanzen verwendet. Das gespeicherte Modell ist eine Kopie der Ausgabedateien, die nach dem Erstellen und Trainieren des TensorFlow-Modells geschrieben wurden. Das SavedModel (gespeichertes Modell) aus TensorFlow kann z. B.:

  • Teil des Pipelinequellcodes sein, der als Dataflow-Job gesendet wird und
  • aus Cloud Storage heruntergeladen werden (siehe Abbildung 4).
Grafik: Vorhersage durch direkten Aufruf des Modells in Dataflow
Abbildung 4: Vorhersagen durch direkten Modellaufruf in Dataflow

Das in dieser Lösung verwendete SavedModel (gespeichertes Modell) ist Teil des Quellcodes aus GitHub. Zum Laden eines Modells auf den Instanzen gehen Sie so vor:

  1. Geben Sie beim Erstellen des Dataflow-Jobs an, welche Abhängigkeiten der Pipeline geladen werden sollen, einschließlich der Modelldatei. Der folgende Python-Code stammt aus der Datei setup.py. Er enthält die Modelldateien, die mit dem Dataflow-Job übergeben werden sollen.

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. Rufen Sie im Verlauf der Pipeline die lokalen Modelldateien auf. Dadurch wird die Vorhersage für die gegebenen Fälle erzeugt. Im folgenden Python-Code sehen Sie, wie das funktioniert.

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

Weitere Informationen dazu finden Sie auf der Seite Mehrere Dateiabhängigkeiten bei Apache Beam.

AI Platform-Batchvorhersagejob

Neben dem Bereitstellen des Modells als HTTP-Endpunkt lassen sich mit AI Platform Batchvorhersagejobs ausführen. Dazu wird eine bereitgestellte Modellversion oder ein SavedModel (gespeichertes Modell) aus TensorFlow-in Cloud Storage verwendet.

Für einen Batchvorhersagejob in AI Platform wird der Cloud Storage-Speicherort der Dateien mit den Eingabedaten als Parameter angegeben. Hierbei wird das Modell verwendet, um Vorhersagen für diese Daten zu erhalten. Die Vorhersageergebnisse werden an einem anderen Speicherort in Cloud Storage abgelegt. Dieser Speicherort wird ebenfalls als Parameter angegeben. Im folgenden Beispiel finden Sie Befehle von gcloud, durch die ein Batchvorhersagejob an AI Platform gesendet wird.

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

Punkt für Punkt im Vergleich zu Mikro-Batching für die Onlinevorhersage

Bei Pipelines für Echtzeitvorhersagen gibt es zwei Möglichkeiten, Vorhersagen für eingehende Datenpunkte zu erhalten, unabhängig davon, ob Sie das Modell als HTTP-Endpunkt bereitstellen oder das Modell direkt von den Workern verwenden.

  • Einzelpunkt. Die naheliegendste Methode besteht darin, jeden Datenpunkt einzeln an das Modell zu senden und eine Vorhersage zu erhalten.
  • Mikro-Batches. Eine optimalere Methode besteht darin, mithilfe einer Fensterfunktion Mikro-Batches zu erstellen. Dabei werden Datenpunkte innerhalb eines bestimmten Zeitraums, beispielsweise alle 5 Sekunden, zu Gruppen zusammengefasst. Der Mikro-Batch wird dann an das Modell gesendet, um Vorhersagen für alle Datenpunkte gleichzeitig zu erhalten.

Der folgende Python-Code zeigt, wie mithilfe einer Fensterfunktion in einer Apache Beam-Pipeline zeitbasierte Mikro-Batches erstellt werden.

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

Beim Mikro-Batching werden Modelle verwendet, die als HTTP-Endpunkte bereitgestellt wurden. Die Anzahl der HTTP-Anfragen und die Latenz sind hierdurch deutlich geringer. Selbst wenn die Modelle beim Mikro-Batching direkt aufgerufen werden, ist es immer noch effizienter, dem Modell einen Tensor mit N Vorhersageinstanzen zu senden als einen Tensor mit einer Länge von 1. Das liegt an den vektorisierten Operationen.

Batch-Experimente

Bei den Batch-Experimenten möchten wir das Gewicht von Babys im Natality-Dataset in BigQuery mithilfe eines TensorFlow-Regressionsmodells schätzen. Anschließend möchten wir die Vorhersageergebnisse als CSV-Dateien in Cloud Storage speichern. Dazu verwenden wir eine Batchpipeline von Dataflow. Im folgenden Abschnitt werden verschiedene Experimente beschrieben, mit denen wir versucht haben, diese Aufgabe zu erfüllen.

Ansatz 1: Dataflow mit Vorhersagen durch direkten Modellaufruf

Bei diesem Ansatz wird das TensorFlow-SavedModel in Dataflow-Workern gehostet. Während der Batchverarbeitungspipeline wird das Modell für die Vorhersage bei jedem Datensatz direkt aufgerufen. In Abbildung 5 ist die Architektur bei dieser Vorgehensweise im Überblick dargestellt.

Grafik: Batchansatz 1: Dataflow mit Vorhersagen durch direkten Modellaufruf
Abbildung 5: Batchansatz 1: Dataflow mit Vorhersagen durch direkten Modellaufruf

In der Dataflow-Pipeline werden folgende Schritte ausgeführt:

  1. Daten aus BigQuery lesen.
  2. Den BigQuery-Datensatz für die Vorhersage aufbereiten.
  3. Für jeden Datensatz das lokale TensorFlow-SavedModel aufrufen, um eine Vorhersage zu erhalten.
  4. Das Ergebnis (Eingabedatensatz und geschätztes Babygewicht) in eine CSV-Datei umwandeln.
  5. Die CSV-Datei in Cloud Storage speichern.

Bei diesem Ansatz werden keine Remotedienste aufgerufen, z. B. auch kein auf AI Platform bereitgestelltes Modell als HTTP-Endpunkt. Die Vorhersage wird mithilfe des SavedModel (gespeichertes Modell) aus TensorFlow in jedem Dataflow-Worker lokal erstellt.

Grafik: Ansatz 2: Dataflow mit AI Platform-Batchvorhersage

Bei dieser Vorgehensweise wird das SavedModel (gespeicherte Modell) aus TensorFlow in Cloud Storage gespeichert und von AI Platform für die Vorhersage verwendet. Im Gegensatz zum ersten Ansatz wird hier jedoch nicht für jeden Datensatz ein API-Aufruf an das erstellte Modell ausgelöst. Stattdessen werden die Daten für die Vorhersage vorbereitet und als Batch gesendet.

Dieser Ansatz setzt sich aus zwei Phasen zusammen:

  1. Dataflow bereitet die Daten aus BigQuery für die Vorhersage vor und speichert sie dann in Cloud Storage.
  2. Der Batchvorhersagejob von AI Platform wird mit den aufbereiteten Daten gesendet und die Vorhersageergebnisse werden in Cloud Storage gespeichert.

In Abbildung 6 ist die Architektur dieses zweiphasigen Ansatzes im Überblick dargestellt.

Grafik: Batchansatz 2: Dataflow mit AI Platform-Batchvorhersage
Abbildung 6: Batchansatz 2: Dataflow mit AI Platform-Batchvorhersage

Zu diesem Workflow gehören einschließlich der Dataflow-Pipeline folgende Schritte:

  1. Daten aus BigQuery lesen.
  2. Den BigQuery-Datensatz für die Vorhersage aufbereiten.
  3. JSON-Daten in Cloud Storage schreiben. Die Funktion serving_fn im Modell erwartet JSON-Instanzen als Eingabe.
  4. Einen Batchvorhersagejob von AI Platform mit den aufbereiteten Daten in Cloud Storage senden. Die Vorhersageergebnisse dieses Jobs werden auch in Cloud Storage geschrieben.

Durch den Cloud Dataflow-Job werden die Daten für die Vorhersage aufbereitet, der AI Platform Prediction-Job wird aber nicht gesendet. Mit anderen Worten, die Aufgaben der Datenvorbereitung und der Batchvorhersage sind nicht fest aneinander gekoppelt. Der Workflow kann durch Cloud Functions, Airflow oder jeden anderen Planer orchestriert werden. Dabei muss zuerst der Dataflow-Job ausgeführt und anschließend der AI Platform-Job für die Batchvorhersage gesendet werden.

Die AI Platform-Batchvorhersage ist wegen ihrer Leistungsfähigkeit und Nutzerfreundlichkeit empfehlenswert, wenn die Daten folgende Kriterien erfüllen:

  • Die Daten liegen durch einen vorangegangenen Datenaufnahmevorgang in Cloud Storage in dem für die Vorhersage erwarteten Format vor.
  • Sie haben keine Kontrolle über die erste Phase des Workflows, z. B. über die Dataflow-Pipeline, mit der die Daten in Cloud Storage für die Vorhersage aufbereitet werden.

Konfigurationen der Experimente

Wir haben bei drei Experimenten folgende Konfigurationen verwendet:

  • Datengröße: 10K, 100K, 1M und 10M Zeilen
  • Cloud Storage-Klasse: Regional Storage
  • Cloud Storage-Speicherort: europe-west1-b
  • Dataflow-Region: europe-west1-b
  • Maschinentyp der Dataflow-Worker: n1-standard-1
  • Dataflow-Autoscaling für Batchdaten mit bis zu einer Million Datensätzen
  • Dataflow-Einstellung num_worker: 20 für Batchdaten mit bis zu zehn Millionen Datensätzen
  • AI Platform-Batchvorhersageeinstellung max-worker-count: 20

Der Cloud Storage-Speicherort sollte mit der Dataflow-Region übereinstimmen. In dieser Lösung wird der beliebige Wert europe-west1-b als Region verwendet.

Ergebnisse

In der folgenden Tabelle sind die Ergebnisse (Zeiten) beim Ausführen der Batchvorhersagen und der Vorhersagen mit direktem Modellaufruf mit unterschiedlichen Dataset-Größen zusammengefasst.

Batchdatengröße Messwert Dataflow, dann AI Platform-Batchvorhersage Dataflow mit Vorhersage durch direkten Modellaufruf
10.000 Zeilen Laufzeit 15 Min. 30 Sek.

(Dataflow: 7 Min. 47 Sek. +
AI Platform: 7 Min. 43 Sek.)
8 Min. 24 Sek.
vCPU-Gesamtzeit 0,301 Std.

(Dataflow: 0,151 Std.+
AI Platform: 0,15 Std.)
0,173 Std.
100.000 Zeilen Laufzeit 16 Min. 37 Sek.

(Dataflow: 8 Min. 39 Sek. +
AI Platform: 7 Min. 58 Sek.)
10 Min. 31 Sek.
vCPU-Gesamtzeit 0,334 Std.

(Dataflow: 0,184 Std.+
AI Platform: 0,15 Std.)
0,243 Std.
Eine Million Zeilen Laufzeit 21 Min. 11 Sek.
(Dataflow: 11 Min. 7 Sek. +
AI Platform: 10 Min. 4 Sek.)
17 Min. 12 Sek.
vCPU-Gesamtzeit 0,446 Std.

(Dataflow: 0,256 Std. +
AI Platform: 0,19 Std.)
1,115 Std.
Zehn Millionen Zeilen Laufzeit 33 Min. 8 Sek.
(Dataflow: 12 Min. 15 Sek. +
AI Platform: 20 Min. 53 Sek.)
25 Min. 2 Sek.
vCPU-Gesamtzeit 5,251 Std.

(Dataflow: 3,581 Std. +
AI Platform: 1,67 Std.)
7,878 Std.

In Abbildung 7 sind diese Ergebnisse grafisch dargestellt.

Grafik: Diagramm mit Zeitangaben für drei Vorgehensweisen und vier verschiedene Dataset-Größen
Abbildung 7: Diagramm mit Zeitangaben für drei Vorgehensweisen und vier verschiedene Dataset-Größen

Wie die Ergebnisse zeigen, benötigt der Batchvorhersagejob von AI Platform allein weniger Zeit, um Vorhersagen für die Eingabedaten zu erstellen, da die Daten bereits in Cloud Storage in dem für die Vorhersage verwendeten Format vorliegen. Wenn der Batchvorhersagejob jedoch mit einem Vorverarbeitungsschritt (Daten aus BigQuery in Cloud Storage extrahieren und aufbereiten) und mit einem Nachverarbeitungsschritt (Daten in BigQuery speichern) kombiniert wird, ist die Ausführungszeit bei direktem Modellaufruf besser. Darüber hinaus kann die Leistung des Vorhersageansatzes mit direktem Modellaufruf mithilfe von Mikro-Batching weiter optimiert werden. Darauf gehen wir bei den Streaming-Experimenten noch ein.

Streaming-Experimente

Bei den Streaming-Experimenten liest die Dataflow-Pipeline Datenpunkte aus einem Pub/Sub-Thema und schreibt die Daten mithilfe der Streaming-APIs in BigQuery. In der Dataflow-Streamingpipeline werden die Daten verarbeitet und mithilfe des TensorFlow-Modells zur Abschätzung des Babygewichts Vorhersagen erstellt.

Das Thema empfängt Daten von einem Streamsimulator, der Datenpunkte erzeugt, für die das Babygewicht mit einer vordefinierten Ereignisrate pro Sekunde geschätzt wird. Dadurch wird ein reales Beispiel für eine unbegrenzte Datenquelle simuliert. Mit dem folgenden Python-Code wird der Datenstrom simuliert, der an ein Pub/Sub-Thema gesendet wird.

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

Ansatz 1: Dataflow mit Onlinevorhersage in AI Platform

Bei diesem Ansatz wird das TensorFlow-Modell als REST API in AI Platform bereitgestellt und gehostet. Die Dataflow-Streamingpipeline ruft die API für jede von Pub/Sub abgerufene Nachricht auf, um Vorhersagen zu erhalten. Die Gesamtarchitektur dieses Ansatzes ist in Abbildung 8 dargestellt:

Grafik: Streamingansatz 1: Dataflow mit AI Platform-Onlinevorhersage
Abbildung 8: Streamingansatz 1: Dataflow mit AI Platform-Onlinevorhersage. Die HTTP-Anfrage kann einen einzelnen Datenpunkt oder eine Gruppe von Datenpunkten in einem Mikro-Batch enthalten.

Bei diesem Ansatz führt die Dataflow-Pipeline die folgenden Schritte aus:

  1. Nachrichten aus einem Pub/Sub-Thema lesen.
  2. Für jede Nachricht eine HTTP-Anfrage an die API des AI Platform-Modells senden und Vorhersagen abrufen.
  3. Ergebnisse mithilfe von Streaming-APIs in BigQuery schreiben.

Mikro-Batching ist ein besserer Ansatz. Anstatt für jede aus Pub/Sub gelesene Nachricht eine HTTP-Anfrage an die REST API des Modells zu senden, fasst Dataflow Nachrichten, die während eines 1-Sekunden-Fensters empfangen wurden, zu Gruppen zusammen. Diese Nachrichtengruppe wird dann als Mikro-Batch in einer einzelnen HTTP-Anfrage an die API des Modells gesendet. Bei diesem Ansatz führt die Dataflow-Pipeline die folgenden Schritte aus:

  1. Nachrichten aus Pub/Sub-Thema lesen.
  2. Durch eine Fensterfunktion die Nachrichten einer Sekunde zu einem Mikro-Batch zusammenfassen.
  3. Eine HTTP-Anfrage mit dem Mikro-Batch an die API des AI Platform-Modells senden, um die Vorhersagen für die Nachrichten zu erhalten.
  4. Ergebnisse mithilfe von Streaming-APIs in BigQuery schreiben.

Hinter dieser Vorgehensweise stehen folgende Überlegungen:

  1. Die Anzahl der Aufrufe des Remotediensts, z. B. des AI Platform-Modells, ist geringer.
  2. Die durchschnittliche Latenz bei der Zustellung jeder Nachricht ist geringer.
  3. Die Gesamtverarbeitungszeit der Pipeline ist kürzer.

Bei diesem Experiment wurde das Zeitfenster auf eine Sekunde eingestellt. Die Größe des Mikro-Batches, d. h. die Anzahl der Nachrichten, die als Batch an den AI Platform-Modus gesendet werden, variiert jedoch. Die Größe der Mikro-Batches hängt davon ab, mit welcher Häufigkeit Nachrichten erzeugt werden, also von der Anzahl der Nachrichten pro Sekunde.

Im folgenden Abschnitt werden Experimente mit drei verschiedenen Häufigkeiten beschrieben: 50, 100 und 500 Nachrichten pro Sekunde. Das heißt, die Mikro-Batch-Größe beträgt 50, 100 und 500.

Ansatz 2: Dataflow mit Vorhersagen durch direkten Modellaufruf

Diese Vorgehensweise ähnelt der Vorgehensweise in den Batch-Experimenten. Das TensorFlow-SavedModel wird in Dataflow-Workern gehostet. Während der Streamverarbeitungspipeline wird bei jedem Datensatz das Modell für die Vorhersage aufgerufen. In Abbildung 9 ist die Architektur bei dieser Vorgehensweise im Überblick dargestellt.

Grafik: Streamingansatz 2: Dataflow mit Vorhersagen durch direkten Modellaufruf
Abbildung 9: Streamingansatz 2: Dataflow mit Vorhersagen durch direkten Modellaufruf

Bei diesem Ansatz führt die Dataflow-Pipeline die folgenden Schritte aus:

  1. Nachrichten aus Pub/Sub-Thema lesen.
  2. Für jeden Datensatz das lokale TensorFlow-SavedModel aufrufen, um eine Vorhersage zu erhalten.
  3. Ergebnisse mithilfe von Streaming-APIs in BigQuery schreiben.

Die Mikro-Batch-Technik kann in der Stream-Pipeline auch beim Ansatz mit direktem Modellaufruf verwendet werden. Anstatt einen Tensor einer Dateninstanz an das Modell zu senden, können wir einen Tensor aus N Dateninstanzen senden. Dabei ist N die Anzahl der Nachrichten, die im Dataflow-Fenster im Modell empfangen wurden. Bei dieser Technik werden die vektorisierten Operationen des TensorFlow-Modells verwendet und man erhält mehrere Vorhersagen parallel.

Konfigurationen der Experimente

Bei diesen Experimenten haben wir folgende Konfigurationen verwendet:

  • Stream-Datengröße: 10K records (messages)
  • Simulierte Nachrichten pro Sekunde (MPS): 50, 100 und 500
  • Größe des Zeitfensters (bei Mikro-Batch-Experimenten): 1 second
  • Dataflow-Region: europe-west1-b
  • Maschinentyp der Dataflow-Worker: n1-standard-1
  • Dataflow-Einstellung num_worker: 5 (kein Autoscaling)
  • API-Knoten des AI Platform-Modells: 3 (manualScale)

Ergebnisse

In der folgenden Tabelle sind die Ergebnisse der Streaming-Experimente mit unterschiedlichen Datenmengen (Nachrichten pro Sekunde) zusammengefasst. Mit der Nachrichtenhäufigkeit ist die Anzahl der pro Sekunde gesendeten Nachrichten gemeint. Die Simulationszeit ist die Zeit für das Senden aller Nachrichten.

Stream-Nachrichtenhäufigkeit Messwert Dataflow mit AI Platform-Onlinevorhersage   Dataflow mit Vorhersage durch direkten Modellaufruf  
    Einzelne Nachricht Mikro-Batching Einzelne Nachricht Mikro-Batching
50 Nachr. pro Sek.

(Simulationszeit: 3 Min. 20 Sek.)
Gesamtzeit 9 Min. 34 Sek. 7 Min. 44 Sek. 3 Min. 43 Sek. 3 Min. 22 Sek.
100 Nachr. pro Sek.

(Simulationszeit: 1 Min. 40 Sek.)
Gesamtzeit 6 Min. 3 Sek. 4 Min. 34 Sek. 1 Min. 51 Sek. 1 Min. 41 Sek.
500 Nachr. pro Sek.

(Simulationszeit: 20 Sek.)
Gesamtzeit Nicht verfügbar – Standardkontingent für AI Platform-Onlinevorhersage 2 Min. 47 Sek. 1 Min. 23 Sek. 48 Sek.

In Abbildung 10 sind diese Ergebnisse grafisch dargestellt.

Grafik: Diagramm mit Zeitangaben für verschiedene Vorgehensweisen und Häufigkeiten
Abbildung 10: Diagramm mit Zeitangaben für verschiedene Vorgehensweisen und Häufigkeiten

An den Ergebnissen ist zu sehen, dass das Mikro-Batching-Verfahren die Ausführungsleistung sowohl bei der Onlinevorhersage von AI Platform als auch bei der Vorhersage durch direkten Modellaufruf verbessert. Darüber hinaus erkennt man beim direkten Modellaufruf mit Streamingpipeline eine Leistungsverbesserung um das Doppelte bis Vierfache im Vergleich zum Aufrufen einer externen REST/HTTP API für die Onlinevorhersage.

Zusammenfassung

Entsprechend den beschriebenen Vorgehensweisen und Versuchsergebnissen möchten wir die folgenden Empfehlungen abgeben.

Batchverarbeitung

  • Wenn Sie Ihre Batch-Datenverarbeitungspipeline erstellen und im Rahmen der Pipeline Vorhersagen erstellen möchten, verwenden Sie den Ansatz mit dem direkten Modellaufruf, um die beste Leistung zu erzielen.
  • Beim direkten Modellaufruf lässt sich die Leistung weiter verbessern. Erstellen Sie dazu Mikro-Batches der Datenpunkte, bevor Sie das lokale Modell für die Vorhersage aufrufen, um die Parallelisierung der vektorisierten Vorgänge zu nutzen.
  • Wenn die Daten in dem für die Vorhersage erwarteten Format in Cloud Storage abgelegt werden, verwenden Sie die Batchvorhersage von AI Platform, um eine optimale Leistung zu erzielen.
  • Wenn Sie die Leistung von GPUs für die Batchvorhersage nutzen möchten, verwenden Sie AI Platform.
  • Verwenden Sie die Onlinevorhersage von AI Platform nicht für Batchvorhersagen.

Streamverarbeitung

  • Verwenden Sie den direkten Modellaufruf in der Streamingpipeline, um die beste Leistung zu erzielen und die durchschnittliche Latenz zu reduzieren. Vorhersagen werden lokal ausgeführt, ohne HTTP-Aufrufe für Remotedienste.
  • Entkoppeln Sie das Modell von den Datenverarbeitungspipelines. Dadurch sind die für Onlinevorhersagen verwendeten Modelle besser zu handhaben. Die beste Vorgehensweise besteht darin, das Modell als unabhängigen Mikrodienst bereitzustellen. Hierzu kann AI Platform oder ein anderer Webhostingdienst genutzt werden.
  • Erstellen Sie das Modell als unabhängigen Webdienst. So können mehrere Datenverarbeitungspipelines und Onlineanwendungen den Modelldienst als Endpunkt verwenden. Außerdem sind Änderungen am Modell für die Anwendungen und Pipelines, die es verwenden, transparent.
  • Erstellen Sie mehrere Instanzen des Dienstes mit Load-Balancing. Dadurch steigern Sie die Skalierbarkeit und Verfügbarkeit des Modell-Webdiensts. Bei AI Platform müssen Sie nur die Anzahl der Knoten (manualScaling) oder minNodes (autoScaling) in der YAML-Konfigurationsdatei angeben, wenn Sie eine Modellversion bereitstellen.
  • Wenn Sie das Modell in einem separaten Mikrodienst erstellen, fallen je nach zugrunde liegender Serverinfrastruktur zusätzliche Kosten an. Weitere Informationen dazu finden Sie in den FAQ zu Preisen für AI Platform-Onlinevorhersagen.
  • Verwenden Sie Mikro-Batching in der Streaming-Datenverarbeitungspipeline. Dadurch erzielen Sie sowohl beim direkten Modellaufruf als auch beim HTTP-Modelldienst eine bessere Leistung. Durch Mikro-Batching benötigen Sie weniger HTTP-Anfragen an den Modelldienst. Für die Vorhersagen werden die vektorisierten Vorgänge des TensorFlow-Modells verwendet.

Weitere Informationen