Running inference with multiple differently-trained models performing the same task is useful in many scenarios, including the following examples:
- You want to compare the performance of multiple different models.
- You have models trained on different datasets that you want to use conditionally based on additional metadata.
In Apache Beam, the recommended way to run inference is to use the RunInference transform. By using a KeyedModelHandler, you can efficiently run inference with O(100s) of models without having to manage memory yourself.
This notebook demonstrates how to use a KeyedModelHandler to run inference in an Apache Beam pipeline with multiple different models on a per-key basis. This notebook uses pretrained pipelines from Hugging Face. Before continuing with this notebook, it is recommended that you walk through the Use RunInference in Apache Beam notebook.
Install dependencies
Install both Apache Beam and the dependencies needed by Hugging Face.
!pip install 'apache_beam[gcp]>=2.51.0' --quiet
!pip install torch --quiet
!pip install transformers --quiet
# To use the newly installed versions, restart the runtime.
exit()
from typing import Dict
from typing import Iterable
from typing import Tuple
from transformers import pipeline
import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import KeyModelMapping
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler
from apache_beam.ml.inference.base import RunInference
Define the model configurations
A model handler is the Apache Beam construct used to define the configuration needed to load and invoke a model. Because this example uses two models, we define two model handlers, one for each model. Because both models are encapsulated within Hugging Face pipelines, we use the model handler HuggingFacePipelineModelHandler.
For this example, load the models using Hugging Face, and then run each one against an example input. Because the two models were fine-tuned for different classification tasks, they produce different labels for the same input.
distilbert_mh = HuggingFacePipelineModelHandler('text-classification', model="distilbert-base-uncased-finetuned-sst-2-english")
roberta_mh = HuggingFacePipelineModelHandler('text-classification', model="roberta-large-mnli")
distilbert_pipe = pipeline('text-classification', model="distilbert-base-uncased-finetuned-sst-2-english")
roberta_large_pipe = pipeline(model="roberta-large-mnli")
distilbert_pipe("This restaurant is awesome")
[{'label': 'POSITIVE', 'score': 0.9998743534088135}]
roberta_large_pipe("This restaurant is awesome")
[{'label': 'NEUTRAL', 'score': 0.7313134670257568}]
Define the examples
Define examples to input into the pipeline. The examples include the correct classifications.
examples = [
("This restaurant is awesome", "positive"),
("This restaurant is bad", "negative"),
("I feel fine", "neutral"),
("I love chocolate", "positive"),
]
To feed the examples into RunInference, you need distinct keys that can map to the model. In this case, to make it possible to extract the actual sentiment of the example later, define keys in the form <model_name>-<actual_sentiment>.
class FormatExamples(beam.DoFn):
  """
  Map each example to a tuple of ('<model_name>-<actual_sentiment>', 'example').
  Use these keys to map our elements to the correct models.
  """
  def process(self, element: Tuple[str, str]) -> Iterable[Tuple[str, str]]:
    yield (f'distilbert-{element[1]}', element[0])
    yield (f'roberta-{element[1]}', element[0])
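As a quick sanity check, you can call the DoFn's process method directly outside the pipeline to confirm that each example fans out to one keyed element per model. This snippet is only an illustrative sketch and isn't part of the pipeline itself:
# Illustrative check: each (example, sentiment) tuple yields one element per
# model, keyed by '<model_name>-<actual_sentiment>'.
list(FormatExamples().process(("This restaurant is awesome", "positive")))
# Expected: [('distilbert-positive', 'This restaurant is awesome'),
#            ('roberta-positive', 'This restaurant is awesome')]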
Use the formatted keys to define a KeyedModelHandler that maps keys to the ModelHandler used for those keys. KeyedModelHandler also accepts an optional max_models_per_worker_hint parameter, which limits the number of models that can be held in a single worker process at one time. If your worker might run out of memory, use this option. For more information about managing memory, see Use a keyed ModelHandler.
per_key_mhs = [
KeyModelMapping(['distilbert-positive', 'distilbert-neutral', 'distilbert-negative'], distilbert_mh),
KeyModelMapping(['roberta-positive', 'roberta-neutral', 'roberta-negative'], roberta_mh)
]
mh = KeyedModelHandler(per_key_mhs, max_models_per_worker_hint=2)
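If every key mapped to the same model, KeyedModelHandler could instead wrap a single model handler rather than a list of KeyModelMapping objects. The following sketch shows that variant for illustration only; it isn't used in the rest of this notebook:
# Sketch: a KeyedModelHandler that routes every key to the same model.
# It still accepts (key, example) tuples, but only one model is loaded.
single_model_mh = KeyedModelHandler(distilbert_mh)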
Postprocess the results
The RunInference transform returns a tuple that contains the following objects:
- the original key
- a PredictionResult object containing the original example and the inference

Use those outputs to extract the relevant data. Then, to compare each model's prediction, group this data by the original example.
class ExtractResults(beam.DoFn):
  """
  Extract the relevant data from the PredictionResult object.
  """
  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, Dict[str, str]]]:
    actual_sentiment = element[0].split('-')[1]
    model = element[0].split('-')[0]
    result = element[1]
    example = result.example
    predicted_sentiment = result.inference[0]['label']
    yield (example, {'model': model, 'actual_sentiment': actual_sentiment, 'predicted_sentiment': predicted_sentiment})
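To see what ExtractResults emits without running the full pipeline, you can feed its process method a hand-built stand-in. The FakeResult namedtuple below is a hypothetical substitute that only carries the two fields the DoFn reads; inside the pipeline, RunInference supplies real PredictionResult objects:
from collections import namedtuple

# Hypothetical stand-in with the two fields ExtractResults reads (example, inference).
FakeResult = namedtuple('FakeResult', ['example', 'inference'])
fake = FakeResult(example="This restaurant is awesome",
                  inference=[{'label': 'POSITIVE', 'score': 0.99}])
list(ExtractResults().process(('distilbert-positive', fake)))
# Expected: [('This restaurant is awesome',
#             {'model': 'distilbert', 'actual_sentiment': 'positive',
#              'predicted_sentiment': 'POSITIVE'})]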
Finally, print the results produced by each model.
class PrintResults(beam.DoFn):
  """
  Print the results produced by each model along with the actual sentiment.
  """
  def process(self, element: Tuple[str, Iterable[Dict[str, str]]]):
    example = element[0]
    actual_sentiment = element[1][0]['actual_sentiment']
    predicted_sentiment_1 = element[1][0]['predicted_sentiment']
    model_1 = element[1][0]['model']
    predicted_sentiment_2 = element[1][1]['predicted_sentiment']
    model_2 = element[1][1]['model']
    if model_1 == 'distilbert':
      distilbert_prediction = predicted_sentiment_1
      roberta_prediction = predicted_sentiment_2
    else:
      roberta_prediction = predicted_sentiment_1
      distilbert_prediction = predicted_sentiment_2
    print(f'Example: {example}\nActual Sentiment: {actual_sentiment}\n'
          f'Distilbert Prediction: {distilbert_prediction}\n'
          f'Roberta Prediction: {roberta_prediction}\n------------')
Run the pipeline
To run a single Apache Beam pipeline, combine the previous steps.
with beam.Pipeline() as beam_pipeline:
  formatted_examples = (
      beam_pipeline
      | "ReadExamples" >> beam.Create(examples)
      | "FormatExamples" >> beam.ParDo(FormatExamples()))
  inferences = (
      formatted_examples
      | "Run Inference" >> RunInference(mh)
      | "ExtractResults" >> beam.ParDo(ExtractResults())
      | "GroupByExample" >> beam.GroupByKey()
  )
  inferences | beam.ParDo(PrintResults())
Example: This restaurant is awesome
Actual Sentiment: positive
Distilbert Prediction: POSITIVE
Roberta Prediction: NEUTRAL
------------
Example: This restaurant is bad
Actual Sentiment: negative
Distilbert Prediction: NEGATIVE
Roberta Prediction: NEUTRAL
------------
Example: I love chocolate
Actual Sentiment: positive
Distilbert Prediction: POSITIVE
Roberta Prediction: NEUTRAL
------------
Example: I feel fine
Actual Sentiment: neutral
Distilbert Prediction: POSITIVE
Roberta Prediction: ENTAILMENT
------------