在 Cloud TPU 上使用 TPUEstimator API

本文档介绍如何将 TPUEstimator API 与 Cloud TPU 搭配使用。TPUEstimator 通过处理大量硬件特有的低级细节,简化了在 Cloud TPU 上运行模型的操作。

使用 TPUEstimator 编写的模型可以在 CPU、GPU、单个 TPU 设备和整个 TPU pod 中工作,通常不需要更改代码。此外,TPUEstimator 还可以代表您自动执行某些优化,从而让您更轻松地实现最高性能。

如需从整体上了解机器学习工作负载如何在 TPU 硬件上运行,请参阅系统架构文档。

标准 TensorFlow Estimator API

概括来讲,标准 TensorFlow Estimator API 提供:

  • Estimator.train() - 利用指定输入对模型进行固定步数的训练。
  • Estimator.evaluate() - 在测试集上评估模型。
  • Estimator.predict() - 使用经过训练的模型运行推断。
  • Estimator.export_savedmodel() - 导出模型以提供支持。

此外,Estimator 还包括训练作业共有的默认行为,例如保存和恢复检查点、为 TensorBoard 创建摘要等。

Estimator 要求您必须编写分别与 TensorFlow 图的模型和输入部分对应的 model_fninput_fn

TPUEstimator 编程模型

TPUEstimator 会封装计算 (model_fn) 并将其分发到所有可用的 Cloud TPU 核心。必须根据批次大小调整学习速率。

  • input_fn 函数可模拟在远程主机 CPU 上运行的输入流水线。使用 tf.data 可对输入操作编程,如程序员指南中所述。每次调用都会将全局批量的输入处理到一个设备上。系统会从 params['batch_size'] 检索分片批次大小。专业提示:为了获得最佳性能,请返回数据集而不是张量。

  • model_fn 函数可模拟正在复制并分发给 TPU 的计算。计算应该仅包含 Cloud TPU 支持的操作。TensorFlow 操作包含可用操作的列表。

使用 TPUEstimator 的训练示例

以下代码演示如何使用 TPUEstimator 训练 MNIST 模型:

def model_fn(features, labels, mode, params):
  """A simple CNN."""
  del params  # unused

  input_layer = tf.reshape(features, [-1, 28, 28, 1])
  conv1 = tf.layers.conv2d(
      inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same",
      activation=tf.nn.relu)
  pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
  conv2 = tf.layers.conv2d(
      inputs=pool1, filters=64, kernel_size=[5, 5],
      padding="same", activation=tf.nn.relu)
  pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
  pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
  dense = tf.layers.dense(inputs=pool2_flat, units=128, activation=tf.nn.relu)
  dropout = tf.layers.dropout(
      inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN)
  logits = tf.layers.dense(inputs=dropout, units=10)
  onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10)

  loss = tf.losses.softmax_cross_entropy(
      onehot_labels=onehot_labels, logits=logits)

  learning_rate = tf.train.exponential_decay(
      FLAGS.learning_rate, tf.train.get_global_step(), 100000, 0.96)

  optimizer = tpu_optimizer.CrossShardOptimizer(
      tf.train.GradientDescentOptimizer(learning_rate=learning_rate))

  train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
  return tpu_estimator.TPUEstimatorSpec(mode=mode, loss=loss, train_op=train_op)

def make_input_fn(filename):
  """Returns an `input_fn` for train and eval."""

  def input_fn(params):
    """An input_fn to parse 28x28 images from filename using tf.data."""
    batch_size = params["batch_size"]

    def parser(serialized_example):
      """Parses a single tf.Example into image and label tensors."""
      features = tf.parse_single_example(
          serialized_example,
          features={
              "image_raw": tf.FixedLenFeature([], tf.string),
              "label": tf.FixedLenFeature([], tf.int64),
          })
      image = tf.decode_raw(features["image_raw"], tf.uint8)
      image.set_shape([28 * 28])
      # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]
      image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
      label = tf.cast(features["label"], tf.int32)
      return image, label

    dataset = tf.contrib.data.TFRecordDataset(
        filename, buffer_size=FLAGS.dataset_reader_buffer_size)
    dataset = dataset.repeat()
    dataset = dataset.apply(
      tf.contrib.data.map_and_batch(
         parser, batch_size=batch_size,
         num_parallel_batches=8,
         drop_remainder=True))
    return dataset

  return input_fn

