Use RunInference in Apache Beam

You can use Apache Beam versions 2.40.0 and later with the RunInference API for local and remote inference with batch and streaming pipelines. The RunInference API leverages Apache Beam concepts, such as the BatchElements transform and the Shared class, so that you can use models in your pipelines to create transforms optimized for machine learning inference.
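
As a rough illustration of those two ideas, the following plain-Python sketch groups elements into batches and loads a stand-in model only once, reusing it for every batch. It does not use Beam: `load_model`, `batch_elements`, and `run_inference` are hypothetical names chosen for this sketch, and the real BatchElements transform and Shared class are considerably more sophisticated.

```python
from functools import lru_cache
from itertools import islice

LOAD_COUNT = 0  # counts how many times the stand-in model is loaded


@lru_cache(maxsize=1)
def load_model():
    """Hypothetical loader: returns a callable that scores a batch of strings."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    return lambda batch: [len(text.split()) for text in batch]


def batch_elements(elements, batch_size):
    """Yield lists of up to batch_size elements."""
    iterator = iter(elements)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_inference(elements, batch_size=2):
    """Run the cached model over each batch and pair inputs with predictions."""
    results = []
    for batch in batch_elements(elements, batch_size):
        model = load_model()  # cached, so the model is loaded only once
        results.extend(zip(batch, model(batch)))
    return results


predictions = run_inference(["a b c", "d e", "f"])
print(predictions)
```

Batching amortizes per-call overhead across several elements, and caching the loaded model avoids paying the load cost once per element.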

For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.

This example demonstrates how to use the RunInference API with three popular ML frameworks: PyTorch, TensorFlow, and scikit-learn. The three pipelines use a text classification model for generating predictions.

Follow these steps to build a pipeline:

  • Read the text input.
  • If needed, preprocess the text.
  • Run inference with the PyTorch, TensorFlow, or scikit-learn model.
  • If needed, postprocess the output.
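
The steps above can be sketched without any ML framework. In this minimal example, `toy_model` is a hypothetical stand-in for a real PyTorch, TensorFlow, or scikit-learn model, and the inputs are hard-coded for illustration.

```python
def read_inputs():
    """Step 1: read the raw text (hard-coded here for illustration)."""
    return ["  The movie was GREAT ", "The movie was terrible"]


def preprocess(text):
    """Step 2: normalize whitespace and case."""
    return " ".join(text.lower().split())


def toy_model(text):
    """Step 3: hypothetical stand-in model that returns a score in [0, 1]."""
    return 1.0 if "great" in text else 0.0


def postprocess(text, score):
    """Step 4: turn the raw score into a readable label."""
    return {"input": text, "label": "positive" if score >= 0.5 else "negative"}


results = [postprocess(t, toy_model(t)) for t in map(preprocess, read_inputs())]
for row in results:
    print(row)
```

In the pipelines that follow, each of these functions corresponds to one Beam transform, with RunInference taking the place of the model call.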

RunInference with a PyTorch model

This section demonstrates how to use the RunInference API with a PyTorch model.

Install dependencies

First, download and install the dependencies.

pip install --upgrade pip
pip install "apache_beam[gcp]>=2.40.0"
pip install transformers
pip install google-api-core==1.32

Install the model

This example uses a pretrained text classification model, distilbert-base-uncased-finetuned-sst-2-english. This model is a checkpoint of DistilBERT-base-uncased, fine-tuned on the SST-2 dataset.

 git lfs install
 git clone https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english
 ls

Install helper functions

The model also uses helper functions.

from collections import defaultdict

import torch
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer, DistilBertConfig

import apache_beam as beam
from apache_beam.ml.inference import RunInference
from apache_beam.ml.inference.base import PredictionResult, KeyedModelHandler
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor


class HuggingFaceStripBatchingWrapper(DistilBertForSequenceClassification):
  """Wrapper around the Hugging Face model. RunInference requires a batch
  as a list of dicts instead of a dict of lists. For another workaround,
  which disables batching instead, see:
  https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py"""
  def forward(self, **kwargs):
    output = super().forward(**kwargs)
    return [dict(zip(output, v)) for v in zip(*output.values())]



class Tokenize(beam.DoFn):
  def __init__(self, model_name: str):
    self._model_name = model_name

  def setup(self):
    self._tokenizer = DistilBertTokenizer.from_pretrained(self._model_name)

  def process(self, text_input: str):
    # Pad the token tensors to max length so that all of the tensors have the
    # same length and can be stacked by the RunInference API. Normally, you would
    # batch first and then tokenize the batch, padding each tensor to the max
    # length in the batch.
    # See: https://beam.apache.org/documentation/ml/about-ml/#unable-to-batch-tensor-elements
    tokens = self._tokenizer(text_input, return_tensors='pt', padding='max_length', max_length=512)
    # Squeeze because tokenization adds an extra dimension, which is empty,
    # in this case because we tokenize one element at a time.
    tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
    return [(text_input, tokens)]

class PostProcessor(beam.DoFn):
  def process(self, tuple_):
    text_input, prediction_result = tuple_
    softmax = torch.nn.Softmax(dim=-1)(prediction_result.inference['logits']).detach().numpy()
    return [{"input": text_input, "softmax": softmax}]
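
To see what the wrapper's `forward` method does, the following sketch applies the same expression to a small, made-up batched output. The dictionary contents are hypothetical; a real model output holds tensors rather than Python lists.

```python
# A batched model output: each key maps to one value per example in the batch.
batched_output = {
    "logits": [[2.0, -1.0], [-0.5, 0.5]],
    "label": ["negative", "positive"],
}

# Same expression as in the wrapper's forward(): iterating a dict yields its
# keys, and zip(*values) walks the per-key lists in lockstep.
per_example = [
    dict(zip(batched_output, values))
    for values in zip(*batched_output.values())
]
print(per_example)
```

Each resulting dict holds the values for exactly one example, which is the per-element shape that the wrapper's docstring says RunInference requires.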

Run the pipeline

This section demonstrates how to create and run the RunInference pipeline.

inputs = [
    "This is the worst food I have ever eaten",
    "In my soul and in my heart, I’m convinced I’m wrong!",
    "Be with me always—take any form—drive me mad! only do not leave me in this abyss, where I cannot find you!",
    "Do I want to live? Would you like to live with your soul in the grave?",
    "Honest people don’t hide their deeds.",
    "Nelly, I am Heathcliff!  He’s always, always in my mind: not as a pleasure, any more than I am always a pleasure to myself, but as my own being.",
]
model_handler = PytorchModelHandlerKeyedTensor(
    state_dict_path="./distilbert-base-uncased-finetuned-sst-2-english/pytorch_model.bin",
    model_class=HuggingFaceStripBatchingWrapper,
    model_params={"config": DistilBertConfig.from_pretrained("./distilbert-base-uncased-finetuned-sst-2-english/config.json")},
    device='cuda:0')

keyed_model_handler = KeyedModelHandler(model_handler)

with beam.Pipeline() as pipeline:
  _ = (pipeline | "Create inputs" >> beam.Create(inputs)
                | "Tokenize" >> beam.ParDo(Tokenize("distilbert-base-uncased-finetuned-sst-2-english"))
                | "Inference" >> RunInference(model_handler=keyed_model_handler)
                | "Postprocess" >> beam.ParDo(PostProcessor())
                | "Print" >> beam.Map(lambda x: print(f"Input: {x['input']} -> negative={100 * x['softmax'][0]:.4f}%/positive={100 * x['softmax'][1]:.4f}%"))
  )
/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py:10: FutureWarning: PytorchModelHandlerKeyedTensor is experimental. No backwards-compatibility guarantees.
  # Remove the CWD from sys.path while we load stuff.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
/usr/local/lib/python3.7/dist-packages/dill/_dill.py:472: FutureWarning: PytorchModelHandlerKeyedTensor is experimental. No backwards-compatibility guarantees.
  obj = StockUnpickler.load(self)
/usr/local/lib/python3.7/dist-packages/dill/_dill.py:472: FutureWarning: PytorchModelHandlerKeyedTensor is experimental. No backwards-compatibility guarantees.
  obj = StockUnpickler.load(self)
Input: This is the worst food I have ever eaten -> negative=99.9777%/positive=0.0223%
Input: In my soul and in my heart, I’m convinced I’m wrong! -> negative=1.6313%/positive=98.3687%
Input: Be with me always—take any form—drive me mad! only do not leave me in this abyss, where I cannot find you! -> negative=62.1188%/positive=37.8812%
Input: Do I want to live? Would you like to live with your soul in the grave? -> negative=73.6841%/positive=26.3159%
Input: Honest people don’t hide their deeds. -> negative=0.2377%/positive=99.7623%
Input: Nelly, I am Heathcliff!  He’s always, always in my mind: not as a pleasure, any more than I am always a pleasure to myself, but as my own being. -> negative=0.0672%/positive=99.9328%
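
The percentages in the output above come from the softmax that PostProcessor applies to the two logits of each prediction. The following sketch reproduces that calculation in plain Python. The logit values are hypothetical, and the [negative, positive] ordering follows the print statement in the pipeline.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [4.2, -4.2]  # hypothetical [negative, positive] logits
negative, positive = softmax(logits)
print(f"negative={100 * negative:.4f}%/positive={100 * positive:.4f}%")
```

Because softmax normalizes the exponentiated logits, the two percentages always sum to 100%.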

RunInference with a TensorFlow model

This section demonstrates how to use the RunInference API with a TensorFlow model.

Install dependencies

First, download and install the dependencies.

pip install --upgrade pip
pip install google-api-core==1.32
pip install apache_beam[gcp]==2.41.0
pip install tensorflow==2.8
pip install tfx_bsl
pip install tensorflow-text==2.8.1

import numpy as np
import tensorflow as tf
import tensorflow_text as text
from scipy.special import expit

import apache_beam as beam
import tfx_bsl
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public import tfxio
from tfx_bsl.public.proto import model_spec_pb2
from tfx_bsl.public.tfxio import TFExampleRecord
from tensorflow_serving.apis import prediction_log_pb2

Install the model

Download a pretrained binary classifier from Google Cloud Storage. The classifier performs sentiment analysis and was trained on an IMDB dataset by following this TensorFlow text classification tutorial.

model_dir = "gs://apache-beam-testing-ml-examples/imdb_bert"

Install helper functions

The model also uses helper functions.

class ExampleProcessor:
  """
  Convert the raw text input to a format suitable for RunInference.
  The TensorFlow model handler expects a serialized tf.Example as input.
  """
  def create_example(self, feature):
    return tf.train.Example(
        features=tf.train.Features(
              feature={'x' : self.create_feature(feature)})
        )

  def create_feature(self, element):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[element]))

class PredictionProcessor(beam.DoFn):
  """
  Process the RunInference output to return the input text and the sigmoid probability.
  """
  def process(self, element: prediction_log_pb2.PredictionLog):
    predict_log = element.predict_log
    # The request input holds the serialized tf.Example that was sent to the model.
    input_value = tf.train.Example.FromString(predict_log.request.inputs['text'].string_val[0])
    output_value = predict_log.response.outputs
    # expit is the logistic sigmoid; it maps the classifier output into the range (0, 1).
    yield (f"input is [{input_value.features.feature['x'].bytes_list.value}] output is {expit(output_value['classifier'].float_val)}")

Prepare the input

This section demonstrates how to prepare the input for your model.

inputs = np.array([
    b"this is such an amazing movie",
    b"The movie was great",
    b"The movie was okish",
    b"The movie was terrible"
])
input_strings_file = 'input_strings.tfrecord'

# RunInference expects a serialized tf.Example as input, so preprocess the input
# and write the result to a file.
# You can also do this preprocessing as a pipeline step by using beam.Map().

with tf.io.TFRecordWriter(input_strings_file) as writer:
  for i in inputs:
    example = ExampleProcessor().create_example(feature=i)
    writer.write(example.SerializeToString())

Run the pipeline

This section demonstrates how to create and run the RunInference pipeline.

saved_model_spec = model_spec_pb2.SavedModelSpec(model_path=model_dir)
inference_spec_type = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)

# A Beam I/O that reads a file of serialized tf.Examples
tfexample_beam_record = TFExampleRecord(file_pattern=input_strings_file)

with beam.Pipeline() as pipeline:
  _ = (pipeline | "Create Input PCollection" >> tfexample_beam_record.RawRecordBeamSource()
                | "Do Inference" >> RunInference(inference_spec_type)
                | "Post Process" >> beam.ParDo(PredictionProcessor())
                | "Print" >> beam.Map(print)
  )
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tfx_bsl/beam/run_inference.py:615: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
input is [[b'this is such an amazing movie']] output is [0.99906057]
input is [[b'The movie was great']] output is [0.99307914]
input is [[b'The movie was okish']] output is [0.03274685]
input is [[b'The movie was terrible']] output is [0.00680008]
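
The values in the output above come from `expit`, the logistic sigmoid, which PredictionProcessor applies to the classifier output. The following sketch shows the same mapping in plain Python. The raw values and the 0.5 decision threshold are assumptions made for illustration; they are not taken from the model.

```python
import math


def sigmoid(x):
    """Logistic function, the same mapping that scipy.special.expit computes."""
    return 1.0 / (1.0 + math.exp(-x))


for raw in (-5.0, 0.0, 5.0):  # hypothetical raw classifier outputs
    probability = sigmoid(raw)
    label = "positive" if probability >= 0.5 else "negative"
    print(f"raw={raw:+.1f} -> probability={probability:.4f} ({label})")
```

Values near 1 correspond to positive reviews and values near 0 to negative ones, which matches the pattern in the pipeline output.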

RunInference with a scikit-learn model

This section demonstrates how to use the RunInference API with a scikit-learn model.

Install dependencies

First, download and install the dependencies.

pip install --upgrade pip
pip install google-api-core==1.32
pip install apache_beam[gcp]==2.41.0

import pickle

import apache_beam as beam
from apache_beam.ml.inference import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy, ModelFileType

Install the model

To classify movie reviews as either positive or negative, this example uses a sentiment analysis pipeline that was trained and saved ahead of time.

This model was trained by following this scikit-learn tutorial.

model_dir = "gs://apache-beam-testing-ml-examples/sklearn-text-classification/sklearn_sentiment_analysis_pipeline.pkl"

Run the pipeline

This section demonstrates how to create and run the RunInference pipeline.

inputs = [
    "In my soul and in my heart, I’m convinced I’m wrong!",
    "Be with me always—take any form—drive me mad! only do not leave me in this abyss, where I cannot find you!",
    "Do I want to live? Would you like to live with your soul in the grave?",
    "Honest people don’t hide their deeds.",
    "Nelly, I am Heathcliff!  He’s always, always in my mind: not as a pleasure, any more than I am always a pleasure to myself, but as my own being.",
]
# Choose an sklearn model handler based on the input data type:
# 1. SklearnModelHandlerNumpy: For using numpy arrays as input.
# 2. SklearnModelHandlerPandas: For using pandas dataframes as input.

# The sklearn model handler supports loading two serialized formats:
# 1. ModelFileType.PICKLE: For models saved using pickle.
# 2. ModelFileType.JOBLIB: For models saved using joblib.

model_handler = SklearnModelHandlerNumpy(model_uri=model_dir, model_file_type=ModelFileType.PICKLE)

with beam.Pipeline() as pipeline:
  _ = (pipeline | "Create inputs" >> beam.Create(inputs)
                | "Inference" >> RunInference(model_handler=model_handler)
                | "Print" >> beam.Map(lambda x: print(f"input: {x.example} -> {'positive' if x.inference == 0 else 'negative'}"))
  )
input: In my soul and in my heart, I’m convinced I’m wrong! -> negative
input: Be with me always—take any form—drive me mad! only do not leave me in this abyss, where I cannot find you! -> positive
input: Do I want to live? Would you like to live with your soul in the grave? -> positive
input: Honest people don’t hide their deeds. -> negative
input: Nelly, I am Heathcliff!  He’s always, always in my mind: not as a pleasure, any more than I am always a pleasure to myself, but as my own being. -> negative
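
The ModelFileType.PICKLE option mentioned in the code comments refers to models that were serialized with Python's pickle module. The following sketch shows the save-then-load round trip that this implies. The `model` dictionary and the `predict` function are hypothetical stand-ins for a trained scikit-learn pipeline, used so that the example needs no third-party packages.

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for a trained model: plain data instead of a real
# scikit-learn pipeline, so the sketch has no third-party dependencies.
model = {"positive_words": ["great", "amazing"]}


def predict(model, texts):
    """Return 1 when a text contains any positive keyword, else 0."""
    words = set(model["positive_words"])
    return [int(bool(words & set(text.lower().split()))) for text in texts]


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "model.pkl")
    with open(path, "wb") as f:
        pickle.dump(model, f)      # save once, after training
    with open(path, "rb") as f:
        restored = pickle.load(f)  # load once, before inference

predictions = predict(restored, ["The movie was great", "The movie was terrible"])
print(predictions)
```

Unpickling can execute arbitrary code, so load pickle files only from sources that you trust.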