Maschinelles Lernen mit Apache Beam und TensorFlow

In dieser Schritt-für-Schritt-Anleitung erfahren Sie, wie Sie mit Apache Beam, Google Dataflow und TensorFlow ein Modell für maschinelles Lernen vorverarbeiten, trainieren und damit Vorhersagen erstellen.

Zur Veranschaulichung dieser Konzepte wird hier das Codebeispiel "Molecules" verwendet. Mit Daten über Moleküle als Eingabe wird mit dem Codebeispiel "Molecules" ein Modell für maschinelles Lernen zur Vorhersage molekularer Energie erstellt und trainiert.

Kosten

In dieser Schritt-für-Schritt-Anleitung werden möglicherweise kostenpflichtige Komponenten von Google Cloud verwendet, darunter eine oder mehrere der folgenden:

  • Dataflow
  • Cloud Storage
  • AI Platform

Sie können mithilfe des Preisrechners eine Kostenschätzung für Ihre voraussichtliche Nutzung erstellen.

Übersicht

Mit dem Codebeispiel "Molecules" werden Dateien extrahiert, die Daten über Moleküle enthalten, und die jeweilige Anzahl der Kohlenstoff-, Wasserstoff-, Sauerstoff- und Stickstoffatome in jedem Molekül wird gezählt. Dann werden mit dem Code die jeweiligen Anzahlen auf Werte zwischen 0 und 1 normalisiert. Anschließend werden diese Werte in einen TensorFlow Deep Neural Network Estimator eingespeist. Der Neural Network Estimator trainiert ein Modell für maschinelles Lernen zur Vorhersage molekularer Energie.

Das Codebeispiel umfasst vier Phasen:

  1. Datenextraktion (data-extractor.py)
  2. Vorverarbeitung (preprocess.py)
  3. Training (trainer/task.py)
  4. Vorhersage (predict.py)

In den folgenden Abschnitten werden alle vier Phasen behandelt. Der Schwerpunkt liegt in dieser Anleitung aber auf den Phasen, in denen Apache Beam und Dataflow zum Einsatz kommen: die Vorverarbeitungsphase und die Vorhersagephase. In der Vorverarbeitungsphase wird auch die TensorFlow Transform-Bibliothek tf.Transform verwendet.

In der folgenden Abbildung wird der Workflow des Codebeispiels "Molecules" veranschaulicht.

Workflow für "Molecules"

Codebeispiel ausführen

Folgen Sie bei der Einrichtung der Umgebung der Anleitung in der README-Datei im GitHub-Repository "Molecules". Führen Sie anschließend das Codebeispiel "Molecules" mit einem der bereitgestellten Wrapper-Skripts run-local oder run-cloud aus. Mit diesen Skripts werden automatisch alle vier Phasen des Codebeispiels ausgeführt (Extraktion, Vorverarbeitung, Training und Vorhersage).

Alternativ können Sie alle Phasen mit den Befehlen in den entsprechenden Abschnitten dieses Dokuments manuell ausführen.

Lokal ausführen

Verwenden Sie das Wrapper-Skript run-local, um das Codebeispiel "Molecules" lokal auszuführen:

./run-local

In den Ausgabelogs wird angezeigt, welche der vier Phasen (Datenextraktion, Vorverarbeitung, Training und Vorhersage) gerade vom Skript ausgeführt wird.

Das Skript data-extractor.py hat ein erforderliches Argument für die Anzahl der Dateien. Der Einfachheit halber haben das Skript run-local und das Skript run-cloud standardmäßig 5 Dateien für dieses Argument. Jede Datei umfasst 25.000 Moleküle. Die vollständige Ausführung des Codebeispiels dauert ca. 3 bis 7 Minuten. Die Ausführungsdauer hängt von der CPU Ihres Computers ab.

In Google Cloud ausführen

Führen Sie das Wrapper-Skript run-cloud aus, um das Codebeispiel "Molecules" auf Google Cloud auszuführen. Alle Eingabedateien müssen sich in Cloud Storage befinden.

Verweisen Sie mit dem Parameter --work-dir auf den Cloud Storage-Bucket:

