Compute and apply vocabulary on a dataset


The ComputeAndApplyVocabulary data processing transform computes a unique vocabulary from a dataset and then maps each word or token to a distinct integer index. Use this transform to change textual data into numerical representations for machine learning (ML) tasks.

When you train ML models that use text data, generating a vocabulary on the incoming dataset is an important preprocessing step. By mapping words to numerical indices, the vocabulary reduces the complexity and dimensionality of the dataset. This step allows ML models to process the same words in a consistent way.
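To illustrate the idea outside of Apache Beam, the following plain-Python sketch builds a vocabulary from one sentence and encodes the sentence as indices. Unlike this sketch, ComputeAndApplyVocabulary also orders tokens by descending frequency, as the output later in this notebook shows.

# A plain-Python sketch of the concept, not the Beam transform.
sentence = "the quick brown fox jumps over the lazy dog"
tokens = sentence.split()
# dict.fromkeys deduplicates tokens while preserving first-seen order.
vocab = {token: index for index, token in enumerate(dict.fromkeys(tokens))}
print(vocab)
# {'the': 0, 'quick': 1, 'brown': 2, 'fox': 3, 'jumps': 4, 'over': 5, 'lazy': 6, 'dog': 7}
print([vocab[token] for token in tokens])
# [0, 1, 2, 3, 4, 5, 0, 6, 7]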

This notebook shows how to use MLTransform to complete the following tasks:

  • Use write mode to generate a vocabulary on the input text and assign an index value to each token.
  • Use read mode to apply the generated vocabulary and assign indices to a different dataset.

MLTransform uses the ComputeAndApplyVocabulary transform, which is implemented by using tensorflow_transform to generate the vocabulary.

For more information about using MLTransform, see Preprocess data with MLTransform in the Apache Beam documentation.

Install the required modules

To use ComputeAndApplyVocabulary with MLTransform, install tensorflow_transform and the Apache Beam SDK version 2.53.0 or later.

 pip install 'apache_beam>=2.53.0' --quiet
 pip install tensorflow-transform --quiet
import tempfile
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

Use the artifact location

In write mode, the artifact location is used to store artifacts, such as the vocabulary file generated by ComputeAndApplyVocabulary.

In read mode, MLTransform fetches artifacts from the specified artifact location. Pass the same artifact location that you used in write mode. Otherwise, a RuntimeError occurs or MLTransform produces unexpected results in read mode.

artifact_location = tempfile.mkdtemp(prefix='compute_and_apply_vocab_')
artifact_location_with_frequency_threshold = tempfile.mkdtemp(prefix='compute_and_apply_vocab_frequency_threshold_')
documents = [
    {"feature": "the quick brown fox jumps over the lazy dog"},
    {"feature": "the five boxing wizards jump quickly in the sky"},
    {"feature": "dogs are running in the park"},
    {"feature": "the quick brown fox"}
]

In this example, in write mode, MLTransform uses ComputeAndApplyVocabulary to generate a vocabulary from the incoming dataset. The incoming text data is split into tokens. Each token is assigned a unique index.

The generated vocabulary is stored in an artifact location that you can use on a different dataset in read mode.

with beam.Pipeline() as pipeline:
  data_pcoll = pipeline | "CreateData" >> beam.Create(documents)
  # Compute and apply vocabulary by using MLTransform.
  transformed_pcoll = (
      data_pcoll
      | "MLTransform" >> MLTransform(write_artifact_location=artifact_location).with_transform(
          ComputeAndApplyVocabulary(columns=['feature'], split_string_by_delimiter=' ', vocab_filename='vocab_index'))
      )
  transformed_pcoll | "Print" >> beam.Map(print)
Row(feature=array([ 0,  1,  4,  3, 12, 10,  0, 11, 16]))
Row(feature=array([ 0, 14, 17,  5, 13,  8,  2,  0,  6]))
Row(feature=array([15, 18,  7,  2,  0,  9]))
Row(feature=array([0, 1, 4, 3]))

Understand and visualize vocabulary

When working with text data in machine learning, one common step is the generation of a vocabulary index. MLTransform completes this step by using the ComputeAndApplyVocabulary transformation. Each unique word in your text data is assigned a specific index. This index is then used to represent the text in a numerical format, which is needed for machine learning algorithms.

In this example, the ComputeAndApplyVocabulary transformation is applied to the feature column. A vocabulary index is created for each unique word found in this column.

To visualize and understand this generated vocabulary, use the ArtifactsFetcher class. This class allows you to retrieve the vocabulary list from your specified location. When you have this list, you can see the index associated with each word in your vocabulary. This index corresponds to the numerical representation used in the transformation output of ComputeAndApplyVocabulary.

Examine this vocabulary index to understand how your text data is being processed and represented numerically. This understanding is useful for debugging and improving machine learning models that rely on text data.

from apache_beam.ml.transforms.utils import ArtifactsFetcher
artifact_fetcher = ArtifactsFetcher(artifact_location)
vocab_list = artifact_fetcher.get_vocab_list(vocab_filename='vocab_index_feature')
for i, word in enumerate(vocab_list):
  print(f'{i}: {word}')
0: the
1: quick
2: in
3: fox
4: brown
5: wizards
6: sky
7: running
8: quickly
9: park
10: over
11: lazy
12: jumps
13: jump
14: five
15: dogs
16: dog
17: boxing
18: are
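
Because a token's position in vocab_list is its index, you can invert the mapping to decode the transformed rows back into tokens. The following sketch decodes the last row from the earlier output.

# Invert the fetched vocabulary to decode a transformed row.
index_to_token = dict(enumerate(vocab_list))
encoded = [0, 1, 4, 3]  # the row produced earlier for "the quick brown fox"
print(' '.join(index_to_token[index] for index in encoded))
# the quick brown fox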

Set the frequency threshold

The frequency_threshold parameter identifies the elements that appear frequently in the dataset. This parameter limits the generated vocabulary to elements with an absolute frequency greater than or equal to the specified threshold. If you don't specify the parameter, the entire vocabulary is generated.

If the frequency of a vocabulary item is less than the threshold, it's assigned a default value. You can use the default_value parameter to set this value. Otherwise, it defaults to -1.

with beam.Pipeline() as pipeline:
  data_pcoll = pipeline | "CreateData" >> beam.Create(documents)
  # Compute and apply vocabulary by using MLTransform.
  transformed_pcoll = (
      data_pcoll
      | "MLTransform" >> MLTransform(write_artifact_location=artifact_location_with_frequency_threshold).with_transform(
          ComputeAndApplyVocabulary(columns=['feature'], split_string_by_delimiter=' ', frequency_threshold=2, vocab_filename='vocab_index'))
      )
  transformed_pcoll | "Print" >> beam.Map(print)
Row(feature=array([ 0,  1,  4,  3, -1, -1,  0, -1, -1]))
Row(feature=array([ 0, -1, -1, -1, -1, -1,  2,  0, -1]))
Row(feature=array([-1, -1, -1,  2,  0, -1]))
Row(feature=array([0, 1, 4, 3]))

In the output, tokens with a frequency less than the specified threshold are assigned the default_value of -1. The generated vocabulary file contains only the remaining tokens.

from apache_beam.ml.transforms.utils import ArtifactsFetcher
artifact_fetcher = ArtifactsFetcher(artifact_location_with_frequency_threshold)
vocab_list = artifact_fetcher.get_vocab_list(vocab_filename='vocab_index_feature')
for i, word in enumerate(vocab_list):
  print(f'{i}: {word}')
0: the
1: quick
2: in
3: fox
4: brown
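
To assign a sentinel other than -1 to below-threshold tokens, set the default_value parameter. The following sketch assumes that ComputeAndApplyVocabulary accepts default_value, as its tensorflow_transform counterpart does, and writes to a separate, hypothetical artifact location.

# Sketch: map below-threshold tokens to -99 instead of -1. The default_value
# parameter is an assumption based on tensorflow_transform's
# compute_and_apply_vocabulary; the artifact location name is illustrative.
artifact_location_with_default_value = tempfile.mkdtemp(prefix='compute_and_apply_vocab_default_value_')

with beam.Pipeline() as pipeline:
  data_pcoll = pipeline | "CreateData" >> beam.Create(documents)
  transformed_pcoll = (
      data_pcoll
      | "MLTransform" >> MLTransform(write_artifact_location=artifact_location_with_default_value).with_transform(
          ComputeAndApplyVocabulary(
              columns=['feature'],
              split_string_by_delimiter=' ',
              frequency_threshold=2,
              default_value=-99,
              vocab_filename='vocab_index'))
      )
  transformed_pcoll | "Print" >> beam.Map(print)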

Use MLTransform for inference workloads

When MLTransform is in write mode, it produces artifacts, such as vocabulary files for ComputeAndApplyVocabulary. These artifacts allow you to apply the same vocabulary, and any other preprocessing transforms, when you train your model and serve it in production, or when you test its accuracy.

When MLTransform is used in read mode, it uses the previously generated vocabulary files to map the incoming text data. If an incoming token isn't found in the generated vocabulary, the token is mapped to the default_value provided during write mode. In this case, the default_value is -1.


test_documents = [
    {'feature': 'wizards are flying in the sky'},
    {'feature': 'I love dogs'}
]

with beam.Pipeline() as pipeline:
  data_pcoll = pipeline | "CreateData" >> beam.Create(test_documents)
  # Compute and apply vocabulary by using MLTransform.
  transformed_pcoll = (
      data_pcoll
      | "MLTransform" >> MLTransform(read_artifact_location=artifact_location))

  transformed_pcoll | "Print" >> beam.Map(print)
Row(feature=array([ 5, 18, -1,  2,  0,  6]))
Row(feature=array([-1, -1, 15]))

When you specify read_artifact_location, you don't have to pass any transforms to MLTransform. Instead, MLTransform fetches the artifacts and the transforms that were saved in the location specified by write_artifact_location.
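
As a quick check, you can decode the read-mode output against the full vocabulary generated in write mode. An index of -1 marks tokens, such as flying, that never appeared in the original documents.

# Decode the first read-mode row by re-fetching the write-mode vocabulary.
from apache_beam.ml.transforms.utils import ArtifactsFetcher
full_vocab = ArtifactsFetcher(artifact_location).get_vocab_list(vocab_filename='vocab_index_feature')
index_to_token = dict(enumerate(full_vocab))
encoded = [5, 18, -1, 2, 0, 6]  # the row produced for "wizards are flying in the sky"
print(' '.join(index_to_token.get(index, '<OOV>') for index in encoded))
# wizards are <OOV> in the sky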