
TensorFlow Enterprise makes accessing data on Google Cloud faster and easier


Data is at the heart of all AI initiatives. Put simply, you need to collect and store a lot of it to train a deep learning model. As accelerators such as GPUs and Cloud TPUs become faster and more widely available, the speed at which data moves from its storage location to the training process becomes increasingly important.

TensorFlow, one of the most popular machine learning frameworks, was open sourced by Google in 2015. It serves users on every platform, but those deploying it on Google Cloud can also benefit from enterprise-grade support and performance from the creators of TensorFlow. That’s why we recently launched TensorFlow Enterprise for AI-enabled businesses on Google Cloud.

In this post, we look at the improvements TensorFlow Enterprise offers for accessing data stored on Google Cloud. If you use Cloud Storage for training data, jump to the Cloud Storage reader improvements section to see how TensorFlow Enterprise doubles data throughput from Cloud Storage. If you store data in BigQuery, jump to the BigQuery reader section to learn how TensorFlow Enterprise lets you read BigQuery data directly into TensorFlow with high throughput.

Cloud Storage reader improvements
TensorFlow Enterprise improves the way tf.data datasets read data from Cloud Storage. To measure the effect of these improvements, we run the same TensorFlow code with TensorFlow 1.14 and with TensorFlow Enterprise, and compare the average number of examples per second read from Cloud Storage. The code simply reads TFRecord files and prints the number of examples processed per second:

import time

import tensorflow.compat.v1 as tf

# Use graph mode so the benchmark loop can run through a tf.Session.
tf.disable_eager_execution()


def input_fn(data_files_pattern, batch_size):
  """Builds a dataset that reads TFRecord files matching a glob pattern."""
  filenames = tf.io.gfile.glob(data_files_pattern)
  # Repeat the file list indefinitely so the benchmark never runs out of data.
  dataset = tf.data.Dataset.from_tensor_slices(filenames).repeat()
  # Read 10 files concurrently; sloppy=True allows records to be produced
  # out of order so that one slow file does not block the others.
  dataset = dataset.apply(tf.data.experimental.parallel_interleave(
      map_func=tf.data.TFRecordDataset,
      cycle_length=10,
      sloppy=True))
  dataset = dataset.batch(batch_size, drop_remainder=False)
  dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
  return dataset


def run_benchmark(data_files_pattern, num_iterations):
  batch_size = 2048
  dataset = input_fn(
      data_files_pattern=data_files_pattern,
      batch_size=batch_size)
  itr = tf.data.make_one_shot_iterator(dataset)
  # Fetch only the size of each batch, so the loop measures reading rather
  # than copying records into Python.
  size = tf.shape(itr.get_next())[0]
  with tf.Session() as sess:
    size_callable = sess.make_callable(size)
    n = 0
    mini_batch = 100
    for _ in range(num_iterations // mini_batch):
      local_start = time.time()
      start_n = n
      for _ in range(mini_batch):
        n += size_callable()
      local_end = time.time()
      elapsed = local_end - local_start
      examples_per_second = (n - start_n) / elapsed
      print('Processed %d entries in %f seconds. [%f] examples/s' % (
          n - start_n, elapsed, examples_per_second))


run_benchmark('gs://cloud-samples-data/ai-platform/fake_imagenet/train*', 20000)

The output looks something like this:

Processed 204800 entries in 21.885156 seconds. [9357.941099] examples/s
Processed 204800 entries in 17.571123 seconds. [11655.487321] examples/s
Processed 204800 entries in 16.429514 seconds. [12465.371641] examples/s
...

The dataset for this experiment is the fake ImageNet data; a copy is available at gs://cloud-samples-data/ai-platform/fake_imagenet.

We ran this code on a Compute Engine VM, first with TensorFlow 1.14 and then with TensorFlow Enterprise, and compared the average number of examples per second. The VM had 8 CPUs and 64 GB of memory, and read from a regional Cloud Storage bucket in the same region as the VM.
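The benchmark prints one throughput figure per mini-batch of 100 batches, so comparing two runs means reducing those lines to a single average. The following is a minimal sketch, assuming the output is captured as text in exactly the format shown above; the function name average_throughput is hypothetical. It divides total entries by total seconds, which weights each mini-batch by its duration, instead of averaging the per-line rates.

```python
import re
from typing import Iterable

# Matches lines such as:
# "Processed 204800 entries in 21.885156 seconds. [9357.941099] examples/s"
LINE_RE = re.compile(r"Processed (\d+) entries in ([\d.]+) seconds\.")


def average_throughput(lines: Iterable[str]) -> float:
    """Return total entries divided by total seconds over all matching lines."""
    total_entries = 0
    total_seconds = 0.0
    for line in lines:
        match = LINE_RE.search(line)
        if match is None:
            continue  # Skip unrelated log lines.
        total_entries += int(match.group(1))
        total_seconds += float(match.group(2))
    if total_seconds == 0:
        raise ValueError("no benchmark lines found")
    return total_entries / total_seconds


if __name__ == "__main__":
    log = """\
Processed 204800 entries in 21.885156 seconds. [9357.941099] examples/s
Processed 204800 entries in 17.571123 seconds. [11655.487321] examples/s
Processed 204800 entries in 16.429514 seconds. [12465.371641] examples/s
"""
    print("%.1f examples/s" % average_throughput(log.splitlines()))
```

The same helper works unchanged on the BigQuery benchmark output, which uses the same line format.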

We see significant improvement:

[Figure: average examples per second read from Cloud Storage, TensorFlow 1.14 vs. TensorFlow Enterprise]

You can improve read throughput from Cloud Storage further by tuning the parameters TensorFlow provides. For example, when we replace the parallel_interleave call above with the following code:

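  # Tuned parameters: read 20 files at once (cycle_length), take one record
  # from each file in turn (block_length), buffer up to 50,000 records per
  # open file (buffer_output_elements), and create readers for up to 40
  # files before they are needed (prefetch_input_elements).
  dataset = dataset.apply(tf.data.experimental.parallel_interleave(
      map_func=tf.data.TFRecordDataset,
      cycle_length=20,
      block_length=1,
      buffer_output_elements=50000,
      prefetch_input_elements=40,
      sloppy=True))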

We see further improvement:

[Figure: examples per second read from Cloud Storage with the tuned parallel_interleave parameters]
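To build intuition for what cycle_length and block_length control, the pure-Python sketch below models the deterministic ordering that the tf.data documentation describes for interleave (the sloppy=False case). cycle_length inputs (here, files) are open at once, up to block_length consecutive elements are taken from each in turn, and an exhausted input's slot is refilled from the next input. This is a simplification: it ignores parallelism, buffer_output_elements, and prefetch_input_elements, and the helper name interleave is hypothetical, not a TensorFlow API. With sloppy=True, TensorFlow may depart from this order so that a slow file does not stall the others.

```python
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def interleave(inputs: Iterable[T],
               map_func: Callable[[T], Iterable[U]],
               cycle_length: int,
               block_length: int = 1) -> Iterator[U]:
    """Yield elements in deterministic (sloppy=False) interleave order."""
    if cycle_length < 1 or block_length < 1:
        raise ValueError("cycle_length and block_length must be positive")
    source = iter(inputs)
    # Each slot holds an open iterator (for example, one TFRecord file) or None.
    slots: List[Optional[Iterator[U]]] = [None] * cycle_length
    source_exhausted = False
    while True:
        for i in range(cycle_length):
            # Refill an empty slot from the next input, if any remain.
            if slots[i] is None and not source_exhausted:
                try:
                    item = next(source)
                except StopIteration:
                    source_exhausted = True
                else:
                    slots[i] = iter(map_func(item))
            current = slots[i]
            if current is None:
                continue
            # Take up to block_length consecutive elements from this slot.
            for _ in range(block_length):
                try:
                    element = next(current)
                except StopIteration:
                    slots[i] = None  # Input finished; refill on the next visit.
                    break
                yield element
        if source_exhausted and all(slot is None for slot in slots):
            return


if __name__ == "__main__":
    # Five inputs, each producing its own value six times.
    print(list(interleave(range(1, 6), lambda x: [x] * 6,
                          cycle_length=2, block_length=4)))
```

With cycle_length=20 and block_length=1, as in the tuned pipeline above, 20 files are read concurrently and consecutive records come from different files.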

Other factors also affect read throughput from Cloud Storage. A common mistake is to store the data as many small TFRecord files rather than fewer, larger ones. To achieve high throughput when TensorFlow reads from Cloud Storage, group the data so that each file is larger than 150 MB.
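One way to apply this guideline when consolidating many small files is to plan the grouping before writing TFRecords. The sketch below greedily assigns input files, by size, to output shards until each shard exceeds the threshold, then folds any undersized remainder into the last shard. The function name plan_shards and the decimal reading of 150 MB (150,000,000 bytes) are assumptions for illustration. Each resulting group would then be written out as one TFRecord file, for example with tf.io.TFRecordWriter.

```python
from typing import List, Sequence

# Assumption: "150 MB" interpreted as decimal megabytes.
MIN_SHARD_BYTES = 150 * 1000 * 1000


def plan_shards(file_sizes: Sequence[int],
                min_shard_bytes: int = MIN_SHARD_BYTES) -> List[List[int]]:
    """Group file indices so that each group's total size exceeds min_shard_bytes.

    If the total size never exceeds the threshold, all files go into one group.
    """
    shards: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(file_sizes):
        current.append(index)
        current_bytes += size
        if current_bytes > min_shard_bytes:
            shards.append(current)
            current, current_bytes = [], 0
    if current:
        if shards:
            # Fold the undersized remainder into the previous shard.
            shards[-1].extend(current)
        else:
            shards.append(current)
    return shards


if __name__ == "__main__":
    sizes = [10 * 1000 * 1000] * 40  # 40 files of 10 MB each
    for shard in plan_shards(sizes):
        print(len(shard), "files,", sum(sizes[i] for i in shard), "bytes")
```

Folding the remainder into the last shard, rather than emitting a small final shard, keeps every output file above the threshold at the cost of one slightly larger file.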

BigQuery reader
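TensorFlow Enterprise introduces a BigQuery reader that lets you read data directly from BigQuery into a tf.data pipeline. For example, the following benchmark reads the public bigquery-public-data.samples.wikipedia table: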

import time

import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient

# The loop below pulls batches in Python, so it assumes eager execution
# (the default in TF 2.x). On TF 1.x, call
# tf.compat.v1.enable_eager_execution() at program start.

PROJECT_ID = "<YOUR-GCP-PROJECT-ID>"  # Your project; the session is created under it.
DATASET_GCP_PROJECT_ID = "bigquery-public-data"  # Project that owns the table.
DATASET_ID = "samples"
TABLE_ID = "wikipedia"


def run_benchmark(num_iterations):
  batch_size = 2048
  client = BigQueryClient()
  # Column names and their TensorFlow types are passed as two parallel
  # lists, so they must stay in the same order.
  read_session = client.read_session(
      "projects/" + PROJECT_ID,
      DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
      ["title",
       "id",
       "num_characters",
       "language",
       "timestamp",
       "wp_namespace",
       "contributor_username"],
      [tf.string,
       tf.int64,
       tf.int64,
       tf.string,
       tf.int64,
       tf.int64,
       tf.string],
      requested_streams=10
  )

  # Read the streams in parallel; sloppy=True lets rows arrive out of order.
  dataset = read_session.parallel_read_rows(sloppy=True).batch(batch_size)
  itr = iter(dataset)

  n = 0
  mini_batch = 100
  for _ in range(num_iterations // mini_batch):
    local_start = time.time()
    start_n = n
    for _ in range(mini_batch):
      batch = next(itr)
      # Count the rows actually returned (the last batch may be smaller).
      n += int(batch["id"].shape[0])
    local_end = time.time()
    elapsed = local_end - local_start
    print('Processed %d entries in %f seconds. [%f] examples/s' % (
        n - start_n, elapsed, (n - start_n) / elapsed))


run_benchmark(10000)

The BigQuery reader uses the BigQuery Storage API to read data in parallel, which allows high throughput. The output looks like the following:

Processed 204800 entries in 4.917719 seconds. [41645.322712] examples/s
Processed 204800 entries in 4.066770 seconds. [50359.375164] examples/s
Processed 204800 entries in 4.859606 seconds. [42143.334014] examples/s
...

With the code above, each batch of examples is a Python OrderedDict whose keys are the column names specified in the read_session call and whose values are TensorFlow tensors ready to be consumed by a model. For instance:

OrderedDict([...,
('id', <tf.Tensor: id=34, shape=(2048,), dtype=int64, numpy=array([504869,   551777, 21024884, ...])>),
...,
('title', <tf.Tensor: id=38, shape=(2048,), dtype=string, numpy=
array(['Touch typing', 'Air ioniser',
       'List of number-one songs on American Top 40 of 2009', ...], dtype=object)>),
...])
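Because each batch is a dict keyed by column name, a common next step is to split one column off as the training label before passing the dataset to a model, such as a Keras model. The following is a minimal sketch that assumes num_characters is the label; this is a hypothetical choice for illustration, since the example above does not train a model. The function is plain Python, so it can be passed to dataset.map for the OrderedDict batches shown above, and it also works on ordinary dicts for testing.

```python
import collections
from typing import Any, Dict, Mapping, Tuple

# Hypothetical label column, chosen for illustration only.
LABEL_COLUMN = "num_characters"


def split_features_and_label(
        batch: Mapping[str, Any],
        label_column: str = LABEL_COLUMN) -> Tuple[Dict[str, Any], Any]:
    """Return (features, label) without modifying the input batch."""
    # Shallow copy that preserves the column order of the batch.
    features = collections.OrderedDict(batch)
    label = features.pop(label_column)
    return features, label
```

Applied as dataset = dataset.map(split_features_and_label), each element becomes a (features, label) pair. Because the element is a dict rather than a tuple, tf.data passes it to the function as a single argument.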

Conclusion
The speed of getting data from its storage location to the training process is increasingly critical to the productivity of deep learning model builders. TensorFlow Enterprise can help by providing optimized performance and easy access to data sources, and we continue to work with users to introduce improvements that make TensorFlow workloads more efficient on Google Cloud.

For more information, see our summary blog post on TensorFlow Enterprise. To get started, visit the TensorFlow Enterprise page.