./run-cloud --work-dir gs://<your-bucket-name>/cloudml-samples/molecules

Phase 1: Datenextraktion

Quellcode: data-extractor.py

Im ersten Schritt werden die Eingabedaten extrahiert. Die angegebenen SDF-Dateien werden mit der Datei data-extractor.py extrahiert und dekomprimiert. In späteren Schritten des Beispiels werden diese Dateien vorverarbeitet und die Daten zum Trainieren und Evaluieren des Modells für maschinelles Lernen verwendet. Die Datei extrahiert die SDF-Dateien aus der öffentlichen Quelle und speichert sie in einem Unterverzeichnis des angegebenen Arbeitsverzeichnisses. Das Standardarbeitsverzeichnis (--work-dir) ist /tmp/cloudml-samples/molecules.

Extrahierte Dateien speichern

Speichern Sie die extrahierten Datendateien lokal:

python data-extractor.py --max-data-files 5

Oder speichern Sie die extrahierten Datendateien an einem Speicherort in Cloud Storage:

WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules

python data-extractor.py --work-dir $WORK_DIR --max-data-files 5

Phase 2: Vorverarbeitung

Quellcode: preprocess.py

Im Codebeispiel "Molecules" wird eine Apache Beam-Pipeline zur Vorverarbeitung der Daten verwendet. Folgende Vorverarbeitungsaktionen werden mit der Pipeline ausgeführt:

  1. Lesen und Parsen der extrahierten SDF-Dateien
  2. Zählen der verschiedenen Atome in jedem der Moleküle in den Dateien
  3. Normalisieren der Anzahlen mit tf.Transform auf Werte zwischen 0 und 1
  4. Partitionieren des Datasets in ein Trainings-Dataset und ein Evaluations-Dataset
  5. Schreiben der beiden Datasets als Objekte vom Typ TFRecord

Mit Apache Beam-Transformationen lassen sich einzelne Elemente auf effiziente Weise bearbeiten. Allerdings lassen sich Transformationen, die einen vollständigen Durchlauf des Datasets erfordern, nicht gut mit Apache Beam ausführen. Besser eignet sich dafür tf.Transform. Aus diesem Grund werden im Code Apache Beam-Transformationen zum Lesen und Formatieren der Moleküle sowie zum Zählen der Atome in den einzelnen Molekülen verwendet. Der Code sucht dann mit tf.Transform nach den globalen Mindest- und Höchstwerten, um die Daten zu normalisieren.

In der folgenden Abbildung sind die Schritte in der Pipeline dargestellt.

Vorverarbeitungspipeline

Elementbasierte Transformationen anwenden

Mit dem Code preprocess.py wird eine Apache Beam-Pipeline erstellt.

# Build and run a Beam Pipeline
with beam.Pipeline(options=beam_options) as p, \
     beam_impl.Context(temp_dir=tft_temp_dir):

Als Nächstes wird mit dem Code die Transformation feature_extraction auf die Pipeline angewendet.

# Transform and validate the input data matches the input schema
dataset = (
    p
    | 'Feature extraction' >> feature_extraction

Für die Transformation feature_extraction nutzt die Pipeline SimpleFeatureExtraction.

pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)),

Die in pubchem/pipeline.py definierte Transformation SimpleFeatureExtraction umfasst eine Reihe von Transformationen, mit denen alle Elemente voneinander unabhängig bearbeitet werden. Als Erstes parst der Code die Moleküle aus der Quelldatei, formatiert die Moleküle anschließend zu einem Wörterbuch der Molekülattribute und zählt schließlich die Atome im Molekül. Die jeweilige Anzahl sind die Features (Eingaben) für das Modell für maschinelles Lernen.

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

Mit der Lesetransformation beam.io.Read(pubchem.ParseSDF(data_files_pattern)) werden SDF-Dateien aus einer benutzerdefinierten Quelle gelesen.

Die benutzerdefinierte Quelle mit dem Namen ParseSDF ist in pubchem/pipeline.py definiert. ParseSDF dient als Erweiterung von FileBasedSource. Damit wird die Funktion read_records zum Öffnen der extrahierten SDF-Dateien implementiert.

