Machine learning avec Apache Beam et TensorFlow

Cette procédure pas à pas vous montre comment prétraiter, entraîner et effectuer des prédictions avec un modèle de machine learning à l'aide d'Apache Beam, de Google Cloud Dataflow et de TensorFlow.

Pour illustrer ces concepts, cette procédure utilise l'exemple de code Molecules. À partir de données moléculaires en entrée, le code Molecules crée et entraîne un modèle d'apprentissage automatique permettant de prédire l'énergie moléculaire.

Remarque : L'exemple de code Molecules utilise l'API Estimator de TensorFlow. Si vous utilisez l'API Keras de TensorFlow, mais que vous souhaitez mieux faire correspondre votre projet à ce tutoriel, vous pouvez convertir votre modèle Keras en Estimator.

Coûts

Cette procédure peut utiliser des composants facturables de Google Cloud Platform, tels que :

  • Cloud Dataflow
  • Cloud Storage
  • AI Platform

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.

Aperçu

Le code Molecules extrait des fichiers contenant des données moléculaires, et compte le nombre d'atomes de carbone, d'hydrogène, d'oxygène et d'azote dans chaque molécule. Ensuite, le code normalise les décomptes vers des valeurs comprises entre 0 et 1, et transmet ces valeurs à un Estimator de réseau de neurones profond TensorFlow. L'Estimator de réseau de neurones entraîne un modèle de machine learning pour prédire l'énergie moléculaire.

L'exemple de code est organisé en quatre phases :

  1. Extraction des données (data-extractor.py).
  2. Prétraitement (preprocess.py).
  3. Entraînement (trainer/task.py).
  4. Prédiction (predict.py).

Les sections ci-dessous passent en revue chacune de ces quatre phases, mais cette procédure pas à pas se concentre sur les phases reposant sur Apache Beam et Cloud Dataflow : la phase de prétraitement et la phase de prédiction. La phase de prétraitement utilise également la bibliothèque TensorFlow Transform (communément appelée tf.Transform).

L'illustration suivante montre le workflow de l'exemple de code Molecules.

Workflow Molecules

Exécuter l'exemple de code

Pour configurer votre environnement, suivez les instructions figurant dans le fichier README du dépôt GitHub de Molecules. Exécutez ensuite l'exemple de code Molecules à l'aide d'un des scripts wrapper fournis, run-local ou run-cloud. Ces scripts exécutent automatiquement les quatre phases de l'exemple de code (extraction, prétraitement, entraînement et prédiction).

Vous pouvez également exécuter manuellement chacune des phases à l'aide des commandes fournies dans les sections de ce document.

Exécuter localement

Pour exécuter l'exemple de code Molecules localement, exécutez le script wrapper run-local :

./run-local

Les journaux de sortie vous indiquent laquelle des quatre phases le script est en train d'exécuter (extraction de données, prétraitement, entraînement et prédiction).

Le script data-extractor.py requiert un argument correspondant au nombre de fichiers. Pour faciliter leur utilisation, les scripts run-local et run-cloud possèdent une valeur par défaut de cinq fichiers pour cet argument. Chaque fichier contient 25 000 molécules. Exécuter cet exemple de code doit prendre approximativement de trois à sept minutes du début à la fin. La durée d'exécution dépend du processeur de votre ordinateur.

Remarque : Le script run-local comprend des arguments facultatifs. Consultez le fichier README pour découvrir comment utiliser les arguments facultatifs.

Exécuter sur Google Cloud Platform

Pour exécuter l'exemple de code Molecules sur GCP, exécutez le script wrapper run-cloud. Tous les fichiers d'entrée doivent être hébergés dans Cloud Storage.

Définissez le paramètre --work-dir pour qu'il pointe vers le bucket Cloud Storage :

./run-cloud --work-dir gs://<your-bucket-name>/cloudml-samples/molecules

Remarque : Le script run-cloud comprend des arguments facultatifs. Consultez le fichier README pour découvrir comment utiliser les arguments facultatifs.

