Preprocess data with MLTransform


This notebook demonstrates how to use MLTransform to preprocess data for machine learning workflows. Apache Beam provides a set of transforms for preprocessing data for training and inference. The MLTransform class wraps various transforms in one PTransform, simplifying your workflow. For a list of available preprocessing transforms, see the Preprocess data with MLTransform page in the Apache Beam documentation.

This notebook uses data processing transforms defined in the apache_beam/ml/transforms/tft module.

Import the required modules

To use MLTransform, install tensorflow_transform and the Apache Beam SDK version 2.50.0 or later.

pip install tensorflow_transform --quiet
pip install 'apache_beam>=2.50.0' --quiet
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.transforms.utils import ArtifactsFetcher

Artifacts are additional data elements created by data transformations. Examples of artifacts are the minimum and maximum values from a ScaleTo01 transformation, or the mean and variance from a ScaleToZScore transformation. For more information about artifacts, see Artifacts.

# Store artifacts generated by MLTransform.
# Each MLTransform instance requires an empty artifact location.
# This function deletes and refreshes the artifact location for each example.
import os
import shutil

artifact_location = './my_artifacts'

def delete_artifact_location(artifact_location):
  if os.path.exists(artifact_location):
    shutil.rmtree(artifact_location)

Compute and map the vocabulary

ComputeAndApplyVocabulary is a data processing transform that computes a unique vocabulary from a dataset and then maps each word or token to a distinct integer index. It facilitates transforming textual data into numerical representations for machine learning tasks.

Use ComputeAndApplyVocabulary with MLTransform.

delete_artifact_location(artifact_location)

data = [
    {'x': ['I', 'love', 'pie']},
    {'x': ['I', 'love', 'going', 'to', 'the', 'park']}
]
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    data = (
        p
        | 'CreateData' >> beam.Create(data)
        | 'MLTransform' >> MLTransform(write_artifact_location=artifact_location).with_transform(ComputeAndApplyVocabulary(columns=['x']))
        | 'PrintResults' >> beam.Map(print)
    )
Row(x=array([1, 0, 4]))
Row(x=array([1, 0, 6, 2, 3, 5]))

Fetch vocabulary artifacts

This example generates a file that contains the full vocabulary of the dataset, referred to in MLTransform as an artifact. To fetch artifacts generated by the ComputeAndApplyVocabulary transform, use the ArtifactsFetcher class. This class fetches the vocabulary list, the path to the vocabulary file, and the vocabulary size.

fetcher = ArtifactsFetcher(artifact_location=artifact_location)
# get vocab list
vocab_list = fetcher.get_vocab_list()
print(vocab_list)
# get vocab file path
vocab_file_path = fetcher.get_vocab_filepath()
print(vocab_file_path)
# get vocab size
vocab_size = fetcher.get_vocab_size()
print(vocab_size)
['love', 'I', 'to', 'the', 'pie', 'park', 'going']
./my_artifacts/transform_fn/assets/compute_and_apply_vocab
7
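
The vocabulary artifact is a plain-text file, so you can also read it directly. The following is a minimal sketch, assuming the file contains one token per line, which is the format that tensorflow_transform uses for vocabulary assets.

# Read the vocabulary artifact directly from disk.
# Assumes one token per line, the format tensorflow_transform uses
# for vocabulary asset files.
with open(vocab_file_path) as f:
    tokens = [line.strip() for line in f]
print(tokens)  # Same contents as vocab_list above.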

Use TF-IDF to weight terms

TF-IDF (Term Frequency-Inverse Document Frequency) is a numerical statistic used in text processing to reflect how important a word is to a document in a collection or corpus. It balances the frequency of a word in a document against its frequency in the entire corpus, giving higher value to more specific terms.

Use TF-IDF with MLTransform.

  1. Compute the vocabulary of the dataset by using ComputeAndApplyVocabulary.
  2. Use the output of ComputeAndApplyVocabulary to calculate the TF-IDF weights.
from apache_beam.ml.transforms.tft import TFIDF
data = [
    {'x': ['I', 'love', 'pie']},
    {'x': ['I', 'love', 'going', 'to', 'the', 'park']}
]
delete_artifact_location(artifact_location)
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    data = (
        p
        | beam.Create(data)
        | MLTransform(write_artifact_location=artifact_location
                     ).with_transform(ComputeAndApplyVocabulary(columns=['x'])
                     ).with_transform(TFIDF(columns=['x']))
    )
    _ = data | beam.Map(print)
Row(x=array([1, 0, 4]), x_tfidf_weight=array([0.33333334, 0.33333334, 0.4684884 ], dtype=float32), x_vocab_index=array([0, 1, 4]))
Row(x=array([1, 0, 6, 2, 3, 5]), x_tfidf_weight=array([0.16666667, 0.16666667, 0.2342442 , 0.2342442 , 0.2342442 ,
       0.2342442 ], dtype=float32), x_vocab_index=array([0, 1, 2, 3, 5, 6]))

TF-IDF output

TF-IDF produces two output columns for a given input column. For example, if you input x, the output column names in the dictionary are x_vocab_index and x_tfidf_weight.

  • vocab_index: the indices of the words computed in the ComputeAndApplyVocabulary transform.
  • tfidf_weight: the weight for each vocabulary index. The weight represents how important the word at that vocab_index is to the document.
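
Because the output elements are Beam Row objects, you can read these columns by attribute. The following is a minimal sketch, assuming the input column is named x as in the example above; extract_tfidf is a hypothetical helper, not part of the Apache Beam API.

# Pair each vocabulary index with its TF-IDF weight.
# The x_vocab_index and x_tfidf_weight values are numpy arrays.
def extract_tfidf(row):
    return list(zip(row.x_vocab_index.tolist(), row.x_tfidf_weight.tolist()))

# For example, append to the pipeline above:
#     ... | MLTransform(...) | beam.Map(extract_tfidf) | beam.Map(print)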

Scale the data

The following examples show two ways to scale data:

  • Scale data between 0 and 1.
  • Scale data using z-score.

Scale the data between 0 and 1

Scale the data so that it's in the range from 0 to 1. To scale the data, the transform calculates minimum and maximum values over the whole dataset, and then performs the following calculation:

x = (x - x_min) / (x_max - x_min)

To scale the data, use the ScaleTo01 data processing transform in MLTransform.

delete_artifact_location(artifact_location)

from apache_beam.ml.transforms.tft import ScaleTo01
data = [
    {'x': [1, 2, 3]},
    {'x': [4, 5, 7]},
    {'x': [10, 2, 10, 34, 100, 54, 20, 10, 2, 3, 11, 12]}
]

with beam.Pipeline() as p:
    _ = (
        p
        | 'CreateData' >> beam.Create(data)
        | 'MLTransform' >> MLTransform(write_artifact_location=artifact_location).with_transform(ScaleTo01(columns=['x']))
        | 'PrintResults' >> beam.Map(print)
    )
Row(x=array([0.        , 0.01010101, 0.02020202], dtype=float32), x_max=array([100.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0.03030303, 0.04040404, 0.06060606], dtype=float32), x_max=array([100.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0.09090909, 0.01010101, 0.09090909, 0.33333334, 1.        ,
       0.53535354, 0.1919192 , 0.09090909, 0.01010101, 0.02020202,
       0.1010101 , 0.11111111], dtype=float32), x_max=array([100.], dtype=float32), x_min=array([1.], dtype=float32))

The output contains artifacts such as x_max and x_min, which represent the maximum and minimum values of the entire dataset.
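
As a quick sanity check, you can reproduce one of the scaled values by hand from the formula and the artifacts in the output.

# Sanity check (not part of the pipeline): reproduce the scaled value
# for x = 2 by using the x_min = 1.0 and x_max = 100.0 artifacts shown above.
x, x_min, x_max = 2.0, 1.0, 100.0
print((x - x_min) / (x_max - x_min))  # 0.0101..., matching 0.01010101 in the output.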

Scale the data by using the z-score

Similar to ScaleTo01, use ScaleToZScore to scale the values by using the z-score. The transform calculates the mean and variance over the whole dataset, and then performs the following calculation:

x = (x - x_mean) / sqrt(x_var)

delete_artifact_location(artifact_location)

from apache_beam.ml.transforms.tft import ScaleToZScore
data = [
    {'x': [1, 2, 3]},
    {'x': [4, 5, 7]},
    {'x': [10, 2, 10, 34, 100, 54, 20, 10, 2, 3, 11, 12]}
]

with beam.Pipeline() as p:
    _ = (
        p
        | 'CreateData' >> beam.Create(data)
        | 'MLTransform' >> MLTransform(write_artifact_location=artifact_location).with_transform(ScaleToZScore(columns=['x']))
        | 'PrintResults' >> beam.Map(print)
    )
Row(x=array([-0.62608355, -0.5846515 , -0.54321957], dtype=float32), x_mean=array([16.11111], dtype=float32), x_var=array([582.5432], dtype=float32))
Row(x=array([-0.50178754, -0.46035555, -0.37749153], dtype=float32), x_mean=array([16.11111], dtype=float32), x_var=array([582.5432], dtype=float32))
Row(x=array([-0.25319555, -0.5846515 , -0.25319555,  0.7411725 ,  3.4756844 ,
        1.5698125 ,  0.16112447, -0.25319555, -0.5846515 , -0.54321957,
       -0.21176355, -0.17033154], dtype=float32), x_mean=array([16.11111], dtype=float32), x_var=array([582.5432], dtype=float32))
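
As with ScaleTo01, you can reproduce a value by hand from the x_mean and x_var artifacts in the output.

import math

# Sanity check (not part of the pipeline): reproduce the z-score for x = 1
# by using the x_mean and x_var artifacts shown above.
x, x_mean, x_var = 1.0, 16.11111, 582.5432
print((x - x_mean) / math.sqrt(x_var))  # About -0.6261, matching -0.62608355 in the output.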

Use multiple transforms on a single MLTransform

Apply the same transform to multiple columns. For example, scale columns x and y to values between 0 and 1, and compute a vocabulary for column s. You can use a single MLTransform for both of these tasks.

When using multiple data processing transforms, either pass the transforms as chained transforms or directly as a list.

Use multiple data processing transforms in a single MLTransform

The following example shows multiple data processing transforms chained to MLTransform.

delete_artifact_location(artifact_location)

from apache_beam.ml.transforms.tft import ScaleTo01
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

data = [
    {'x': [1, 2, 3], 'y': [10, 100, 111], 's': ['I', 'love', 'pie']},
    {'x': [4, 5, 7], 'y': [11, 21, 50], 's': ['I', 'love', 'going', 'to', 'the', 'park']}
]

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateData' >> beam.Create(data)
      | 'MLTransform' >> MLTransform(write_artifact_location=artifact_location).with_transform(
          ScaleTo01(columns=['x', 'y'])).with_transform(ComputeAndApplyVocabulary(columns=['s']))
      | 'PrintResults' >> beam.Map(print)
  )
Row(s=array([1, 0, 4]), x=array([0.        , 0.16666667, 0.33333334], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([0.       , 0.8910891, 1.       ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))
Row(s=array([1, 0, 6, 2, 3, 5]), x=array([0.5      , 0.6666667, 1.       ], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([0.00990099, 0.10891089, 0.3960396 ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))

The following example shows multiple data processing transforms passed in as a list to MLTransform.

delete_artifact_location(artifact_location)

from apache_beam.ml.transforms.tft import ScaleTo01
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

data = [
    {'x': [1, 2, 3], 'y': [10, 100, 111], 's': ['I', 'love', 'pie']},
    {'x': [4, 5, 7], 'y': [11, 21, 50], 's': ['I', 'love', 'going', 'to', 'the', 'park']}
]

transforms = [
    ScaleTo01(columns=['x', 'y']),
    ComputeAndApplyVocabulary(columns=['s'])
]

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateData' >> beam.Create(data)
      | 'MLTransform' >> MLTransform(write_artifact_location=artifact_location,
                                     transforms=transforms)
      | 'PrintResults' >> beam.Map(print)
  )
Row(s=array([1, 0, 4]), x=array([0.        , 0.16666667, 0.33333334], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([0.       , 0.8910891, 1.       ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))
Row(s=array([1, 0, 6, 2, 3, 5]), x=array([0.5      , 0.6666667, 1.       ], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([0.00990099, 0.10891089, 0.3960396 ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))

MLTransform for inference workloads

The previous examples show how to preprocess data for model training. This example uses the same preprocessing steps on the inference data. Applying the same steps to the inference data keeps the results consistent between training and inference.

Preprocess the inference data by using the same preprocessing steps that you used on the training data. When using MLTransform, pass the artifact location from the previous transforms to the read_artifact_location parameter. MLTransform uses the values and artifacts produced in the previous steps. You don't need to provide the transforms, because they are saved with the artifacts in the artifact location. Vocabulary tokens that weren't seen when the artifacts were produced map to the default value of -1, as the second row of the output shows.

data = [
    {'x': [2], 'y': [59, 91, 85], 's': ['love']},
    {'x': [4, 5, 7], 'y': [111, 26, 30], 's': ['I', 'love', 'parks', 'and', 'dogs']}
]

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateData' >> beam.Create(data)
      | 'MLTransform' >> MLTransform(read_artifact_location=artifact_location)
      | 'PrintResults' >> beam.Map(print)
  )
Row(s=array([0]), x=array([0.16666667], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([0.48514852, 0.8019802 , 0.7425743 ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))
Row(s=array([ 1,  0, -1, -1, -1]), x=array([0.5      , 0.6666667, 1.       ], dtype=float32), x_max=array([7.], dtype=float32), x_min=array([1.], dtype=float32), y=array([1.        , 0.15841584, 0.1980198 ], dtype=float32), y_max=array([111.], dtype=float32), y_min=array([10.], dtype=float32))