Wenn Sie das Codebeispiel "Molecules" mit Google Cloud ausführen, können die Dateien von mehreren Workern (VMs) gleichzeitig gelesen werden. Damit zwei Worker nicht denselben Inhalt in den Dateien lesen, verwendet jede Datei einen range_tracker.

Die Pipeline gruppiert die Rohdaten in Abschnitte mit relevanten Informationen für die nächsten Schritte. Jeder Abschnitt in der geparsten SDF-Datei wird in einem Wörterbuch gespeichert (siehe pipeline/sdf.py), wobei die Schlüssel die Abschnittsnamen und die Werte die Rohinhalte in den Zeilen des entsprechenden Abschnitts sind.

Mit dem Code wird beam.ParDo(FormatMolecule()) auf die Pipeline angewendet. Mit ParDo wird das DoFn-Objekt mit dem Namen FormatMolecule auf jedes Molekül angewendet. FormatMolecule liefert ein Wörterbuch formatierter Moleküle. Das folgende Snippet ist ein Beispiel für ein Element in der Ausgabe-PCollection:

{
  'atoms': [
    {
      'atom_atom_mapping_number': 0,
      'atom_stereo_parity': 0,
      'atom_symbol': u'O',
      'charge': 0,
      'exact_change_flag': 0,
      'h0_designator': 0,
      'hydrogen_count': 0,
      'inversion_retention': 0,
      'mass_difference': 0,
      'stereo_care_box': 0,
      'valence': 0,
      'x': -0.0782,
      'y': -1.5651,
      'z': 1.3894,
    },
    ...
  ],
  'bonds': [
    {
      'bond_stereo': 0,
      'bond_topology': 0,
      'bond_type': 1,
      'first_atom_number': 1,
      'reacting_center_status': 0,
      'second_atom_number': 5,
    },
    ...
  ],
  '<PUBCHEM_COMPOUND_CID>': ['3\n'],
  ...
  '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
  ...
}

Mit dem Code wird anschließend beam.ParDo(CountAtoms()) auf die Pipeline angewendet. Das DoFn-Objekt CountAtoms summiert die Anzahl der Kohlenstoff-, Wasserstoff-, Stickstoff- und Sauerstoffatome jedes Moleküls. CountAtoms gibt eine PCollection von Features und Labels aus. Hier ein Beispiel für ein Element in der Ausgabe-PCollection:

{
  'ID': 3,
  'TotalC': 7,
  'TotalH': 8,
  'TotalO': 4,
  'TotalN': 0,
  'Energy': 19.4085,
}

Die Pipeline überprüft anschließend die Eingaben. Das DoFn-Objekt ValidateInputData überprüft, ob jedes Element den in input_schema angegebenen Metadaten entspricht. Mit dieser Prüfung wird sichergestellt, dass die Daten im richtigen Format vorliegen, wenn sie in TensorFlow eingespeist werden.

| 'Validate inputs' >> beam.ParDo(ValidateInputData(
    input_feature_spec)))

Transformationen mit vollständigem Durchlauf anwenden

Im Codebeispiel "Molecules" wird ein DNN-Regressor zur Erstellung von Vorhersagen verwendet. Allgemein ist es empfehlenswert, die Eingaben zu normalisieren, bevor sie in das ML-Modell eingespeist werden. Die Pipeline normalisiert mit tf.Transform die jeweiligen Anzahlen der verschiedenen Atome auf Werte zwischen 0 und 1. Weitere Informationen zum Normalisieren von Eingaben finden Sie unter Feature scaling.

Für die Normalisierung der Werte ist ein vollständiger Durchlauf des Datasets erforderlich, bei dem die Minimal- und Maximalwerte aufgezeichnet werden. Der Code untersucht dann mit tf.Transform das gesamte Dataset und wendet Transformationen mit vollständigem Durchlauf an.

