Google Cloud Platform

Using BigDL for deep learning with Apache Spark and Google Cloud Dataproc

BigDL is a distributed deep learning library developed and open-sourced by Intel Corp to bring native deep learning support to Apache Spark. By leveraging the distributed execution capabilities in Apache Spark, BigDL can help you take advantage of large-scale distributed training in deep learning.

BigDL is implemented as a library on top of Apache Spark, as shown below. It can be seamlessly integrated with other Spark libraries (e.g., Spark SQL and Dataframes, Spark ML pipelines, Spark Streaming, Structured Streaming). With BigDL, users can write their deep learning applications as standard Spark programs in either Scala or Python and directly run them on top of Cloud Dataproc clusters.

[Figure: BigDL as a library on top of the Apache Spark stack]

In this blog post, we’ll show you how to train models on Cloud Dataproc using BigDL’s Scala and Python APIs in Apache Zeppelin.

Deploying BigDL on Cloud Dataproc

You can use the BigDL and Apache Zeppelin initialization actions to create a new Cloud Dataproc cluster with BigDL and Apache Zeppelin pre-installed. To do so, run the following commands using gcloud, a command-line utility included in the Google Cloud SDK.

Note: the instructions below have been tested on Linux and macOS. There are some known issues with ‘ssh’ command-line options in the Windows version of the Google Cloud SDK.

  CLUSTER_NAME=bigdl-on-zeppelin
gcloud dataproc clusters create $CLUSTER_NAME \
  --initialization-actions=gs://dataproc-initialization-actions/bigdl/bigdl.sh,gs://dataproc-initialization-actions/zeppelin/zeppelin.sh \
  --initialization-action-timeout=10m

By default, the BigDL initialization action downloads the latest compatible BigDL release (BigDL 0.4.0, Spark 2.2.0, and Scala 2.11.8 at the time of this publication). To download a different version of BigDL or one targeted to a different version of Spark/Scala, refer to the instructions in initialization action’s README.

Now you can run existing BigDL examples or develop your own deep learning applications on Cloud Dataproc! You can find more information on how to run BigDL on Cloud Dataproc in the BigDL documentation.

Prepare MNIST data

Before we connect to Zeppelin, let’s download the MNIST dataset locally. MNIST is a small dataset of handwritten digits that is popular for machine learning examples, mainly because it requires minimal preprocessing and formatting.

  mkdir mnist
cd mnist
wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
gunzip *

Then use gsutil, another command-line utility that comes with the Cloud SDK, to upload the files to your Cloud Storage bucket.

  BUCKET=gs://your-bucket
gsutil mb $BUCKET # Create bucket if necessary
gsutil cp * $BUCKET/mnist

Connect to Apache Zeppelin

In one terminal, create an SSH tunnel so you can connect to the notebook in a browser. This command will hang without printing any output.

If your corporate firewall blocks outbound ssh, you will need to disconnect from your corporate network to run the following commands.

  gcloud compute ssh --ssh-flag="-nND 1080" $CLUSTER_NAME-m

In another terminal, run a browser that uses the proxy server you just created. The instructions vary by operating system:

Mac:

  /Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --proxy-server="socks5://localhost:1080" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp

Linux:

  /usr/bin/google-chrome --proxy-server="socks5://localhost:1080" \
--host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
--user-data-dir=/tmp

Windows:

  "C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" --proxy-server="socks5://localhost:1080" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp

Now you can connect to Zeppelin on your cluster by typing “localhost:8080” in the new browser window.

[Screenshot: the Apache Zeppelin start page]

Click “Create new note” to get started. All the code in this blog should be run in the note you create.

LeNet example with the BigDL Scala API

We will build a LeNet-5 model to classify the MNIST digits.

1. BigDL initialization

Let's start our experiment by importing some necessary packages and initializing the engine.

  import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.dataset.DataSet
import com.intel.analytics.bigdl.dataset.image.{BytesToGreyImg, GreyImgNormalizer, GreyImgToBatch}
import com.intel.analytics.bigdl.nn.{ClassNLLCriterion, Module}
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, T, Table}
import com.intel.analytics.bigdl.models.lenet.LeNet5
import com.intel.analytics.bigdl.models.lenet.Utils._
      
Engine.init

2. Set hyperparameters

You may need to tune these hyperparameters depending on your dataset and cluster size.

  val lr = 0.05
val lrDecay = 0.0
val batchSize = 12
val maxEpoch = 5

3. Load MNIST dataset

Read the binary file into a ByteRecord, which contains a byte array to represent the features and a label representing which digit (0-9) the image contains. You can read about the MNIST feature and label file formats here.
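As a compact reference for the header layout described above, here is a sketch in a few lines of Python. The parse_idx_images helper is hypothetical, written only to illustrate the IDX image format (big-endian header of magic number, image count, rows, and columns, followed by raw pixel bytes); the Scala code that follows does the real work.

```python
import struct

def parse_idx_images(data):
    # Header: four big-endian 32-bit ints -- magic, image count, rows, cols
    magic, count, rows, cols = struct.unpack(">IIII", data[:16])
    assert magic == 2051, "not an IDX image file"
    pixels = data[16:]
    assert len(pixels) == count * rows * cols
    size = rows * cols
    # Split the flat byte stream into one row-major image per record
    return [pixels[i * size:(i + 1) * size] for i in range(count)]

# Tiny synthetic "file": one 2x2 image with pixel values 0..3
synthetic = struct.pack(">IIII", 2051, 1, 2, 2) + b"\x00\x01\x02\x03"
images = parse_idx_images(synthetic)
```

The label file follows the same pattern with magic number 2049 and no row/column fields.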

  import java.nio.ByteBuffer
import com.intel.analytics.bigdl.dataset.ByteRecord
import org.apache.spark.rdd.RDD
def preprocess(feature: Array[Byte], label: Array[Byte]): Array[ByteRecord] = {
  val featureBuffer = ByteBuffer.wrap(feature)
  val labelBuffer = ByteBuffer.wrap(label)
       
  val labelMagicNumber = labelBuffer.getInt()
  require(labelMagicNumber == 2049)
  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051)
      
  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)
      
  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()
  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
    val img = new Array[Byte]((rowNum * colNum))
    var y = 0
    while (y < rowNum) {
      var x = 0
      while (x < colNum) {
        img(x + y * colNum) = featureBuffer.get()
        x += 1
      }
      y += 1
    }
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }
  result
}
def loadBinaryFile(filePath: String, labelPath: String): RDD[ByteRecord] = {
    val img = sc.binaryFiles(filePath).map{data => data._2.toArray}
    val label = sc.binaryFiles(labelPath).map{data => data._2.toArray}
    val imgLabel = img.zip(label).mapPartitions{iter =>
        //there is only one element in the iterator as only one file is read
        val zipData = iter.next
        val byteRecords = preprocess(zipData._1, zipData._2)
        byteRecords.iterator
    }
    imgLabel
}

Make sure to replace “YOUR-BUCKET” with the name of the Cloud Storage bucket containing your MNIST data.
Then pre-process the data by normalizing the image pixel values for better neural network performance.

  val bucket = "YOUR-BUCKET"
val trainData = s"gs://$bucket/mnist/train-images-idx3-ubyte"
val trainLabel = s"gs://$bucket/mnist/train-labels-idx1-ubyte"
val validationData = s"gs://$bucket/mnist/t10k-images-idx3-ubyte"
val validationLabel = s"gs://$bucket/mnist/t10k-labels-idx1-ubyte"
      
/**
 * Several transformers are used to preprocess the image data.
 * BytesToGreyImg is used to convert Bytes to Grey image.
 * GreyImgNormalizer will normalize a grey image.
 * GreyImgToBatch is used to convert a batch of labeled grey images into a
 * mini-batch.
 */
val trainSet = DataSet.rdd(loadBinaryFile(trainData, trainLabel)) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)
      
val validationSet = DataSet.rdd(loadBinaryFile(validationData, validationLabel)) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)

More information about BigDL’s `DataSet` class can be found in the API documentation.

4. Set up the model

  import com.intel.analytics.bigdl.models.lenet.LeNet5
      
val model = LeNet5(classNum = 10)

5. Train the model

Run an Optimizer that minimizes our model’s loss on the training data. It will take around ten minutes to finish on a default two-worker cluster.

  val optimizer = Optimizer(
  model = model,
  dataset = trainSet,
  criterion = ClassNLLCriterion[Float]())
      
val optimMethod = new SGD[Float](learningRate = lr,
  learningRateDecay = lrDecay)
             
val trainedModel = optimizer
  .setValidation(
    trigger = Trigger.everyEpoch,
    dataset = validationSet,
    vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
  .setOptimMethod(optimMethod)
  .setEndWhen(Trigger.maxEpoch(maxEpoch))
  .optimize()

6. Test the model

Let’s use a Validator and the trainedModel to test the accuracy of the model on the validation dataset.

  val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
      
result.foreach(r => {
    println(s"${r._2} is ${r._1}")
})

You will achieve a respectable 99% accuracy.

[Screenshot: validation output showing Top1Accuracy of roughly 99%]

Autoencoder example with BigDL Python API

In this tutorial, we are going to use an autoencoder to learn how to compress handwritten digit images from the MNIST dataset into a lower dimensional representation. Then, we’ll reconstruct the original image from the representation.

The autoencoder model attempts to minimize differences between an original input image and its reconstructed form. While the conversion is fairly lossy, it is still good enough for some practical applications, such as data denoising and dimensionality reduction for data visualization.
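Concretely, “minimize differences” means minimizing the mean squared error between an input and its reconstruction, which is exactly the criterion used later in this example (MSECriterion). A standalone NumPy sketch of the quantity being minimized, with made-up sample values:

```python
import numpy as np

# Made-up original image values (flattened, scaled to [0, 1]) and a
# slightly-off reconstruction from a hypothetical autoencoder
original = np.array([0.0, 0.5, 1.0, 0.25])
reconstruction = np.array([0.1, 0.4, 0.9, 0.35])

# Mean squared error: the reconstruction loss the autoencoder minimizes
mse = np.mean((original - reconstruction) ** 2)
```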

1. BigDL initialization

Again, let's start by importing the necessary packages and initializing the engine:

  %spark.pyspark
      
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
from bigdl.nn.layer import Sequential, Linear, ReLU, Sigmoid
from bigdl.nn.criterion import MSECriterion
from bigdl.optim.optimizer import Optimizer, Adam, MaxEpoch
from bigdl.util.common import *
from bigdl.dataset import mnist
from matplotlib.pyplot import imshow
      
init_engine()

2. Load MNIST Dataset

Define the get_mnist method. It converts the image data into a Spark RDD and normalizes the image pixel values.

  %spark.pyspark
import numpy as np
from bigdl.util import common
from bigdl.dataset import mnist
      
def get_mnist(sc, images, labels, mean, std):
    rdd_train_images = sc.parallelize(images)
    rdd_train_labels = sc.parallelize(labels)
    # Zip features and label into a Sample, normalizing the image values
    rdd_train_sample = rdd_train_images.zip(rdd_train_labels).map(
        lambda (features, label): common.Sample.from_ndarray(
            (features - mean) / std,
            label + 1))
    return rdd_train_sample

Similar to the Scala example, we need to normalize the image pixel values for better neural network performance.
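As a quick sanity check of what this normalization accomplishes, standardizing an array by its own mean and standard deviation yields data with roughly zero mean and unit variance (plain NumPy, independent of BigDL; the random “pixel” values below are made up):

```python
import numpy as np

# Made-up pixel values standing in for MNIST image data
pixels = np.random.RandomState(0).randint(0, 256, size=1000).astype(np.float64)
mean, std = pixels.mean(), pixels.std()

# Standardize: subtract the mean and divide by the standard deviation
normalized = (pixels - mean) / std
```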

  %spark.pyspark
# Get and store MNIST into RDD of Sample
mnist_path = "datasets/mnist"
(train_images, train_labels) = mnist.read_data_sets(mnist_path, "train")
(test_images, test_labels) = mnist.read_data_sets(mnist_path, "test")
training_mean = np.mean(train_images)
training_std = np.std(train_images)
train_data = get_mnist(sc, train_images, train_labels, training_mean, training_std)
test_data = get_mnist(sc, test_images, test_labels, training_mean, training_std)
train_data = train_data.map(lambda sample: Sample.from_ndarray(
    np.resize(sample.features[0].to_ndarray(), (28*28,)),
    np.resize(sample.features[0].to_ndarray(), (28*28,))))
test_data = test_data.map(lambda sample: Sample.from_ndarray(
    np.resize(sample.features[0].to_ndarray(), (28*28,)),
    np.resize(sample.features[0].to_ndarray(), (28*28,))))

3. Model Setup

This is a very simple model with one hidden layer in the encoder. The size of the hidden layer (32) is the number of features in the compressed representation. Since our input image has 784 features, the compressed images are about 4% of their original size.

Note that we can use such a simple model because the MNIST dataset is very simple.
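The ~4% figure follows directly from the layer sizes:

```python
n_input = 28 * 28  # 784 pixel features per MNIST image
n_hidden = 32      # size of the compressed representation

# 32 / 784 is roughly 0.041, i.e. about 4% of the original size
compression_ratio = n_hidden / float(n_input)
```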

  %spark.pyspark
      
# Hyperparameters
training_epochs = 10
batch_size = 120
display_step = 1
      
# Network Parameters
n_hidden = 32
n_input = 784 # MNIST data input (img shape: 28*28)
def build_autoencoder(n_input, n_hidden):
    # Initialize a sequential container
    model = Sequential()
      
    # encoder
    model.add(Linear(n_input, n_hidden))
    model.add(ReLU())
    # decoder
    model.add(Linear(n_hidden, n_input))
    model.add(Sigmoid())
         
    return model
model = build_autoencoder(n_input, n_hidden)

4. Train the model

Run an optimizer that minimizes the differences between the input and reconstructed images. This will take several minutes.

  %spark.pyspark
  optimizer = Optimizer(
    model=model,
    training_rdd=train_data,
    criterion=MSECriterion(),
    optim_method=Adam(),
    end_trigger=MaxEpoch(training_epochs),
    batch_size=batch_size)
      
trained_model = optimizer.optimize()
print "Optimization Done."

5. Prediction on Test Dataset

We are going to use 10 examples to demonstrate that our trained autoencoder produces reasonable reconstructed images. We’ll compress and reconstruct the test images, then take the first 10 to compare with the original inputs.

  %spark.pyspark
      
examples_to_show = 10
examples = trained_model.predict(test_data).take(examples_to_show)
f, a = plt.subplots(2, examples_to_show, figsize=(examples_to_show, 2))
for i in range(examples_to_show):
    a[0][i].imshow(np.reshape(test_images[i], (28, 28)))
    a[1][i].imshow(np.reshape(examples[i], (28, 28)))

You will see that the dataset has been reasonably reconstructed. The first row contains 10 of the original images, and the second row contains their reconstructed versions.

[Figure: original MNIST digits (top row) and their autoencoder reconstructions (bottom row)]

In this blog post, we demonstrated how users can easily build deep learning applications in a distributed fashion on Cloud Dataproc using familiar tools such as Spark and Zeppelin. The high scalability, high performance, and ease of use of BigDL make it easy to analyze large datasets using deep learning.

To learn more about the BigDL project, please check out the BigDL website, and for some interesting applications, please check out these BigDL talks. Lastly, we’ve made more tutorials available here.