Preprocessing with the Apache Beam DataFrames API

Pandas DataFrames are among the most common tools used for data exploration and preprocessing. Pandas is popular because it is easy to use and has intuitive methods for common analytical and data preprocessing tasks.


For rapid execution, Pandas loads all of the data into memory on a single machine (one node). This configuration works well when dealing with small-scale datasets. However, many projects involve datasets that are too big to fit in memory. These use cases generally require parallel data processing frameworks, such as Apache Beam.

Beam DataFrames provide a Pandas-like API to declare and define Beam processing pipelines. They give machine learning practitioners a familiar interface for building complex data-processing pipelines by invoking only standard pandas commands.

To learn more about Apache Beam DataFrames, see the Beam DataFrames overview page.

Overview

The goal of this example is to explore a dataset preprocessed with the Beam DataFrame API for machine learning model training.

This example demonstrates the use of the Apache Beam DataFrames API to perform common data exploration as well as the preprocessing steps that are necessary to prepare your dataset for machine learning model training and inference. This example includes the following steps:

  • Removing unwanted columns.
  • One-hot encoding categorical columns.
  • Normalizing numerical columns.

In this example, the first section demonstrates how to build and execute a pipeline locally using the interactive runner. The second section uses a distributed runner to demonstrate how to run the pipeline on the full dataset.

Install Apache Beam

To explore the elements within a PCollection, install Apache Beam with the interactive component to use the Interactive runner. The DataFrames API methods invoked in this example are available in Apache Beam SDK versions 2.43 and later.

Install the latest Apache Beam SDK version.

!git clone https://github.com/apache/beam.git

!cd beam/sdks/python && pip3 install -r build-requirements.txt 

%pip install -e beam/sdks/python/.[interactive,gcp]

Local exploration with the Interactive Beam runner

Use the Interactive Beam runner to explore and develop your pipeline. This runner allows you to test the code interactively, progressively building out the pipeline before deploying it on a distributed runner.

This section uses a subset of the original dataset, because the notebook instance has limited compute resources.

Load the data

To read CSV files into DataFrames, Pandas has the pandas.read_csv function. This notebook uses the Beam beam.dataframe.io.read_csv function, which emulates pandas.read_csv. The main difference is that the Beam function returns a deferred Beam DataFrame, whereas the Pandas function returns a standard DataFrame.

import os

import numpy as np
import pandas as pd 
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import dataframe
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.dataflow import DataflowRunner

# Available options: [sample_1000, sample_10000, sample_100000, full], where
# full contains the entire dataset (around 1,000,000 samples).

source_csv_file = 'gs://apache-beam-samples/nasa_jpl_asteroid/sample_10000.csv'

# Initialize the pipeline.
p = beam.Pipeline(InteractiveRunner())

beam_df = p | beam.dataframe.io.read_csv(source_csv_file)

Preprocess the data

This example uses the NASA - Nearest Earth Objects dataset. This dataset includes information about objects in outer space, some of which are close enough to Earth to cause harm. It compiles the list of NASA-certified asteroids that are classified as near-Earth objects, to help understand which objects pose a risk.

Inspect the dataset columns and their types.

beam_df.dtypes
spk_id                       int64
full_name                   object
near_earth_object           object
absolute_magnitude         float64
diameter                   float64
albedo                     float64
diameter_sigma             float64
eccentricity               float64
inclination                float64
moid_ld                    float64
object_class                object
semi_major_axis_au_unit    float64
hazardous_flag              object
dtype: object

When using Interactive Beam, to bring a Beam DataFrame into local memory as a Pandas DataFrame, use ib.collect().

ib.collect(beam_df)

The dataset contains the following two types of columns:

  • Numerical columns: Use normalization to transform these columns so that they can be used to train a machine learning model.

  • Categorical columns: Transform those columns with one-hot encoding to use them during training.

Use the standard pandas command DataFrame.describe() to generate descriptive statistics for the numerical columns, such as percentile, mean, std, and so on.

with dataframe.allow_non_parallel_operations():
  beam_df_description = ib.collect(beam_df.describe())

beam_df_description
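
As a quick reference for reading that summary, here is a minimal plain-pandas sketch on a small, made-up frame. The values and the `toy_df` name are illustrative assumptions, not rows from the asteroid dataset. Because the Beam DataFrames API mirrors pandas, the collected result should have a similar layout: one column per numerical input column and one row per statistic.

```python
import pandas as pd

# Hypothetical toy data; not taken from the asteroid dataset.
toy_df = pd.DataFrame({
    "eccentricity": [0.10, 0.20, 0.30, 0.40],
    "inclination": [5.0, 10.0, 15.0, 20.0],
    "object_class": ["APO", "AMO", "APO", "ATE"],
})

# By default, describe() summarizes only the numerical columns.
toy_description = toy_df.describe()
print(toy_description)
```

The non-numerical `object_class` column is left out of the summary, which is why the categorical columns are handled separately later on.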

Before running any transformations, check whether all of the columns are needed for model training. Start by looking at the column descriptions provided by the JPL website:

  • spk_id: Object primary SPK-ID.
  • full_name: Asteroid name.
  • near_earth_object: Near-earth object flag.
  • absolute_magnitude: The apparent magnitude the object would have if it were 1 au from both the Sun and the observer at zero phase angle.
  • diameter: Object diameter (from equivalent sphere), in km.
  • albedo: A measure of the diffuse reflection of solar radiation out of the total solar radiation, measured on a scale from 0 to 1.
  • diameter_sigma: 1-sigma uncertainty in the object diameter, in km.
  • eccentricity: A value between 0 and 1 that describes how circular (near 0) or elongated (near 1) the asteroid's orbit is.
  • inclination: The angle with respect to the x-y ecliptic plane.
  • moid_ld: Earth minimum orbit intersection distance, in au.
  • object_class: The classification of the asteroid. For a more detailed description, see NASA object classifications.
  • semi_major_axis_au_unit: The length of half of the long axis of the orbit, in au.
  • hazardous_flag: Identifies hazardous asteroids.

The spk_id and full_name columns are unique for each row. You can remove these columns, because they are not needed for model training.

beam_df = beam_df.drop(['spk_id', 'full_name'], axis='columns', inplace=False)

Review the number of missing values.

ib.collect(beam_df.isnull().mean() * 100)
/content/beam/sdks/python/apache_beam/dataframe/frame_base.py:145: RuntimeWarning: invalid value encountered in long_scalars
  lambda left, right: getattr(left, op)(right), name=op, args=[other])
near_earth_object           0.000000
absolute_magnitude          0.000000
diameter                   13.111311
albedo                     13.271327
diameter_sigma             14.081408
eccentricity                0.000000
inclination                 0.000000
moid_ld                     0.000000
object_class                0.000000
semi_major_axis_au_unit     0.000000
hazardous_flag              0.000000
dtype: float64

Most of the columns do not have missing values. However, the columns diameter, albedo, and diameter_sigma have many missing values. Because these values cannot be measured or derived and aren't needed for training the ML model, remove the columns.

beam_df = beam_df.drop(['diameter', 'albedo', 'diameter_sigma'], axis='columns', inplace=False)
ib.collect(beam_df)
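
The same check-then-drop logic can be sketched in plain pandas on a small, made-up frame. The 10% cutoff and the `missing_threshold` name are assumptions for illustration only; the tutorial picks the columns by inspection rather than by a fixed threshold.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data with gaps in two columns.
toy_df = pd.DataFrame({
    "diameter": [1.2, np.nan, np.nan, 0.8],
    "albedo": [0.1, 0.2, np.nan, 0.3],
    "eccentricity": [0.10, 0.20, 0.30, 0.40],
})

# Percentage of missing values per column.
missing_pct = toy_df.isnull().mean() * 100

# Assumed cutoff for illustration: drop columns with more than 10% missing values.
missing_threshold = 10.0
cols_to_drop = missing_pct[missing_pct > missing_threshold].index.tolist()

cleaned_df = toy_df.drop(cols_to_drop, axis="columns")
print(missing_pct)
print(cleaned_df.columns.tolist())
```

With a deferred Beam DataFrame, the percentages would likely need to be collected first (for example with ib.collect()) before they could drive a data-dependent drop like this one.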

Normalize the data

Normalize the numerical columns so that they can be used to train a model. To standardize the data, subtract the mean and divide by the standard deviation. This process is also known as finding the z-score. Standardizing the inputs typically improves the stability of model training and the performance of the resulting model.

First, retrieve both the numerical columns and the categorical columns.

# Identify the numerical and categorical column names.
numerical_cols = beam_df.select_dtypes(include=np.number).columns.tolist()
categorical_cols = list(set(beam_df.columns) - set(numerical_cols))

# Keep only the numerical columns.
beam_df_numericals = beam_df.filter(items=numerical_cols)

# Standardize the numerical columns.
beam_df_numericals = (beam_df_numericals - beam_df_numericals.mean())/beam_df_numericals.std()

ib.collect(beam_df_numericals)
/content/beam/sdks/python/apache_beam/dataframe/frame_base.py:145: RuntimeWarning: invalid value encountered in double_scalars
  lambda left, right: getattr(left, op)(right), name=op, args=[other])
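
To make the z-score step concrete, the following plain-pandas sketch applies the same expression to a small, made-up frame. The values and the `toy_` names are illustrative assumptions. Note that pandas `std()` computes the sample standard deviation (ddof=1) by default, so each standardized column in this sketch ends up with a mean of 0 and a sample standard deviation of 1.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data; not taken from the asteroid dataset.
toy_df = pd.DataFrame({
    "eccentricity": [0.10, 0.20, 0.30, 0.40],
    "inclination": [5.0, 10.0, 15.0, 20.0],
    "object_class": ["APO", "AMO", "APO", "ATE"],
})

# Split the columns by dtype, as in the tutorial code.
toy_numerical_cols = toy_df.select_dtypes(include=np.number).columns.tolist()
toy_numericals = toy_df.filter(items=toy_numerical_cols)

# z-score: subtract the column mean, then divide by the column standard deviation.
toy_standardized = (toy_numericals - toy_numericals.mean()) / toy_numericals.std()
print(toy_standardized)
```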

Next, convert the categorical columns into one-hot encoded variables to use during training.

def get_one_hot_encoding(df: pd.DataFrame, categorical_col: str) -> pd.DataFrame:
  # Select the categorical column from the DataFrame that was passed in.
  beam_df_categorical = df[categorical_col]
  # Get unique values.
  with dataframe.allow_non_parallel_operations():
    unique_classes = pd.CategoricalDtype(ib.collect(beam_df_categorical.unique(as_series=True)))
  # Use `str.get_dummies()` to get the one-hot encoded representation of the categorical columns.
  beam_df_categorical = beam_df_categorical.astype(unique_classes).str.get_dummies()
  # Add a column name prefix to the newly created categorical columns.
  beam_df_categorical = beam_df_categorical.add_prefix(f'{categorical_col}_')

  return beam_df_categorical

# One-hot encode each categorical column and join the result on the row index.
for categorical_col in categorical_cols:
  beam_df_categorical = get_one_hot_encoding(df=beam_df, categorical_col=categorical_col)
  beam_df_numericals = beam_df_numericals.merge(beam_df_categorical, left_index=True, right_index=True)

ib.collect(beam_df_numericals)
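
For intuition about what the helper produces, here is a minimal plain-pandas sketch on made-up values. The data and the `toy_` names are assumptions for illustration. In plain pandas the `CategoricalDtype` conversion can be skipped, because the distinct values are read directly from the data; the tutorial's helper collects them first, presumably so that the output columns are known when the deferred pipeline is constructed.

```python
import pandas as pd

# Hypothetical toy data; not taken from the asteroid dataset.
toy_numericals = pd.DataFrame({"eccentricity": [0.10, 0.20, 0.30, 0.40]})
toy_classes = pd.Series(["APO", "AMO", "APO", "ATE"], name="object_class")

# One indicator column per distinct value, prefixed with the source column name.
toy_encoded = toy_classes.str.get_dummies().add_prefix("object_class_")

# Join on the row index, as in the tutorial loop.
toy_result = toy_numericals.merge(toy_encoded, left_index=True, right_index=True)
print(toy_result)
```

Each row has exactly one indicator set to 1, and the merge keeps the numerical and encoded columns aligned by index.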

Run the pipeline

This section combines the previous steps into a full pipeline implementation, and then visualizes the preprocessed data.

Note that the only standard Apache Beam object created here is the pipeline instance. The rest of the preprocessing commands are native pandas methods that are integrated with the Apache Beam DataFrames API.

# Specify the location of the source CSV file.
source_csv_file = 'gs://apache-beam-samples/nasa_jpl_asteroid/sample_10000.csv'

# Initialize the pipeline.
p = beam.Pipeline(InteractiveRunner())

# Create a deferred Apache Beam DataFrame with the contents of the CSV file.
beam_df = p | beam.dataframe.io.read_csv(source_csv_file)

# Drop irrelevant columns and columns with missing values.
beam_df = beam_df.drop(['spk_id', 'full_name','diameter', 'albedo', 'diameter_sigma'], axis='columns', inplace=False)

# Get numerical columns and columns with categorical values.
numerical_cols = beam_df.select_dtypes(include=np.number).columns.tolist()
categorical_cols = list(set(beam_df.columns) - set(numerical_cols))

# Normalize the numerical values.
beam_df_numericals = beam_df.filter(items=numerical_cols)
beam_df_numericals = (beam_df_numericals - beam_df_numericals.mean())/beam_df_numericals.std()


# One-hot encode the categorical values.
for categorical_col in categorical_cols:
  beam_df_categorical= get_one_hot_encoding(df=beam_df, categorical_col=categorical_col)
  beam_df_numericals = beam_df_numericals.merge(beam_df_categorical, left_index = True, right_index = True)

ib.collect(beam_df_numericals)
/content/beam/sdks/python/apache_beam/dataframe/frame_base.py:145: RuntimeWarning: invalid value encountered in double_scalars
  lambda left, right: getattr(left, op)(right), name=op, args=[other])

Process the full dataset with the distributed runner

The previous section demonstrates how to build and execute the pipeline locally using the interactive runner. This section demonstrates how to run the pipeline on the full dataset by switching to a distributed runner. For this example, the pipeline runs on Dataflow.

PROJECT_ID = "<my-gcp-project>" # @param {type:'string'}
REGION = "us-central1"
TEMP_DIR = "gs://<my-bucket>/tmp" # @param {type:'string'}
OUTPUT_DIR = "gs://<my-bucket>/dataframe-result" # @param {type:'string'}

These steps process the full dataset, full.csv, which contains approximately one million rows. To materialize the deferred DataFrame, these steps also write the results to a CSV file instead of using ib.collect().

To switch from an interactive runner to a distributed runner, update the pipeline options. The rest of the pipeline steps don't change.

# Specify the location of the source CSV file (the full dataset).
source_csv_file = 'gs://apache-beam-samples/nasa_jpl_asteroid/full.csv'

# Build a new pipeline that runs on Dataflow.
p = beam.Pipeline(DataflowRunner(),
                  options=beam.options.pipeline_options.PipelineOptions(
                      project=PROJECT_ID,
                      region=REGION,
                      temp_location=TEMP_DIR,
                      # To speed up the demo, disable autoscaling.
                      autoscaling_algorithm='NONE',
                      num_workers=10))

# Create a deferred Apache Beam DataFrame with the contents of the CSV file.
beam_df = p | beam.dataframe.io.read_csv(source_csv_file)

# Drop irrelevant columns and columns with missing values.
beam_df = beam_df.drop(['spk_id', 'full_name','diameter', 'albedo', 'diameter_sigma'], axis='columns', inplace=False)

# Get numerical columns and columns with categorical values.
numerical_cols = beam_df.select_dtypes(include=np.number).columns.tolist()
categorical_cols = list(set(beam_df.columns) - set(numerical_cols))

# Normalize the numerical values. 
beam_df_numericals = beam_df.filter(items=numerical_cols)
beam_df_numericals = (beam_df_numericals - beam_df_numericals.mean())/beam_df_numericals.std()


# One-hot encode the categorical values. 
for categorical_col in categorical_cols:
  beam_df_categorical= get_one_hot_encoding(df=beam_df, categorical_col=categorical_col)
  beam_df_numericals = beam_df_numericals.merge(beam_df_categorical, left_index = True, right_index = True)

# Write the preprocessed dataset to a CSV file.
beam_df_numericals.to_csv(os.path.join(OUTPUT_DIR, "preprocessed_data.csv"))

Submit and run the pipeline.

p.run().wait_until_finish()

Wait while the pipeline job runs.

What's next

This tutorial demonstrated how to analyze and preprocess a large-scale dataset with the Apache Beam DataFrames API. You can now train a model on a classification task using the preprocessed dataset.

To learn more about how to get started with classifying structured data, see Structured data classification from scratch.

To continue learning, find another dataset to process with the Apache Beam DataFrames API. Think carefully about which features to include in your model and how to represent them.

Resources