Google Cloud Big Data and Machine Learning Blog

Innovation in data processing and machine learning technology

Using BigDL for deep learning with Apache Spark and Google Cloud Dataproc

Tuesday, April 3, 2018

By Ding Ding, Intel Software Architect; Karthik Palaniappan, Cloud Dataproc Software Engineer; and Sergey Ermolin, Intel Solutions Architect

BigDL is a distributed deep learning library developed and open-sourced by Intel to bring native deep learning support to Apache Spark. By leveraging the distributed execution capabilities in Apache Spark, BigDL can help you take advantage of large-scale distributed training in deep learning.

BigDL is implemented as a library on top of Apache Spark, as shown below. It can be seamlessly integrated with other Spark libraries (e.g., Spark SQL and Dataframes, Spark ML pipelines, Spark Streaming, Structured Streaming). With BigDL, users can write their deep learning applications as standard Spark programs in either Scala or Python and directly run them on top of Cloud Dataproc clusters.

In this blog post, we’ll show you how to train models on Cloud Dataproc using BigDL’s Scala and Python APIs in Apache Zeppelin.

Deploying BigDL on Cloud Dataproc

You can use the BigDL and Apache Zeppelin initialization actions to create a new Cloud Dataproc cluster with BigDL and Apache Zeppelin pre-installed. To do so, run the following command using gcloud, a command-line utility included in the Google Cloud SDK.

Note: the instructions below have been tested on Linux and Mac OS. There are some known issues with ‘ssh’ command-line options on the Windows version of the Google Cloud SDK.
gcloud dataproc clusters create $CLUSTER_NAME \
    --initialization-actions gs://dataproc-initialization-actions/bigdl/bigdl.sh,gs://dataproc-initialization-actions/zeppelin/zeppelin.sh

By default, the BigDL initialization action downloads the latest compatible BigDL release (BigDL 0.4.0, Spark 2.2.0, and Scala 2.11.8 at the time of this publication). To download a different version of BigDL, or one targeted to a different version of Spark/Scala, refer to the instructions in the initialization action’s README.

Now you can run existing BigDL examples or develop your own deep learning applications on Cloud Dataproc! You can find more information in the instructions on how to run BigDL on Cloud Dataproc and in the BigDL documentation.

Prepare MNIST data

Before we connect to Zeppelin, let’s download the MNIST dataset locally. MNIST is a small dataset of handwritten digits that is popular for machine learning examples, mainly because it requires minimal preprocessing and formatting.

mkdir mnist
cd mnist
curl -O http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
curl -O http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
curl -O http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
curl -O http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
gunzip *

Then use gsutil to upload the files to your Cloud Storage bucket. gsutil is another command line utility that comes with the Cloud SDK.

gsutil mb $BUCKET # Create bucket if necessary
gsutil cp * $BUCKET/mnist

Connect to Apache Zeppelin

In one terminal, create an SSH tunnel so you can connect to the notebook in a browser. This command will hang without printing any output.

If your corporate firewall blocks outbound ssh, you will need to disconnect from your corporate network to run the following commands.

gcloud compute ssh --ssh-flag="-nND 1080" $CLUSTER_NAME-m

In another terminal, run a browser that uses the proxy server you just created. The instructions vary by operating system:


Mac OS X:

/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
    --proxy-server="socks5://localhost:1080" \
    --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
    --user-data-dir=/tmp

Linux:

/usr/bin/google-chrome \
    --proxy-server="socks5://localhost:1080" \
    --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
    --user-data-dir=/tmp

Windows:

"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" --proxy-server="socks5://localhost:1080" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp

Now you can connect to Zeppelin on your cluster by typing “localhost:8080” in the new browser window.

Click “Create new note” to get started. All the code in this blog should be run in the note you create.

LeNet example with the BigDL Scala API

We will build a LeNet-5 model to classify the MNIST digits.

1. BigDL initialization

Let's start our experiment by importing some necessary packages and initializing the engine.

import com.intel.analytics.bigdl.dataset.DataSet
import com.intel.analytics.bigdl.dataset.image.{BytesToGreyImg, GreyImgNormalizer, GreyImgToBatch}
import com.intel.analytics.bigdl.models.lenet.LeNet5
import com.intel.analytics.bigdl.nn.{ClassNLLCriterion, Module}
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, T, Table}

Engine.init

2. Set hyperparameters

You may need to tune these hyperparameters depending on your dataset and cluster size.

val lr = 0.05
val lrDecay = 0.0
val batchSize = 12
val maxEpoch = 5

3. Load MNIST dataset

Read the binary file into a ByteRecord, which contains a byte array to represent the features and a label representing which digit (0-9) the image contains. You can read about the MNIST feature and label file formats here.
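For reference, the IDX header layout that the Scala code below decodes can also be sketched in plain Python with the standard `struct` module. This is an illustrative standalone snippet, not part of BigDL, and the header bytes are hand-built rather than real MNIST data:

```python
import struct

def parse_idx_images_header(data):
    """Parse the 16-byte header of an MNIST IDX image file.

    Layout (all big-endian 32-bit ints): magic number (2051 for images,
    2049 for labels), item count, row count, column count. The raw pixel
    bytes follow the header.
    """
    magic, count, rows, cols = struct.unpack(">iiii", data[:16])
    assert magic == 2051, "not an IDX image file"
    return count, rows, cols

# A synthetic header describing two 28x28 images (no pixel data attached).
header = struct.pack(">iiii", 2051, 2, 28, 28)
print(parse_idx_images_header(header))  # (2, 28, 28)
```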

import java.nio.ByteBuffer
import org.apache.spark.rdd.RDD
import com.intel.analytics.bigdl.dataset.ByteRecord

def preprocess(feature: Array[Byte], label: Array[Byte]): Array[ByteRecord] = {
  val featureBuffer = ByteBuffer.wrap(feature)
  val labelBuffer = ByteBuffer.wrap(label)
  val labelMagicNumber = labelBuffer.getInt()
  require(labelMagicNumber == 2049)
  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051)
  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)
  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()
  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
    val img = new Array[Byte](rowNum * colNum)
    var y = 0
    while (y < rowNum) {
      var x = 0
      while (x < colNum) {
        img(x + y * colNum) = featureBuffer.get()
        x += 1
      }
      y += 1
    }
    // BigDL labels are 1-based, so shift the 0-9 digit labels to 1-10.
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }
  result
}

def loadBinaryFile(filePath: String, labelPath: String): RDD[ByteRecord] = {
  val img = sc.binaryFiles(filePath).map(data => data._2.toArray)
  val label = sc.binaryFiles(labelPath).map(data => data._2.toArray)
  val imgLabel = img.zip(label).mapPartitions { iter =>
    // There is only one element in the iterator, as only one file is read.
    val zipData = iter.next()
    val byteRecords = preprocess(zipData._1, zipData._2)
    byteRecords.iterator
  }
  imgLabel
}

Make sure to replace “YOUR-BUCKET” with the name of the Cloud Storage bucket containing your MNIST data. The following code also pre-processes the data by normalizing the image pixel values, which improves neural network training.

val bucket = "YOUR-BUCKET"
val trainData = s"gs://$bucket/mnist/train-images-idx3-ubyte"
val trainLabel = s"gs://$bucket/mnist/train-labels-idx1-ubyte"
val validationData = s"gs://$bucket/mnist/t10k-images-idx3-ubyte"
val validationLabel = s"gs://$bucket/mnist/t10k-labels-idx1-ubyte"

// Approximate per-pixel mean and standard deviation of the MNIST
// images (with pixel values scaled to [0, 1]).
val trainMean = 0.1307
val trainStd = 0.3081
val testMean = 0.1325
val testStd = 0.3105

/*
 * Several transformers are used to preprocess the image data:
 * BytesToGreyImg converts bytes to a grey image,
 * GreyImgNormalizer normalizes a grey image, and
 * GreyImgToBatch converts labeled grey images into mini-batches.
 */
val trainSet = DataSet.rdd(loadBinaryFile(trainData, trainLabel)) ->
  BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)
val validationSet = DataSet.rdd(loadBinaryFile(validationData, validationLabel)) ->
  BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)

More information about BigDL’s `DataSet` class can be found in the API documentation.
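The effect of GreyImgNormalizer can be illustrated with plain NumPy (a standalone sketch, not BigDL code): subtracting the mean and dividing by the standard deviation gives the pixels zero mean and unit variance, which typically speeds up training.

```python
import numpy as np

# A fake batch of grey images with pixel values in [0, 1].
rng = np.random.default_rng(0)
images = rng.random((100, 28, 28))

# Normalize the way GreyImgNormalizer does: subtract mean, divide by std.
mean, std = images.mean(), images.std()
normalized = (images - mean) / std
```

After this transform, `normalized.mean()` is (numerically) zero and `normalized.std()` is one.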

4. Set up the model

val model = LeNet5(classNum = 10)

5. Train the model

Run an Optimizer that minimizes our model’s loss on the training data. It will take around ten minutes to finish on a default two-worker cluster.

val optimizer = Optimizer(
  model = model,
  dataset = trainSet,
  criterion = ClassNLLCriterion[Float]())
val optimMethod = new SGD[Float](learningRate = lr,
  learningRateDecay = lrDecay)
val trainedModel = optimizer
  .setValidation(
    trigger = Trigger.everyEpoch,
    dataset = validationSet,
    vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
  .setOptimMethod(optimMethod)
  .setEndWhen(Trigger.maxEpoch(maxEpoch))
  .optimize()

6. Test the model

Let’s use a Validator and the trainedModel to test the accuracy of the model on the validation dataset.

val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
result.foreach(r => {
    println(s"${r._2} is ${r._1}")
})
You will achieve a respectable 99% accuracy.
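Top1Accuracy simply checks whether the highest-scoring class matches the label. A NumPy equivalent (illustrative only, with made-up scores) looks like this:

```python
import numpy as np

def top1_accuracy(scores, labels):
    """Fraction of rows whose highest-scoring class equals the label."""
    return float((scores.argmax(axis=1) == labels).mean())

# Three samples, four classes; the first two predictions are correct.
scores = np.array([[0.1, 0.7, 0.1, 0.1],
                   [0.6, 0.2, 0.1, 0.1],
                   [0.2, 0.2, 0.5, 0.1]])
labels = np.array([1, 0, 3])
print(top1_accuracy(scores, labels))  # 2 of 3 correct
```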


Autoencoder example with the BigDL Python API

In this tutorial, we are going to use an autoencoder to learn how to compress handwritten digit images from the MNIST dataset into a lower dimensional representation. Then, we’ll reconstruct the original image from the representation.

The autoencoder model attempts to minimize differences between an original input image and its reconstructed form. While the conversion is fairly lossy, it is still good enough for some practical applications, such as data denoising and dimensionality reduction for data visualization.
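The training objective is just the mean squared error between the original image and its reconstruction. Conceptually (a hand-rolled sketch with toy vectors, not the BigDL MSECriterion implementation):

```python
import numpy as np

def reconstruction_loss(original, reconstructed):
    """Mean squared error between an image and its reconstruction."""
    return float(np.mean((original - reconstructed) ** 2))

original = np.array([0.0, 0.5, 1.0])
perfect = original.copy()
lossy = np.array([0.1, 0.5, 0.9])

print(reconstruction_loss(original, perfect))  # 0.0
print(reconstruction_loss(original, lossy))    # small but nonzero
```

The optimizer drives this quantity down; a perfect reconstruction has zero loss.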

1. BigDL initialization

Again, let's start by importing the necessary packages and initializing the engine:

import numpy as np
import datetime as dt
import matplotlib.pyplot as plt

from bigdl.nn.layer import Sequential, Linear, ReLU, Sigmoid
from bigdl.nn.criterion import MSECriterion
from bigdl.optim.optimizer import Optimizer, Adam, MaxEpoch
from bigdl.util.common import *

from bigdl.dataset import mnist
from matplotlib.pyplot import imshow

2. Load MNIST Dataset

Define the get_mnist method, which converts the image data into a Spark RDD and normalizes the pixel values.

import numpy as np
from bigdl.util import common
from bigdl.dataset import mnist
def get_mnist(sc, images, labels, mean, std):
    rdd_train_images = sc.parallelize(images)
    rdd_train_labels = sc.parallelize(labels)

    # Zip features and labels into Samples, normalizing the pixel values
    # (labels are shifted by one because BigDL labels are 1-based).
    rdd_train_sample = rdd_train_images.zip(rdd_train_labels).map(
        lambda features_label: common.Sample.from_ndarray(
            (features_label[0] - mean) / std,
            features_label[1] + 1))
    return rdd_train_sample

Similar to the Scala example, we need to normalize the image pixel values for better neural network performance.


# Get and store MNIST into RDD of Sample
mnist_path = "datasets/mnist"
(train_images, train_labels) = mnist.read_data_sets(mnist_path, "train")
(test_images, test_labels) = mnist.read_data_sets(mnist_path, "test")

training_mean = np.mean(train_images)
training_std = np.std(train_images)
train_data = get_mnist(sc, train_images, train_labels, training_mean, training_std)
test_data = get_mnist(sc, test_images, test_labels, training_mean, training_std)

# For an autoencoder, the target is the input image itself, so replace the
# label of each Sample with a flattened copy of its features.
train_data = train_data.map(lambda sample: Sample.from_ndarray(
    np.resize(sample.features[0].to_ndarray(), (28 * 28,)),
    np.resize(sample.features[0].to_ndarray(), (28 * 28,))))
test_data = test_data.map(lambda sample: Sample.from_ndarray(
    np.resize(sample.features[0].to_ndarray(), (28 * 28,)),
    np.resize(sample.features[0].to_ndarray(), (28 * 28,))))

3. Model Setup

This is a very simple model with one hidden layer in the encoder. The size of the hidden layer (32) is the number of features in the compressed representation. Since our input image has 784 features, each compressed image is about 4% of its original size.
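The 4% figure comes straight from the layer sizes (a quick check):

```python
n_input = 28 * 28   # 784 pixels per input image
n_hidden = 32       # size of the compressed representation

ratio = n_hidden / n_input
print(f"{ratio:.1%}")  # 4.1%
```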

Note that we can use such a simple model because the MNIST dataset is very simple.

# Hyperparameters
training_epochs = 10
batch_size = 120
display_step = 1
# Network Parameters
n_hidden = 32
n_input = 784 # MNIST data input (img shape: 28*28)

def build_autoencoder(n_input, n_hidden):
    # Initialize a sequential container
    model = Sequential()
    # encoder
    model.add(Linear(n_input, n_hidden))
    model.add(ReLU())
    # decoder
    model.add(Linear(n_hidden, n_input))
    model.add(Sigmoid())
    return model

model = build_autoencoder(n_input, n_hidden)

4. Train the model

Run an optimizer that minimizes the differences between the input and reconstructed images. This will take several minutes.

optimizer = Optimizer(
    model=model,
    training_rdd=train_data,
    criterion=MSECriterion(),
    optim_method=Adam(),
    end_trigger=MaxEpoch(training_epochs),
    batch_size=batch_size)
trained_model = optimizer.optimize()
print("Optimization Done.")

5. Prediction on Test Dataset

We are going to use 10 examples to demonstrate that our trained autoencoder produces reasonable reconstructed images. We will compress and reconstruct the original images, then take 10 to compare with the original inputs.

examples_to_show = 10
examples = trained_model.predict(test_data).take(examples_to_show)
f, a = plt.subplots(2, examples_to_show, figsize=(examples_to_show, 2))
for i in range(examples_to_show):
    a[0][i].imshow(np.reshape(test_images[i], (28, 28)))
    a[1][i].imshow(np.reshape(examples[i], (28, 28)))
plt.show()

You will see that the images have been reasonably reconstructed. The first row contains 10 of the original images, and the second row contains their reconstructed versions.

In this blog post, we demonstrated how users can easily build deep learning applications in a distributed fashion on Cloud Dataproc using familiar tools such as Spark and Zeppelin. The high scalability, high performance, and ease of use of BigDL make it easy to analyze large datasets using deep learning.

To learn more about the BigDL project, please check out the BigDL website, and for some interesting applications, please check out these BigDL talks. Lastly, we’ve made more tutorials available here.