Damit Sie tf.Transform verwenden können, muss der Code eine Funktion bereitstellen, die die Logik der Transformation enthält, die am Dataset ausgeführt werden soll. In preprocess.py verwendet der Code die von tf.Transform bereitgestellte Transformation AnalyzeAndTransformDataset. Weitere Informationen zur Verwendung von tf.Transform.

# Apply the tf.Transform preprocessing_fn
input_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(input_feature_spec))

dataset_and_metadata, transform_fn = (
    (dataset, input_metadata)
    | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
        feature_scaling))
dataset, metadata = dataset_and_metadata

In preprocess.py wird als feature_scaling-Funktion normalize_inputs verwendet, die in pubchem/pipeline.py definiert ist. Die Funktion normalisiert mithilfe der tf.Transform-Funktion scale_to_0_1 die Anzahlen auf Werte zwischen 0 und 1.

def normalize_inputs(inputs):
  """Preprocessing function for tf.Transform (full-pass transformations).

  Here we will do any preprocessing that requires a full-pass of the dataset.
  It takes as inputs the preprocessed data from the `PTransform` we specify, in
  this case `SimpleFeatureExtraction`.

  Common operations might be scaling values to 0-1, getting the minimum or
  maximum value of a certain field, creating a vocabulary for a string field.

  There are two main types of transformations supported by tf.Transform, for
  more information, check the following modules:
    - analyzers: tensorflow_transform.analyzers.py
    - mappers:   tensorflow_transform.mappers.py

  Any transformation done in tf.Transform will be embedded into the TensorFlow
  model itself.
  """
  return {
      # Scale the input features for normalization
      'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
      'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
      'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
      'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),

      # Do not scale the label since we want the absolute number for prediction
      'Energy': inputs['Energy'],
  }

Manuelles Normalisieren der Daten ist möglich. Wenn das Dataset allerdings groß ist, geht es mit Dataflow schneller. Mit Dataflow kann die Pipeline nach Bedarf auf mehreren Workern (VMs) ausgeführt werden.

Dataset partitionieren

Als Nächstes partitioniert die Pipeline preprocess.py das Dataset in zwei Datasets. Ca. 80 % der Daten werden als Trainingsdaten und ca. 20 % als Evaluationsdaten zugewiesen.

# Split the dataset into a training set and an evaluation set
assert 0 < eval_percent < 100, 'eval_percent must in the range (0-100)'
train_dataset, eval_dataset = (
    dataset
    | 'Split dataset' >> beam.Partition(
        lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))

Ausgabe schreiben

Schließlich werden die beiden Datasets (Training und Evaluation) von der Pipeline preprocess.py mit der Transformation WriteToTFRecord geschrieben.

# Write the datasets as TFRecords
coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
_ = (
    train_dataset
    | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
        train_dataset_prefix, coder))

eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
_ = (
    eval_dataset
    | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
        eval_dataset_prefix, coder))

# Write the transform_fn
_ = (
    transform_fn
    | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))

Vorverarbeitungs-Pipeline ausführen

Führen Sie die Vorverarbeitungs-Pipeline lokal aus:

python preprocess.py

Oder führen Sie die Vorverarbeitungspipeline in Dataflow aus:

PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python preprocess.py \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $WORK_DIR

Sobald die Pipeline ausgeführt wird, sehen Sie den Fortschritt der Pipeline in der Dataflow-Monitoring-Oberfläche:

Vorverarbeitungspipeline von "Molecules"

Phase 3: Training

Quellcode: trainer/task.py

Wie bereits erwähnt, erstellt der Code am Ende der Vorverarbeitungsphase zwei Datasets (Training und Evaluation) und zwar durch Aufteilen der Daten.

In dem Beispiel wird TensorFlow zum Trainieren des Modells für maschinelles Lernen verwendet. Die Datei trainer/task.py im Codebeispiel "Molecules" enthält den Code zum Trainieren des Modells. Die Hauptfunktion von trainer/task.py lädt die in der Vorverarbeitungsphase verarbeiteten Daten.

Der Estimator verwendet das Trainings-Dataset zum Trainieren des Modells und anschließend das Evaluations-Dataset, um zu überprüfen, ob das Modell bei bestimmten Molekülattributen die molekulare Energie genau vorhersagt.

