
Faster machine learning on Dataproc with new initialization action

Apache Hadoop and Apache Spark are established, standard frameworks for distributed storage and data processing. At Google Cloud, we want data scientists and data engineers to have the tools they need to use these frameworks simply and cost-efficiently, which is why we offer Dataproc, our fully managed cloud service for running Apache Spark and Apache Hadoop clusters. If you are building machine learning models, you might use Dataproc to preprocess your data with Apache Spark and then use that same Spark cluster to power your notebook for machine learning. We created the machine learning initialization action to provide a set of commonly used libraries and reduce the time you spend configuring your cluster for machine learning.

In this blog, you will learn how to get started with this initialization action on a Dataproc cluster. This gives you an environment with current open source machine learning tools, including TensorFlow, Dask, and RAPIDS.

Plus, you can augment your experience of using Dataproc with Jupyter and Zeppelin via Optional Components and Component Gateway.

The machine learning initialization action relies on using other initialization actions for installing certain components, such as RAPIDS, Dask and GPU drivers. As such, you have access to the full functionality and configurations that these components provide.

Data preprocessing and machine learning training in one place

The machine learning initialization action for Dataproc provides an environment for running production Spark, Dask, or other ETL jobs on your data while also building machine learning models with your choice of machine learning libraries, all in the same place. By adding GPUs to your cluster configuration, you can also decrease the training time for machine learning models with TensorFlow or RAPIDS, and use the RAPIDS SQL Accelerator to further improve the efficiency of model training.

Configure your Google Cloud Platform project

You’ll need a Google Cloud Platform (GCP) project. Use your own or create a new one following the instructions here.

The machine learning initialization action relies on using other initialization actions for parts of its installation. You can make a copy of these scripts to effectively “pin” the versions of the scripts that you’re using. To do so, create a Cloud Storage bucket, and copy the scripts into this bucket.

BUCKET=<your-bucket-name>
gsutil mb gs://${BUCKET}
gsutil -m rsync -r gs://dataproc-initialization-actions gs://${BUCKET}/dataproc-initialization-actions

Create a Dataproc cluster with the machine learning initialization action

You’ll now proceed with the creation of your cluster. First, define a region and a cluster name.

  REGION=<region>
CLUSTER_NAME=<cluster_name>

Next, run the following command. It creates a cluster configured with the machine learning initialization action. The cluster contains one master node and two worker nodes, with one NVIDIA T4 GPU attached to each node (the accelerator flags below omit a count, which defaults to 1). The Jupyter optional component is enabled along with Component Gateway, so you can access the cluster through Jupyter Notebooks or JupyterLab.

  gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --master-machine-type n1-standard-16 \
    --worker-machine-type n1-highmem-16 \
    --master-accelerator type=nvidia-tesla-t4 \
    --worker-accelerator type=nvidia-tesla-t4 \
    --image-version preview-ubuntu18 \
    --metadata include-gpus=true \
    --metadata gpu-driver-provider=NVIDIA \
    --metadata init-actions-repo=gs://${BUCKET}/dataproc-initialization-actions \
    --optional-components JUPYTER \
    --initialization-actions gs://${BUCKET}/dataproc-initialization-actions/mlvm/mlvm.sh \
    --initialization-action-timeout=45m \
    --enable-component-gateway

The configuration shown above creates a Dataproc cluster with NVIDIA GPUs attached and the corresponding drivers installed. You can then take advantage of GPU-accelerated data processing using frameworks such as NVIDIA RAPIDS or TensorFlow.

In this configuration, we’re including the init-actions-repo metadata flag to tell the machine learning initialization action where to locate the necessary other installation scripts. Additionally, we’re including the include-gpus=true and gpu-driver-provider=NVIDIA flags to tell the script that we want to install GPU drivers and that the drivers should come from NVIDIA. You can optionally run the cluster without any GPUs attached or drivers included.

You can also equip the cluster with the NVIDIA RAPIDS Spark jars or with NVIDIA RAPIDS for Dask. To do so, set the rapids-runtime metadata flag to SPARK or DASK, respectively.
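To see how the optional pieces fit together, here is a minimal Python sketch that assembles the arguments of the create command above. The helper name build_create_command and its parameters are hypothetical and only for illustration; the flags and metadata keys are the ones used in this post, and the rapids-runtime value is passed through unchanged rather than validated.

```python
from typing import List, Optional


def build_create_command(
    cluster_name: str,
    region: str,
    bucket: str,
    include_gpus: bool = True,
    rapids_runtime: Optional[str] = None,
) -> List[str]:
    """Assemble the `gcloud dataproc clusters create` arguments from this post.

    Hypothetical helper: it only builds the argument list, it does not run gcloud.
    """
    # Location of the pinned copy of the initialization actions.
    repo = f"gs://{bucket}/dataproc-initialization-actions"
    metadata = {"init-actions-repo": repo}

    cmd = [
        "gcloud", "dataproc", "clusters", "create", cluster_name,
        "--region", region,
        "--master-machine-type", "n1-standard-16",
        "--worker-machine-type", "n1-highmem-16",
        "--image-version", "preview-ubuntu18",
        "--optional-components", "JUPYTER",
        "--initialization-actions", f"{repo}/mlvm/mlvm.sh",
        "--initialization-action-timeout=45m",
        "--enable-component-gateway",
    ]

    if include_gpus:
        # Attach T4 GPUs and ask the script to install NVIDIA drivers.
        cmd += [
            "--master-accelerator", "type=nvidia-tesla-t4",
            "--worker-accelerator", "type=nvidia-tesla-t4",
        ]
        metadata["include-gpus"] = "true"
        metadata["gpu-driver-provider"] = "NVIDIA"

    if rapids_runtime is not None:
        # Passed through as given; this sketch does not check the value.
        metadata["rapids-runtime"] = rapids_runtime

    # One --metadata flag per key, mirroring the command in this post.
    for key, value in metadata.items():
        cmd += ["--metadata", f"{key}={value}"]
    return cmd


if __name__ == "__main__":
    print(" ".join(build_create_command("ml-cluster", "us-central1", "my-bucket",
                                        rapids_runtime="DASK")))
```

Calling the helper with include_gpus=False drops both accelerator flags and both GPU metadata keys, which corresponds to running the cluster without GPUs or drivers.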

Use the spark-tensorflow-distributor to run distributed TensorFlow jobs

You can run distributed TensorFlow jobs on your Spark cluster with the spark-tensorflow-distributor included in the machine learning initialization action. This library is a wrapper for the TensorFlow distributed library. Copy the following code into a file spark_tf_dist.py.

from pyspark.sql import SparkSession
from spark_tensorflow_distributor import MirroredStrategyRunner

spark = (SparkSession.builder.appName("tf")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.executor.resource.gpu.amount", "1")
  .config("spark.executor.cores", "15")
  .config("spark.task.cpus", "15")
  .config("spark.task.resource.gpu.amount", "1")
  .config("spark.executor.resource.gpu.discoveryScript", "/usr/lib/spark/scripts/gpu/getGpusResources.sh")
  .config("spark.driver.resource.gpu.discoveryScript", "/usr/lib/spark/scripts/gpu/getGpusResources.sh")
  .config("spark.driver.resource.gpu.amount", "1")         
  .getOrCreate())

def train():
    import tensorflow as tf
    import uuid

    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets():
        (mnist_images, mnist_labels), _ = \
            tf.keras.datasets.mnist.load_data(path=str(uuid.uuid4())+'mnist.npz')

        dataset = tf.data.Dataset.from_tensor_slices((
            tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
            tf.cast(mnist_labels, tf.int64))
        )
        dataset = dataset.repeat().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
        return dataset

    def build_and_compile_cnn_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax'),
        ])
        model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
            metrics=['accuracy'],
        )
        return model

    train_datasets = make_datasets()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_datasets = train_datasets.with_options(options)
    multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5)
    
MirroredStrategyRunner(num_slots=2).run(train)

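This code uses MirroredStrategyRunner to submit a TensorFlow training job to your Spark cluster. The Spark configuration disables dynamic allocation and sizes each executor at 15 cores and one GPU, with each task claiming a whole executor, so every training process gets a dedicated GPU. Setting num_slots=2 requests two of these slots.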
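As a rough check on how the Spark settings relate to num_slots, the sketch below estimates how many GPU task slots a cluster can offer. It is a simplification under stated assumptions: the function name gpu_task_slots is hypothetical, all amounts are whole numbers, and it ignores memory limits and any resources YARN reserves for itself.

```python
def gpu_task_slots(
    workers: int,
    cores_per_worker: int,
    gpus_per_worker: int,
    executor_cores: int,
    executor_gpus: int,
    task_cpus: int,
    task_gpus: int,
) -> int:
    """Estimate concurrent GPU tasks from core and GPU counts only.

    Hypothetical helper for illustration; real scheduling also depends on
    memory and on the cluster manager's own overhead.
    """
    # An executor needs both its cores and its GPUs on the same worker.
    executors_per_worker = min(
        cores_per_worker // executor_cores,
        gpus_per_worker // executor_gpus,
    )
    # A task needs both its CPUs and its GPUs inside one executor.
    tasks_per_executor = min(
        executor_cores // task_cpus,
        executor_gpus // task_gpus,
    )
    return workers * executors_per_worker * tasks_per_executor


if __name__ == "__main__":
    # Two 16-core workers with one GPU each, using the settings from
    # spark_tf_dist.py: 15 cores and 1 GPU per executor and per task.
    print(gpu_task_slots(2, 16, 1, 15, 1, 15, 1))
```

With the values from this post, each worker fits one executor and each executor runs one task, so the estimate is two slots, which lines up with MirroredStrategyRunner(num_slots=2).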

You can then submit the TensorFlow code as a PySpark job to your cluster:

  gcloud dataproc jobs submit pyspark \
  --cluster ${CLUSTER_NAME} \
  --region ${REGION} \
  spark_tf_dist.py

Dataproc Hub

The machine learning initialization action works well in a notebook environment. One way to use it is with the Jupyter Optional Component on Dataproc; more information can be found in this blog post. Another way is Dataproc Hub, Dataproc’s managed JupyterLab service. This service allows IT administrators to provide preconfigured environments optimized for security and resource allocation to their data scientists, while giving the data scientists the flexibility to customize the packages and libraries available on the cluster. You can configure your cluster with the machine learning initialization action by following the instructions here and using the following YAML configuration:

config:
  endpointConfig:
    enableHttpPortAccess: true
  gceClusterConfig:
    metadata:
      gpu-driver-provider: NVIDIA
      include-gpus: 'true'
      init-actions-repo: gs://<YOUR-BUCKET>/dataproc-initialization-actions 
  initializationActions:
  - executableFile: gs://<YOUR-BUCKET>/dataproc-initialization-actions/mlvm/mlvm.sh
    executionTimeout: 2700s
  masterConfig:
    accelerators:
    - acceleratorCount: 1
      acceleratorTypeUri: nvidia-tesla-t4
    machineTypeUri: n1-standard-16
  softwareConfig:
    imageVersion: preview-ubuntu18
    optionalComponents:
    - JUPYTER
  workerConfig:
    accelerators:
    - acceleratorCount: 1
      acceleratorTypeUri: nvidia-tesla-t4
    machineTypeUri: n1-standard-16
    numInstances: 2
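The executionTimeout of 2700s in this YAML is the same 45-minute limit that the gcloud command earlier passes as --initialization-action-timeout=45m. The short sketch below shows the conversion. The function name to_api_duration is hypothetical, and it handles only a single whole number followed by one unit (s, m, h, or d), which is a simplification of the duration formats gcloud accepts.

```python
import re

# Seconds per unit for the simple "<number><unit>" form handled here.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def to_api_duration(value: str) -> str:
    """Convert a duration such as "45m" into whole seconds, e.g. "2700s".

    Hypothetical helper for illustration only.
    """
    match = re.fullmatch(r"(\d+)([smhd])", value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    amount, unit = match.groups()
    return f"{int(amount) * _UNIT_SECONDS[unit]}s"


if __name__ == "__main__":
    print(to_api_duration("45m"))  # prints 2700s
```

Keeping the two values in sync matters if you raise the timeout in one place, for example when adding more components that lengthen installation.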

For more information on Dataproc Hub, check out this announcement blog.

Next steps

A cluster created with the machine learning initialization action lets you run your ETL processing jobs and train machine learning models in the same place. You can also customize your experience with any of our other open source initialization actions. Additionally, you can create a custom Dataproc image for convenience and faster cluster creation times. For more information on getting started with the machine learning initialization action, check out the documentation here. You can also get started using the machine learning initialization action on Dataproc with Google Cloud Platform’s $300 credit for new customers.