Phase 1 : Extraction des données

Code source : data-extractor.py

La première étape consiste à extraire les données d'entrée. Le fichier data-extractor.py extrait et décompresse les fichiers SDF spécifiés. Au cours d'étapes suivantes, l'exemple prétraite ces fichiers, et utilise les données pour entraîner et évaluer le modèle de machine learning. Le script extrait les fichiers SDF de leur source publique et les enregistre dans un sous-répertoire du répertoire de travail spécifié. Le répertoire de travail par défaut (--work-dir) est /tmp/cloudml-samples/molecules.

Enregistrer les fichiers extraits

Enregistrez localement les fichiers de données que vous venez d'extraire :

python data-extractor.py --max-data-files 5

Ou enregistrez les fichiers extraits dans un emplacement Cloud Storage :

WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules

python data-extractor.py --work-dir $WORK_DIR --max-data-files 5

Remarque : Le stockage de fichiers dans Cloud Storage entraîne des frais dans le cadre de votre projet GCP. Pour plus d'informations, consultez les tarifs Cloud Storage.

Phase 2 : Prétraitement

Code source : preprocess.py

L'exemple de code Molecules utilise un pipeline Apache Beam pour prétraiter les données. Le pipeline effectue les opérations de prétraitement suivantes :

  1. Lire et analyser les fichiers SDF extraits.
  2. Compter le nombre d'atomes différents dans chacune des molécules des fichiers.
  3. Normaliser les décomptes à l'aide de tf.Transform vers des valeurs comprises entre 0 et 1
  4. Partitionner l'ensemble de données en un ensemble de données d'apprentissage et un ensemble de données d'évaluation.
  5. Écrire les deux ensembles de données en tant qu'objets TFRecord.

Les transformations Apache Beam peuvent manipuler efficacement des éléments individuels, mais les transformations impliquant de parcourir l'intégralité de l'ensemble de données sont difficiles à réaliser avec Apache Beam uniquement. Elles sont plus efficaces avec tf.Transform. De ce fait, le code utilise des transformations Apache Beam pour lire et mettre en forme les molécules, et pour compter les atomes dans chaque molécule. Le code utilise ensuite tf.Transform pour identifier les nombres minimum et maximum globaux afin de normaliser les données.

L'illustration suivante montre les étapes du pipeline.

Pipeline de prétraitement

Remarque : Une partie de la logique du pipeline de prétraitement (preprocess.py) est commune au pipeline responsable de la prédiction (predict.py). Pour éviter la duplication de code, cette logique partagée est hébergée dans les fichiers pubchem/pipeline.py et pubchem/sdf.py. La gestion des dépendances de pipeline d'Apache Beam encourage à placer les fichiers importés dans un module séparé.

Appliquer des transformations basées sur les éléments

Le code preprocess.py crée un pipeline Apache Beam.

Consultez le code tensorflow_transform/beam/impl.py.

# Build and run a Beam Pipeline
with beam.Pipeline(options=beam_options) as p, \
     beam_impl.Context(temp_dir=tft_temp_dir):

Ensuite, le code applique une transformation feature_extraction au pipeline.

# Transform and validate the input data matches the input schema
dataset = (
    p
    | 'Feature extraction' >> feature_extraction

Le pipeline utilise SimpleFeatureExtraction en tant que transformation feature_extraction.

pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)),

La transformation SimpleFeatureExtraction, définie dans pubchem/pipeline.py, contient une série de transformations qui manipulent tous les éléments indépendamment. Pour commencer, le code analyse les molécules à partir du fichier source, puis il met en forme les molécules dans un dictionnaire des propriétés moléculaires et enfin, il compte les atomes présents dans la molécule. Ces nombres sont les caractéristiques (entrées) du modèle de machine learning.

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

La transformation de lecture beam.io.Read(pubchem.ParseSDF(data_files_pattern)) lit les fichiers SDF depuis une source personnalisée.

La source personnalisée, appelée ParseSDF, est définie dans pubchem/pipeline.py. ParseSDF, étend FileBasedSource et implémente la fonction read_records qui ouvre les fichiers SDF extraits.