Lesen Sie weitere Informationen darüber, wie Sie ein ML-Modell trainieren.

Das Modell trainieren

Trainieren Sie das Modell lokal:

python trainer/task.py

# To get the path of the trained model
EXPORT_DIR=/tmp/cloudml-samples/molecules/model/export/final
MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1)

Trainieren Sie das Modell alternativ mit AI Platform:

gcloud config set compute/region $REGION
  • Trainingsjob starten
JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)"
BUCKET=gs://<your bucket name>
WORK_DIR=$BUCKET/cloudml-samples/molecules
gcloud ai-platform jobs submit training $JOB \
  --module-name trainer.task \
  --package-path trainer \
  --staging-bucket $BUCKET \
  --runtime-version 1.13 \
  --stream-logs \
  -- \
  --work-dir $WORK_DIR

# To get the path of the trained model:
EXPORT_DIR=$WORK_DIR/model/export
MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1)

Phase 4: Vorhersage

Quellcode: predict.py

Nachdem das Modell vom Estimator trainiert wurde, können Sie dem Modell Eingaben hinzufügen und Vorhersagen erstellen. Im Codebeispiel "Molecules" werden Vorhersagen von der Pipeline predict.py erstellt. Die Pipeline kann entweder als Batchpipeline oder als Streamingpipeline ausgeführt werden.

Der Code für die Pipeline stimmt für Batch und Streaming zum großen Teil überein, ausgenommen die Abschnitte zu Quell- und Senkeninteraktionen. Wenn die Pipeline im Batchmodus ausgeführt wird, liest sie die Eingabedateien aus der benutzerdefinierten Quelle und schreibt die Ausgabevorhersagen als Textdateien in das angegebene Arbeitsverzeichnis. Wird die Pipeline im Streamingmodus ausgeführt, liest sie die Eingabemoleküle beim Eintreffen aus einem Pub/Sub-Thema. Sobald die Ausgabevorhersagen erstellt sind, schreibt die Pipeline diese in ein anderes Pub/Sub-Thema.

if args.verb == 'batch':
  data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
  results_prefix = os.path.join(args.outputs_dir, 'part')
  source = pubchem.ParseSDF(data_files_pattern)
  sink = beam.io.WriteToText(results_prefix)

elif args.verb == 'stream':
  if not project:
    parser.print_usage()
    print('error: argument --project is required for streaming')
    sys.exit(1)

  beam_options.view_as(StandardOptions).streaming = True
  source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format(
      project, args.inputs_topic))
  sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format(
      project, args.outputs_topic))

In der folgenden Abbildung sind die Schritte in den Vorhersage-Pipelines (Batch und Streaming) dargestellt.

Vorverarbeitungspipeline von "Molecules"

In predict.py definiert der Code die Pipeline in der Funktion run:

def run(model_dir, feature_extraction, sink, beam_options=None):
  print('Listening...')
  with beam.Pipeline(options=beam_options) as p:
    _ = (p
        | 'Feature extraction' >> feature_extraction
        | 'Predict' >> beam.ParDo(Predict(model_dir, 'ID'))
        | 'Format as JSON' >> beam.Map(json.dumps)
        | 'Write predictions' >> sink)

Der Code ruft die Funktion run mit den folgenden Parametern auf:

run(
    args.model_dir,
    pubchem.SimpleFeatureExtraction(source),
    sink,
    beam_options)

Zuerst übergibt der Code die Transformation pubchem.SimpleFeatureExtraction(source) als die Transformation feature_extraction. Diese Transformation, die auch in der Vorverarbeitungsphase verwendet wird, wird auf die Pipeline angewandt:

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

Die Transformation liest basierend auf dem Ausführungsmodus der Pipeline (Batch oder Streaming) die entsprechende Quelle aus, formatiert die Moleküle und zählt die verschiedenen Atome in jedem Molekül.

Anschließend wird beam.ParDo(Predict(…)) auf die Pipeline angewendet, um die molekulare Energie vorherzusagen. Predict, das übergebene DoFn-Objekt, verwendet das angegebene Wörterbuch der Eingabefeatures (Anzahl von Atomen) zur Vorhersage der molekularen Energie.