def main(unused_argv):

  tf.logging.set_verbosity(tf.logging.INFO)

  run_config = tpu_config.RunConfig(
      master=FLAGS.master,
      model_dir=FLAGS.model_dir,
      session_config=tf.ConfigProto(
          allow_soft_placement=True, log_device_placement=True),
      tpu_config=tpu_config.TPUConfig(FLAGS.iterations))

  estimator = tpu_estimator.TPUEstimator(
      model_fn=model_fn,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      config=run_config)

  estimator.train(input_fn=make_input_fn(FLAGS.train_file),
                  max_steps=FLAGS.train_steps)

下一部分介绍上述示例中引入的新概念,以帮助您有效地使用 Cloud TPU。

TPUEstimator 概念

TPUEstimator 使用图内复制方法运行 TensorFlow 程序。图内(单会话)复制不同于通常用于分布式 TensorFlow 的图间(多会话)复制训练。主要区别包括:

  1. 在 TPUEstimator 中,TensorFlow 会话发起程序不在本地。您的 Python 程序会创建一个图,该图会复制到 Cloud TPU 中的所有核心。典型的配置会将 TensorFlow 会话发起程序设置为第一个工作器。

  2. 输入流水线放置在远程主机(而不是本地)上,以确保可以尽快将训练示例馈送给 Cloud TPU。需要使用数据集 (tf.data)。

  3. 多个 Cloud TPU 工作器同步运行,每个工作器同时执行相同的步骤。

从 TensorFlow Estimator 转换为 TPUEstimator

建议您首先移植一个小型的简单模型,并测试端到端行为。这样做可帮助您巩固 TPUEstimator 的基本概念。当简单模型运行时,逐步添加更多功能。

请参阅相关教程,了解一组示例模型以及使用 Cloud TPU 运行这些模型的说明。GitHub 上提供了更多模型。

如需将代码从 tf.estimator.Estimator 类转换为使用 tf.contrib.tpu.TPUEstimator,请更改以下内容:

  • tf.estimator.RunConfig 更改为 tf.contrib.tpu.RunConfig
  • 设置 TPUConfigtf.contrib.tpu.RunConfig 的一部分)以指定 iterations_per_loopiterations_per_loop 是要在 Cloud TPU 上针对一次 session.run 调用(每个训练循环)运行的迭代次数。

Cloud TPU 会运行训练循环的指定迭代次数,然后再返回到主机。直到所有 Cloud TPU 迭代均已运行完毕,系统才会保存检查点或摘要。

  • model_fn 中,使用 tf.contrib.tpu.CrossShardOptimizer 封装您的优化器。例如:

     optimizer = tf.contrib.tpu.CrossShardOptimizer(
          tf.train.GradientDescentOptimizer(learning_rate=learning_rate))
    
  • tf.estimator.Estimator 更改为 tf.contrib.tpu.TPUEstimator

默认的 RunConfig 每 100 步保存一次 TensorBoard 摘要,每 10 分钟写入一次检查点。

常见问题解答

为什么输入流水线需要 tf.data

有两个原因:

  1. 您的应用代码在客户端上运行,而 TPU 计算是在 worker 上运行。输入流水线操作必须放在远程工作器上,才能获得良好的性能。只有 tf.data(数据集)支持这一点。

  2. 为了分摊 TPU 启动费用,模型训练步封装在 tf.while_loop 中,以确保一个 Session.run 实际上运行单个训练循环的多次迭代。目前,只有 tf.data 可由 tf.while_loop 进行封装。

如何分析模型训练性能?

可以使用为 TensorBoard 提供的分析器来分析模型训练性能。