Lorsque vous exécutez l'exemple de code Molecules sur GCP, plusieurs nœuds de calcul (VM) peuvent lire les fichiers simultanément. Pour éviter que deux nœuds de calcul ne lisent le même contenu dans les fichiers, chaque fichier utilise une valeur range_tracker.

Le pipeline regroupe les données brutes en sections d'informations pertinentes nécessaires aux étapes suivantes. Chaque section du fichier SDF analysé est stockée dans un dictionnaire (voir pipeline/sdf.py) dont les clés sont les noms des sections et les valeurs, le contenu de la ligne brute de la section correspondante.

Remarque : L'exemple de code Molecules n'extrait que quelques-unes des caractéristiques des molécules. Pour voir un exemple d'extraction de fonctionnalités plus complexe, consultez ce projet.

Le code applique beam.ParDo(FormatMolecule()) au pipeline. La fonction ParDo applique le DoFn nommé FormatMolecule à chaque molécule. FormatMolecule génère un dictionnaire de molécules mises en forme. Le fragment de code suivant est un exemple d'élément figurant dans la PCollection de sortie :

{
  'atoms': [
    {
      'atom_atom_mapping_number': 0,
      'atom_stereo_parity': 0,
      'atom_symbol': u'O',
      'charge': 0,
      'exact_change_flag': 0,
      'h0_designator': 0,
      'hydrogen_count': 0,
      'inversion_retention': 0,
      'mass_difference': 0,
      'stereo_care_box': 0,
      'valence': 0,
      'x': -0.0782,
      'y': -1.5651,
      'z': 1.3894,
    },
    ...
  ],
  'bonds': [
    {
      'bond_stereo': 0,
      'bond_topology': 0,
      'bond_type': 1,
      'first_atom_number': 1,
      'reacting_center_status': 0,
      'second_atom_number': 5,
    },
    ...
  ],
  '<PUBCHEM_COMPOUND_CID>': ['3\n'],
  ...
  '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
  ...
}

Ensuite, le code applique beam.ParDo(CountAtoms()) au pipeline. Le DoFn CountAtoms compte le nombre d'atomes de carbone, d'hydrogène, d'azote et d'oxygène contenus dans chaque molécule. CountAtoms renvoie une PCollection de caractéristiques et d'étiquettes. Voici un exemple d'élément figurant dans la PCollection de sortie :

{
  'ID': 3,
  'TotalC': 7,
  'TotalH': 8,
  'TotalO': 4,
  'TotalN': 0,
  'Energy': 19.4085,
}

Le pipeline valide ensuite les entrées. Le DoFn ValidateInputData valide le fait que chaque élément correspond aux métadonnées renseignées dans le input_schema. Cette validation garantit la forme que doivent posséder les données lorsqu'elles sont transmises à TensorFlow.

| 'Validate inputs' >> beam.ParDo(ValidateInputData(
    input_feature_spec)))

Appliquer des transformations effectuant un passage complet

L'exemple de code Molecules utilise un régresseur de réseau de neurones profonds pour effectuer des prédictions. Il est généralement recommandé de normaliser les entrées avant de les introduire dans le modèle de ML. Le pipeline utilise tf.Transform pour normaliser les décomptes de chaque atome vers des valeurs comprises entre 0 et 1. Pour en savoir plus sur la normalisation des entrées, consultez l'article Feature scaling (Scaling des caractéristiques).

La normalisation des valeurs implique un passage complet sur l'ensemble de données pour enregistrer les valeurs minimale et maximale. Le code utilise tf.Transform pour parcourir l'intégralité de l'ensemble de données et appliquer des transformations effectuant un passage complet.

Pour utiliser tf.Transform, le code doit fournir une fonction contenant la logique de la transformation à effectuer sur l'ensemble de données. Dans preprocess.py, le code utilise la transformation AnalyzeAndTransformDataset fournie par tf.Transform. En savoir plus sur l'utilisation de tf.Transform