Als nächste Transformation wird beam.Map(lambda result: json.dumps(result)) auf die Pipeline angewendet, um das Wörterbuch mit den Vorhersageergebnissen in einen JSON-String zu serialisieren.

Schließlich wird die Ausgabe in die Senke geschrieben (entweder als Textdateien in das Arbeitsverzeichnis für den Batchmodus oder als Nachrichten in ein Pub/Sub-Thema für den Streamingmodus).

Batchvorhersagen

Batchvorhersagen sind für den Durchsatz optimiert, nicht für die Latenz. Am besten funktionieren Batchvorhersagen, wenn Sie viele Vorhersagen erstellen und warten können, bis alle fertig sind, bevor Sie die Ergebnisse erhalten.

Vorhersage-Pipeline im Batchmodus ausführen

Führen Sie die Batchvorhersagepipeline lokal aus:

# For simplicity, we'll use the same files we used for training
python predict.py \
  --model-dir $MODEL_DIR \
  batch \
  --inputs-dir /tmp/cloudml-samples/molecules/data \
  --outputs-dir /tmp/cloudml-samples/molecules/predictions

Oder führen Sie die Batchvorhersagepipeline in Dataflow aus:

# For simplicity, we'll use the same files we used for training
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  batch \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $WORK_DIR/data \
  --outputs-dir $WORK_DIR/predictions

Sobald die Pipeline ausgeführt wird, sehen Sie den Fortschritt der Pipeline in der Dataflow-Monitoring-Oberfläche:

Vorhersagepipeline von "Molecules"

Streamingvorhersagen

Streaming-Vorhersagen sind für die Latenz optimiert, nicht für den Durchsatz. Am besten funktionieren Streaming-Vorhersagen, wenn Sie sporadische Vorhersagen erstellen, die Ergebnisse aber so schnell wie möglich erhalten möchten.

Der Vorhersagedienst (die Streaming-Vorhersagepipeline) empfängt die Eingabemoleküle von einem Pub/Sub-Thema und veröffentlicht die Ausgabe (Vorhersagen) in einem anderen Pub/Sub-Thema.

Erstellen Sie das Pub/Sub-Thema für die Eingaben:

gcloud pubsub topics create molecules-inputs

Erstellen Sie das Pub/Sub-Thema für die Ausgaben:

gcloud pubsub topics create molecules-predictions

Führen Sie die Streaming-Vorhersagepipeline lokal aus:

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
python predict.py \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Oder führen Sie die Streaming-Vorhersagepipeline in Dataflow aus:

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Nachdem Sie den Vorhersagedienst (die Streaming-Vorhersagepipeline) gestartet haben, ist es erforderlich, dass Sie einen Publisher ausführen, der Moleküle an den Vorhersagedienst sendet, und einen Subscriber, der die Vorhersageergebnisse abhört. Das Codebeispiel "Molecules" stellt Publisher-Dienste (publisher.py) und Subscriber-Dienste (subscriber.py) zur Verfügung.

Der Publisher parst SDF-Dateien aus einem Verzeichnis und veröffentlicht sie im Eingabethema. Der Subscriber hört die Vorhersageergebnisse ab und protokolliert sie. Zur Vereinfachung werden in diesem Beispiel dieselben SDF-Dateien wie in der Trainingsphase verwendet.

Führen Sie den Subscriber aus:

# Run on terminal 2
python subscriber.py \
  --project $PROJECT \
  --topic molecules-predictions

Führen Sie den Publisher aus:

# Run on terminal 3
python publisher.py \
  --project $PROJECT \
  --topic molecules-inputs \
  --inputs-dir $WORK_DIR/data

Nachdem der Publisher mit dem Parsen und Veröffentlichen von Molekülen begonnen hat, sehen Sie Vorhersagen des Subscribers.

Bereinigen

Wenn die Ausführung der Streaming-Vorhersagen-Pipeline abgeschlossen ist, stoppen Sie die Pipeline, damit keine Gebühren anfallen.

Weitere Informationen