
NVIDIA’s RAPIDS joins our set of Deep Learning VM images for faster data science


If you’re a data scientist, researcher, engineer, or developer, you may be familiar with Google Cloud’s set of Deep Learning Virtual Machine (VM) images, which enable one-click setup of machine learning-focused development environments. But some data scientists still use a combination of pandas, Dask, scikit-learn, and Spark on traditional CPU-based instances. If you’d like to speed up your end-to-end pipeline at scale, Google Cloud’s Deep Learning VMs now include an experimental image with RAPIDS, NVIDIA’s open source, Python-based, GPU-accelerated data processing and machine learning libraries. RAPIDS is a key part of CUDA-X AI, NVIDIA’s larger collection of GPU acceleration libraries for deep learning, machine learning, and data analysis.

The Deep Learning VM Images comprise a set of Debian 9-based Compute Engine virtual machine disk images optimized for data science and machine learning tasks. All images include common machine learning frameworks and tools (typically for deep learning) installed from first boot, and can be used out of the box on instances with GPUs to accelerate your data processing tasks. In this blog post you’ll learn to use a Deep Learning VM that includes the GPU-accelerated RAPIDS libraries.

RAPIDS is an open-source suite of data processing and machine learning libraries developed by NVIDIA that enables GPU acceleration for data science workflows. RAPIDS relies on NVIDIA’s CUDA language, allowing users to leverage GPU processing and high-bandwidth GPU memory through user-friendly Python interfaces. It includes cuDF, a DataFrame API based on Apache Arrow data structures that will be familiar to users of pandas. It also includes cuML, a growing library of GPU-accelerated ML algorithms that will be familiar to users of scikit-learn. Together, these libraries give ML practitioners an accelerated solution that requires only minimal code changes and no new tools to learn.

RAPIDS is available as a conda or pip package, in a Docker image, and as source code.

Using the RAPIDS Google Cloud Deep Learning VM image automatically initializes a Compute Engine instance with all the pre-installed packages required to run RAPIDS. No extra steps required!


Creating a new RAPIDS virtual machine instance

Compute Engine offers predefined machine types that you can use when you create an instance. Each predefined machine type includes a preset number of vCPUs and amount of memory, and bills you at a fixed rate, as described on the pricing page.

If predefined machine types do not meet your needs, you can create an instance with a custom virtualized hardware configuration. Specifically, you can create an instance with a custom number of vCPUs and amount of memory, effectively using a custom machine type.

In this case, we’ll create a custom Deep Learning VM instance with 48 vCPUs, 384 GB of extended memory, 4 NVIDIA Tesla T4 GPUs, and RAPIDS support.

export IMAGE_FAMILY="rapids-latest-gpu-experimental"
export ZONE="us-central1-b"
export INSTANCE_NAME="rapids-instance"
export INSTANCE_TYPE="custom-48-393216-ext"
gcloud compute instances create $INSTANCE_NAME \
        --zone=$ZONE \
        --image-family=$IMAGE_FAMILY \
        --image-project=deeplearning-platform-release \
        --maintenance-policy=TERMINATE \
        --accelerator='type=nvidia-tesla-t4,count=4' \
        --machine-type=$INSTANCE_TYPE \
        --boot-disk-size=1TB \
        --scopes=https://www.googleapis.com/auth/cloud-platform \
        --metadata='install-nvidia-driver=True,proxy-mode=project_editors'

Notes:

  • You can create this instance in any available zone that supports T4 GPUs.
  • The option install-nvidia-driver=True installs the NVIDIA GPU driver automatically.
  • The option proxy-mode=project_editors makes the VM visible in the Notebook Instances section.
  • To define extended memory, specify the amount in MB as 1024*X, where X is the number of GB of RAM required (for example, 1024*384 = 393216 for 384 GB).
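
As a quick check on the arithmetic in the last note, the following sketch rebuilds the machine type string used in the command above. The helper name custom_machine_type is hypothetical, and the custom-<vCPUs>-<memory in MB>-ext pattern is inferred from the INSTANCE_TYPE value shown earlier rather than from a complete specification.

```python
def custom_machine_type(vcpus, memory_gb, extended=True):
    """Build a custom machine type name (hypothetical helper).

    The memory field is expressed in MB, so GB is multiplied by 1024.
    """
    memory_mb = 1024 * memory_gb
    suffix = "-ext" if extended else ""
    return "custom-{}-{}{}".format(vcpus, memory_mb, suffix)


machine_type = custom_machine_type(48, 384)
print(machine_type)  # custom-48-393216-ext
```

The result matches the INSTANCE_TYPE exported before the gcloud command.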

Using RAPIDS

To put RAPIDS through its paces on Google Cloud Platform (GCP), we focused on a common HPC workload: a parallel sum reduction test. This test can operate on very large problems (the default size is 2 TB) using distributed memory and parallel task processing.

Several applications in high performance computing (HPC) require the computation of parallel sum reductions.

It turns out that parallel sum reduction is useful for the data science community at large. To manage the deluge of big data, a parallel programming model called “MapReduce” is used for processing data using distributed clusters. The “Map” portion of this model supports sorting: for example, sorting products into queues. Once the model maps the data, it summarizes the output with the “Reduce” step, for example by counting the number of products in each queue. The summation operation is the most compute-heavy step, and given the scale of data that the model is processing, these sum operations must be carried out using parallel distributed clusters in order to complete in a reasonable amount of time.
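
To make the two phases concrete, here is a minimal single-process sketch of the products-in-queues example. The records and the function names map_phase and reduce_phase are made up for illustration; a real MapReduce system would distribute both phases across a cluster.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical (queue, product) records, for illustration only.
records = [
    ("electronics", "phone"),
    ("grocery", "apple"),
    ("electronics", "laptop"),
    ("grocery", "bread"),
    ("grocery", "milk"),
]


def map_phase(items):
    """Map: sort each product into its queue."""
    queues = defaultdict(list)
    for queue, product in items:
        queues[queue].append(product)
    return queues


def reduce_phase(queues):
    """Reduce: summarize each queue by summing a count of 1 per product."""
    return {
        queue: reduce(lambda total, _product: total + 1, products, 0)
        for queue, products in queues.items()
    }


counts = reduce_phase(map_phase(records))
print(counts)  # {'electronics': 2, 'grocery': 3}
```

The reduce step here is a sum, which is why the speed of sum reductions matters so much at scale.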

But certain reduction sum operations contain dependencies that inhibit parallelization. To illustrate such a dependency, suppose we want to add a series of numbers as shown in Figure 1.

Figure 1: Sequential summation. Figure 2: Tree-based summation.

In Figure 1 (left), we must first add 7 + 6 to obtain 13 before we can add 13 + 14 to obtain 27, and so on in a sequential fashion. These dependencies inhibit parallelization. However, since addition is associative, the summation can be expressed as a tree (Figure 2, right). The benefit of this tree representation is that the dependency chain is shallow, and since the root node summarizes its leaves, the calculation can be split into independent tasks.
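
The difference in dependency depth can be sketched in plain Python. The first three values below follow the text (7, 6, 14); the remaining values are made up, and the function names are only illustrative.

```python
def sequential_sum(values):
    """Left-to-right sum; returns (total, length of the dependency chain)."""
    total = values[0]
    depth = 0
    for v in values[1:]:
        total += v  # each addition depends on the previous total
        depth += 1
    return total, depth


def tree_sum(values):
    """Pairwise sum; returns (total, number of rounds).

    Additions within one round are independent of each other,
    so each round could be executed in parallel.
    """
    level = list(values)
    rounds = 0
    while len(level) > 1:
        paired = [level[i] + level[i + 1] for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:  # carry an unpaired element into the next round
            paired.append(level[-1])
        level = paired
        rounds += 1
    return level[0], rounds


values = [7, 6, 14, 3, 9, 2, 11, 5]
print(sequential_sum(values))  # (57, 7)
print(tree_sum(values))        # (57, 3)
```

Both approaches give the same total, but the tree needs only 3 dependent rounds for 8 values instead of a chain of 7 additions.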

Speaking of tasks, this brings us to the Python package Dask, a popular distributed computing framework. With Dask, data scientists and researchers can use Python to express their problems as tasks. Dask then distributes these tasks across processing elements within a single system, or across a cluster of systems. The RAPIDS team recently integrated GPU support into a package called dask-cuda. When you import both dask-cuda and another package called CuPy, which allows data to be allocated on GPUs using familiar NumPy constructs, you can explore the full breadth of models you can build with your data set. To illustrate, Figures 3 and 4 show side-by-side comparisons of the same test run. On the left, 48 cores of a single system are used to process 2 terabytes (TB) of randomly initialized data using 48 Dask workers. On the right, 4 Dask workers process the same 2 TB of data, but dask-cuda is used to automatically associate those workers with 4 Tesla T4 GPUs installed in the same system.
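
The task model described above can be approximated with the Python standard library alone. This is a stand-in for illustration, not Dask's API: the chunked and parallel_sum helpers are hypothetical, and a thread pool plays the role of the Dask workers. It shows the structure of the computation rather than any real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, chunk_size):
    """Split a list into consecutive chunks."""
    return [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]


def parallel_sum(values, n_workers=4, chunk_size=250):
    """Sum each chunk as an independent task, then combine the partial sums."""
    chunks = chunked(values, chunk_size)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_sums = list(pool.map(sum, chunks))
    return sum(partial_sums)


data = list(range(1000))
print(parallel_sum(data))  # 499500
```

Each chunk sum is one task; changing n_workers changes how many tasks run at once without changing the result, which is the same property the CPU and GPU runs in this post rely on.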

Running RAPIDS

To test parallel sum-reduction, perform the following steps:

1. SSH into the instance. See Connecting to Instances for more details.

2. Download the code required from this repository and upload it to your Deep Learning Virtual Machine Compute Engine instance. These two files are of particular importance as you profile performance:

  • run.sh helper bash shell script
  • sum.py summation Python script

The sample code for these tests, based on the blog post GPU Dask Arrays, appears below.

3. Run the tests:

Run the test on the instance’s CPUs, in this case specifying 48 vCPUs (indicated by the -c flag):

  time ./run.sh -c 48

Using CPUs and Local Dask
Allocating and initializing arrays using CPU memory
Array size: 2.00 TB.  Computing parallel sum . . .
Processing complete.
Wall time create data + computation time: 695.50063515 seconds

real    11m 45.523s
user    0m 52.720s
sys     0m 10.100s

Now run the test using 4 NVIDIA Tesla T4 GPUs (indicated by the -g flag):

  time ./run.sh -g 4

Using GPUs and Local Dask
Allocating and initializing arrays using GPU memory with CuPY
Array size: 2.00 TB. Computing parallel sum . . .
Processing complete.
Wall time create data + computation time: 57.94356680 seconds

real 1m 13.680s
user 0m 9.856s
sys  0m 13.832s

Figure 3: CPU-based solution. Figure 4: GPU-based solution.

Here are some initial conclusions we derived from these tests:

  • Processing 2 TB of data on GPUs is much faster (a ~12x speed-up for this test)
  • Using Dask’s dashboard, you can visualize the performance of the reduction sum as it is executing
  • CPU cores are fully occupied during processing on CPUs, but the GPUs are not fully utilized
  • You can also run this test in a distributed environment

import argparse
import subprocess
import time

import cupy
import dask.array as da
from dask.distributed import Client, LocalCluster
from dask_cuda import LocalCUDACluster

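def create_data(rs, xdim, ydim, x_chunk_size, y_chunk_size):
    # Lazily define a chunked array of normally distributed values
    # (mean 10, standard deviation 1).
    x = rs.normal(10, 1, size=(xdim, ydim),
                  chunks=(x_chunk_size, y_chunk_size))
    return x

def run(data):
    # Add 1 to every element, keep every other row and column, then sum.
    # compute() triggers execution on the Dask workers.
    (data + 1)[::2, ::2].sum().compute()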

def get_scheduler_info():
    scheduler_ip =  subprocess.check_output(['hostname','--all-ip-addresses'])
    scheduler_ip = scheduler_ip.decode('UTF-8').split()[0]
    scheduler_port = '8786'
    scheduler_uri = '{}:{}'.format(scheduler_ip, scheduler_port)
    return scheduler_ip, scheduler_uri

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--xdim', type=int, default=500000)
    parser.add_argument('--ydim', type=int, default=500000)
    parser.add_argument('--x_chunk_size', type=int, default=10000)
    parser.add_argument('--y_chunk_size', type=int, default=10000)
    parser.add_argument('--use_gpus_only', action="store_true")
    parser.add_argument('--n_gpus', type=int, default=1)
    parser.add_argument('--use_cpus_only', action="store_true")
    parser.add_argument('--n_sockets', type=int, default=1)
    parser.add_argument('--n_cores_per_socket', type=int, default=1)
    parser.add_argument('--use_dist_dask', action="store_true")
    args = parser.parse_args()

    sched_ip, sched_uri = get_scheduler_info()

    if args.use_dist_dask:
        print('Using Distributed Dask')
        client = Client(sched_uri)

    elif args.use_gpus_only:
        print('Using GPUs and Local Dask')
        cluster = LocalCUDACluster(ip=sched_ip,n_workers=args.n_gpus)
        client = Client(cluster)

    elif args.use_cpus_only:
        print('Using CPUs and Local Dask')
        cluster = LocalCluster(ip=sched_ip, 
                     n_workers=args.n_sockets, 
                     threads_per_worker=args.n_cores_per_socket)
        client = Client(cluster)
        
    start = time.time()
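    if args.use_gpus_only:
        print('Allocating arrays using GPU memory with CuPY')
        rs = da.random.RandomState(RandomState=cupy.random.RandomState)
    elif args.use_cpus_only:
        print('Allocating arrays using CPU memory')
        rs = da.random.RandomState()
    else:
        # Without one of these flags, rs would be undefined below.
        parser.error('specify --use_gpus_only or --use_cpus_only')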

    x = create_data(rs, args.xdim, args.ydim, 
                        args.x_chunk_size, args.y_chunk_size)
    print('Array size: {:.2f}TB. Computing...'.format(x.nbytes/1e12))
    run(x)
    end = time.time()
    
    delta = (end - start)
    print('Processing complete.')
    print('Wall time: {:10.8f} seconds'.format(delta))

    del x

if __name__ == '__main__':
    main()

In this example, we allocate arrays using the double (64-bit floating point) data type by default. Since this code allocates an array of 500K × 500K elements, this represents 2 TB (500K × 500K × 8 bytes per element). Dask initializes these array elements randomly from a normal (Gaussian) distribution using the dask.array package.
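
The size arithmetic can be checked with a few lines of Python. The dimensions and chunk sizes below are the argparse defaults from the script above; the variable names are only illustrative, and TB and MB are decimal units (1e12 and 1e6 bytes), matching the script's own x.nbytes/1e12 calculation.

```python
xdim, ydim = 500_000, 500_000      # defaults of --xdim and --ydim
x_chunk, y_chunk = 10_000, 10_000  # defaults of --x_chunk_size and --y_chunk_size
bytes_per_element = 8              # one double-precision float

total_bytes = xdim * ydim * bytes_per_element
n_chunks = (xdim // x_chunk) * (ydim // y_chunk)
chunk_bytes = x_chunk * y_chunk * bytes_per_element

print("Array size: {:.2f} TB".format(total_bytes / 1e12))  # Array size: 2.00 TB
print("Chunks: {}".format(n_chunks))                       # Chunks: 2500
print("Chunk size: {:.0f} MB".format(chunk_bytes / 1e6))   # Chunk size: 800 MB
```

With these defaults, the 2 TB array is split into 2,500 chunks of 800 MB each, and each chunk is a unit of work that can be assigned to a worker.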

Running RAPIDS on a distributed cluster

You can also run RAPIDS in a distributed environment using multiple Compute Engine instances. The same code runs in a distributed way with minimal modification and decreases the processing time further. If you want to explore RAPIDS in a distributed environment, please follow the complete guide here.

  time ./run.sh -g 20 -d

Using Distributed Dask
Allocating and initializing arrays using GPU memory with CuPY
Array size: 2.00 TB.  Computing parallel sum . . .
Processing complete.
Wall time create data + computation time: 11.63004732 seconds

real    0m 12.465s
user    0m 1.432s
sys     0m 0.324s
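
To put the three runs side by side, the short sketch below computes speed-ups relative to the CPU run. The wall times are copied from the outputs shown in this post; the dictionary keys are just the commands that produced them.

```python
# Wall times in seconds, copied from the three runs shown in this post.
wall_times = {
    "run.sh -c 48": 695.50063515,
    "run.sh -g 4": 57.94356680,
    "run.sh -g 20 -d": 11.63004732,
}

baseline = wall_times["run.sh -c 48"]
speedups = {cmd: baseline / seconds for cmd, seconds in wall_times.items()}

for cmd, factor in speedups.items():
    print("{:<16} {:5.1f}x".format(cmd, factor))
```

For these measurements the ratios come out to roughly 12x for the 4-GPU run and roughly 60x for the distributed run. They describe this particular test only.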

Conclusion

As you can see from the above example, the RAPIDS VM Image can dramatically speed up your ML workflows. Running RAPIDS with Dask lets you seamlessly integrate your data science environment with Python and its myriad libraries and wheels, HPC schedulers such as SLURM, PBS, SGE, and LSF, and open-source infrastructure orchestration projects such as Kubernetes and YARN. Dask also helps you develop your model once, then run it on a single system or scale it out across a cluster. You can then dynamically adjust your resource usage based on computational demands. Lastly, Dask helps you maximize uptime through the fault tolerance capabilities intrinsic to failover-capable cluster computing.

RAPIDS is also easy to deploy on Google’s Compute Engine distributed environment. If you’re eager to learn more, check out the RAPIDS project and open-source community website, or review the RAPIDS VM Image documentation.

Acknowledgements: Ty McKercher, NVIDIA, Principal Solution Architect; Vartika Singh, NVIDIA, Solution Architect; Gonzalo Gasca Meza, Google, Developer Programs Engineer; Viacheslav Kovalevskyi, Google, Software Engineer