# Apply the tf.Transform preprocessing_fn
input_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(input_feature_spec))

dataset_and_metadata, transform_fn = (
    (dataset, input_metadata)
    | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
        feature_scaling))
dataset, metadata = dataset_and_metadata

Dans preprocess.py, la fonction de feature_scaling utilisée est normalize_inputs et celle-ci est définie dans pubchem/pipeline.py. La fonction utilise la fonction tf.Transform scale_to_0_1 pour normaliser les décomptes vers des valeurs comprises entre 0 et 1.

def normalize_inputs(inputs):
  """Preprocessing function for tf.Transform (full-pass transformations).

  Here we will do any preprocessing that requires a full-pass of the dataset.
  It takes as inputs the preprocessed data from the `PTransform` we specify, in
  this case `SimpleFeatureExtraction`.

  Common operations might be scaling values to 0-1, getting the minimum or
  maximum value of a certain field, creating a vocabulary for a string field.

  There are two main types of transformations supported by tf.Transform, for
  more information, check the following modules:
    - analyzers: tensorflow_transform.analyzers.py
    - mappers:   tensorflow_transform.mappers.py

  Any transformation done in tf.Transform will be embedded into the TensorFlow
  model itself.
  """
  return {
      # Scale the input features for normalization
      'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
      'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
      'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
      'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),

      # Do not scale the label since we want the absolute number for prediction
      'Energy': inputs['Energy'],
  }

Il est possible de normaliser manuellement les données, mais si l'ensemble de données est volumineux, il est plus rapide d'utiliser Cloud Dataflow. L'utilisation de Cloud Dataflow permet au pipeline de s'exécuter sur plusieurs nœuds de calcul (VM) si nécessaire.

Partitionner l'ensemble de données

Ensuite, le pipeline preprocess.py partitionne l'ensemble de données unique en deux jeux de données. Il alloue environ 80 % des données à l'entraînement et environ 20 % à l'évaluation.

# Split the dataset into a training set and an evaluation set
assert 0 < eval_percent < 100, 'eval_percent must in the range (0-100)'
train_dataset, eval_dataset = (
    dataset
    | 'Split dataset' >> beam.Partition(
        lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))

Écrire la sortie

Enfin, le pipeline preprocess.py écrit les deux ensembles de données (entraînement et évaluation) à l'aide de la transformation WriteToTFRecord.

# Write the datasets as TFRecords
coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
_ = (
    train_dataset
    | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
        train_dataset_prefix, coder))

eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
_ = (
    eval_dataset
    | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
        eval_dataset_prefix, coder))

# Write the transform_fn
_ = (
    transform_fn
    | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))

Exécuter le pipeline de prétraitement

Exécutez le pipeline de prétraitement localement :

python preprocess.py

Ou exécutez le pipeline de prétraitement sur Cloud Dataflow :

PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python preprocess.py \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $WORK_DIR

Remarque : L'exécution de pipelines Cloud Dataflow entraîne des frais dans le cadre de votre projet GCP. Pour plus d'informations, consultez les tarifs Cloud Dataflow.

Une fois le pipeline en cours d'exécution, vous pouvez afficher sa progression dans l'interface de surveillance de Cloud Dataflow :

Pipeline de prétraitement Molecules

Phase 3 : Entraînement

Code source : trainer/task.py

Rappelons qu'à la fin de la phase de prétraitement, le code partitionne les données en deux ensembles de données (entraînement et évaluation).

L'exemple utilise TensorFlow pour entraîner le modèle de machine learning. Le fichier trainer/task.py de l'exemple Molecules contient le code d'entraînement du modèle. La fonction principale de trainer/task.py charge les données traitées lors de la phase de prétraitement.

L'Estimator utilise l'ensemble de données d'apprentissage pour entraîner le modèle, puis l'ensemble de données d'évaluation pour vérifier que le modèle prédit avec justesse l'énergie moléculaire en fonction de certaines propriétés de la molécule.

En savoir plus sur l'entraînement d'un modèle de ML.

Entraîner le modèle

Entraînez le modèle localement :

python trainer/task.py

# To get the path of the trained model
EXPORT_DIR=/tmp/cloudml-samples/molecules/model/export/final
MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1)

Sinon, entraînez le modèle sur AI Platform :

JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)"
BUCKET=gs://<your bucket name>
WORK_DIR=$BUCKET/cloudml-samples/molecules
gcloud ml-engine jobs submit training $JOB \
  --module-name trainer.task \
  --package-path trainer \
  --staging-bucket $BUCKET \
  --runtime-version 1.8 \
  --stream-logs \
  -- \
  --work-dir $WORK_DIR

# To get the path of the trained model:
EXPORT_DIR=$WORK_DIR/model/export
MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1)

Remarque : L'établissement de prévisions à l'aide d'AI Platform entraîne des frais pour votre projet GCP. Pour plus d'informations, consultez la page Tarifs d'AI Platform.

Phase 4 : Prédiction

Code source : predict.py

Une fois que l'Estimator a entraîné le modèle, vous pouvez lui fournir des données d'entrée qui lui permettront de faire des prédictions. Dans l'exemple de code Molecules, c'est le pipeline de predict.py qui est chargé de faire des prédictions. Le pipeline peut fonctionner par lots ou par flux.

Le code du pipeline est le même pour le traitement par lots et par flux, à l'exception des interactions source et récepteur. Si le pipeline s'exécute en mode de traitement par lots, il lit les fichiers d'entrée à partir de la source personnalisée et écrit les prédictions de sortie sous forme de fichiers texte dans le répertoire de travail spécifié. Si le pipeline fonctionne en mode de traitement par flux, il lit les molécules d'entrée à mesure qu'elles lui arrivent depuis un sujet Cloud Pub/Sub. Lorsque les prédictions de sortie sont prêtes, le pipeline les écrit dans un autre sujet Cloud Pub/Sub.

if args.verb == 'batch':
  data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
  results_prefix = os.path.join(args.outputs_dir, 'part')
  source = pubchem.ParseSDF(data_files_pattern)
  sink = beam.io.WriteToText(results_prefix)

elif args.verb == 'stream':
  if not project:
    parser.print_usage()
    print('error: argument --project is required for streaming')
    sys.exit(1)

  beam_options.view_as(StandardOptions).streaming = True
  source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format(
      project, args.inputs_topic))
  sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format(
      project, args.outputs_topic))

L'illustration suivante montre les étapes des pipelines de prédiction (par lots et par flux).

Pipeline de prétraitement Molecules

Dans predict.py, le code définit le pipeline dans la fonction run :

def run(model_dir, feature_extraction, sink, beam_options=None):
  print('Listening...')
  with beam.Pipeline(options=beam_options) as p:
    _ = (p
        | 'Feature extraction' >> feature_extraction
        | 'Predict' >> beam.ParDo(Predict(model_dir, 'ID'))
        | 'Format as JSON' >> beam.Map(json.dumps)
        | 'Write predictions' >> sink)

Le code appelle la fonction run avec les paramètres suivants :

run(
    args.model_dir,
    pubchem.SimpleFeatureExtraction(source),
    sink,
    beam_options)

Tout d'abord, le code transmet la transformation pubchem.SimpleFeatureExtraction(source) en tant que transformation feature_extraction. Cette transformation, également utilisée dans la phase de prétraitement, est appliquée au pipeline :

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

La transformation lit les données dans la source adéquate suivant le mode d'exécution du pipeline (par lot ou par flux), met en forme les molécules et compte les différents atomes dans chaque molécule.

Ensuite, la fonction beam.ParDo(Predict(…)) est appliquée au pipeline chargé de prédire l'énergie moléculaire. Predict, la fonction DoFn qui lui est transmise, utilise le dictionnaire des caractéristiques d'entrée (décompte des atomes) pour prédire l'énergie moléculaire.

La transformation suivante appliquée au pipeline est beam.Map(lambda result: json.dumps(result)), qui prend le dictionnaire de résultats de prédiction et le sérialise dans une chaîne JSON.

Enfin, la sortie est écrite dans le récepteur (sous forme de fichiers texte dans le répertoire de travail pour le traitement par lots ou sous forme de messages publiés dans un sujet Cloud Pub/Sub pour le traitement par flux).

Prédictions par lots

Les prédictions par lots sont optimisées pour le débit plutôt que pour la latence. Elles fonctionnent mieux si vous devez effectuer un certain nombre de prédictions, mais que vous pouvez attendre qu'elles soient toutes disponibles avant d'obtenir les résultats.

Exécuter le pipeline de prédiction en mode de traitement par lots

Exécutez le pipeline de prédiction par lots localement :

# For simplicity, we'll use the same files we used for training
python predict.py \
  --model-dir $MODEL_DIR \
  batch \
  --inputs-dir /tmp/cloudml-samples/molecules/data \
  --outputs-dir /tmp/cloudml-samples/molecules/predictions

Ou bien exécutez le pipeline de prédiction par lots sur Cloud Dataflow :

# For simplicity, we'll use the same files we used for training
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  batch \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $WORK_DIR/data \
  --outputs-dir $WORK_DIR/predictions

Remarque : L'exécution de pipelines Cloud Dataflow entraîne des frais dans le cadre de votre projet GCP. Pour plus d'informations, consultez les tarifs Cloud Dataflow.

Une fois le pipeline en cours d'exécution, vous pouvez afficher sa progression dans l'interface de surveillance de Cloud Dataflow :

Pipeline de prédiction Molecules

Prédictions par flux

Les prédictions par flux sont optimisées pour la latence plutôt que pour le débit. Elles fonctionnent mieux si vous effectuez des prédictions de façon sporadique, mais que vous souhaitez obtenir les résultats le plus rapidement possible.

Le service de prédiction (pipeline de prédiction par flux) reçoit les molécules d'entrée depuis un sujet Cloud Pub/Sub et publie la sortie (les prédictions) dans un autre sujet Cloud Pub/Sub.

Créez le sujet Cloud Pub/Sub pour les entrées :

gcloud pubsub topics create molecules-inputs

Créez le sujet Cloud Pub/Sub pour les sorties :

gcloud pubsub topics create molecules-predictions

Exécutez le pipeline de prédiction par flux localement :

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
python predict.py \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Ou bien exécutez le pipeline de prédiction par flux sur Cloud Dataflow :

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Une fois que le service de prédiction (le pipeline de prédiction par flux) est fonctionnel, vous devez exécuter un éditeur pour envoyer les molécules au service de prédiction et un abonné pour écouter les résultats de la prédiction. L'exemple de code Molecules fournit le code pour les services d'éditeur (publisher.py) et d'abonné (subscriber.py).

L'éditeur analyse les fichiers SDF depuis un répertoire et les publie dans le sujet des entrées. L'abonné écoute les résultats de la prédiction et les enregistre. Pour des raisons de simplicité, cet exemple utilise les mêmes fichiers SDF que ceux utilisés pour la phase d'entraînement.

Remarque : Vous devez les exécuter simultanément comme des processus distincts. Vous devez donc disposer d'un terminal différent pour exécuter chaque commande. N'oubliez pas d'activer le virtualenv sur chaque terminal.

Exécutez l'abonné :

# Run on terminal 2
python subscriber.py \
  --project $PROJECT \
  --topic molecules-predictions

Exécutez l'éditeur :

# Run on terminal 3
python publisher.py \
  --project $PROJECT \
  --topic molecules-inputs \
  --inputs-dir $WORK_DIR/data

Une fois que l'éditeur commence à analyser et à publier des molécules, vous commencez à voir apparaître les prédictions au niveau de l'abonné.

Effectuer un nettoyage

Dès que vous avez terminé d'exécuter le pipeline de prédictions par flux, arrêtez votre pipeline pour éviter les frais.

Étapes suivantes

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.