Cloud TPU 上的 TPUEstimator API
本文档介绍如何将 TPUEstimator API 与 Cloud TPU 搭配使用。TPUEstimator 通过处理大量硬件特有的低级细节,简化了在 Cloud TPU 上运行模型的操作。
使用 TPUEstimator 编写的模型适用于 CPU、GPU、单个 TPU 设备和 TPU Pod,通常无需更改代码。此外,TPUEstimator 还可以代表您自动执行某些优化,从而让您更轻松地实现最高性能。
如需从整体上了解机器学习工作负载如何在 TPU 硬件上运行,请参阅系统架构文档。
标准 TensorFlow Estimator API
概括来讲,标准 TensorFlow Estimator API 提供:
Estimator.train()
- 利用指定输入对模型进行固定步数的训练。Estimator.evaluate()
- 在测试集上评估模型。Estimator.predict()
- 使用经过训练的模型运行推断。Estimator.export_savedmodel()
- 导出模型以提供支持。
此外,Estimator
包含训练作业的默认行为,如保存和恢复检查点、为 TensorBoard 创建摘要等。
Estimator
要求您必须编写分别与 TensorFlow 图的模型和输入部分对应的 model_fn
和 input_fn
。
TPUEstimator 编程模型
TPUEstimator
会封装计算 (model_fn
) 并将其分发到所有可用的 Cloud TPU 核心。必须根据批次大小调整学习速率。
input_fn
函数可模拟在远程主机 CPU 上运行的输入流水线。使用tf.data
可对输入操作编程,如程序员指南中所述。每次调用都会将全局批量的输入处理到一个设备上。系统会从params['batch_size']
检索分片批次大小。专业提示:为了获得最佳性能,请返回数据集而不是张量。model_fn
函数可模拟正在复制并分发给 TPU 的计算。计算应该仅包含 Cloud TPU 支持的操作。TensorFlow 操作包含可用操作的列表。
使用 TPUEstimator 的训练示例
以下代码演示了如何使用 TPUEstimator
训练 MNIST:
def model_fn(features, labels, mode, params):
"""A simple CNN."""
del params # unused
input_layer = tf.reshape(features, [-1, 28, 28, 1])
conv1 = tf.layers.conv2d(
inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same",
activation=tf.nn.relu)
pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
conv2 = tf.layers.conv2d(
inputs=pool1, filters=64, kernel_size=[5, 5],
padding="same", activation=tf.nn.relu)
pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
dense = tf.layers.dense(inputs=pool2_flat, units=128, activation=tf.nn.relu)
dropout = tf.layers.dropout(
inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN)
logits = tf.layers.dense(inputs=dropout, units=10)
onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10)
loss = tf.losses.softmax_cross_entropy(
onehot_labels=onehot_labels, logits=logits)
learning_rate = tf.train.exponential_decay(
FLAGS.learning_rate, tf.train.get_global_step(), 100000, 0.96)
optimizer = tpu_optimizer.CrossShardOptimizer(
tf.train.GradientDescentOptimizer(learning_rate=learning_rate))
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tpu_estimator.TPUEstimatorSpec(mode=mode, loss=loss, train_op=train_op)
def make_input_fn(filename):
"""Returns an `input_fn` for train and eval."""
def input_fn(params):
"""An input_fn to parse 28x28 images from filename using tf.data."""
batch_size = params["batch_size"]
def parser(serialized_example):
"""Parses a single tf.Example into image and label tensors."""
features = tf.parse_single_example(
serialized_example,
features={
"image_raw": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.int64),
})
image = tf.decode_raw(features["image_raw"], tf.uint8)
image.set_shape([28 * 28])
# Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]
image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
label = tf.cast(features["label"], tf.int32)
return image, label
dataset = tf.contrib.data.TFRecordDataset(
filename, buffer_size=FLAGS.dataset_reader_buffer_size)
dataset = dataset.repeat()
dataset = dataset.apply(
tf.contrib.data.map_and_batch(
parser, batch_size=batch_size,
num_parallel_batches=8,
drop_remainder=True))
return dataset
return input_fn
def main(unused_argv):
tf.logging.set_verbosity(tf.logging.INFO)
run_config = tpu_config.RunConfig(
master=FLAGS.master,
model_dir=FLAGS.model_dir,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=True),
tpu_config=tpu_config.TPUConfig(FLAGS.iterations))
estimator = tpu_estimator.TPUEstimator(
model_fn=model_fn,
use_tpu=FLAGS.use_tpu,
train_batch_size=FLAGS.batch_size,
eval_batch_size=FLAGS.batch_size,
config=run_config)
estimator.train(input_fn=make_input_fn(FLAGS.train_file),
max_steps=FLAGS.train_steps)
下一部分介绍上述示例中引入的新概念,以帮助您有效地使用 Cloud TPU。
TPUEstimator 概念
TPUEstimator 使用图内复制方法运行 TensorFlow 程序。图内(单会话)复制不同于通常用于分布式 TensorFlow 的图间(多会话)复制训练。主要区别包括:
在 TPUEstimator 中,TensorFlow 会话发起程序不在本地。您的 Python 程序会创建一个图表,该图表会复制到 Cloud TPU 中的所有核心。典型的配置会将 TensorFlow 会话发起程序设置为第一个工作器。
输入流水线放置在远程主机(而不是本地)上,以确保尽快将训练示例馈送给 Cloud TPU。需要使用数据集 (
tf.data
)。多个 Cloud TPU 工作器同步运行,每个工作器同时执行相同的步骤。
从 TensorFlow Estimator 转换为 TPUEstimator
我们建议您先移植一个小模型并测试其行为。这样做可帮助您巩固 TPUEstimator
的基本概念。当该模型运行时,逐步添加更多功能。
请参阅教程,了解一组示例模型以及使用 Cloud TPU 运行模型的说明。GitHub 上提供了更多模型。
如需将代码从 tf.estimator.Estimator
类转换为使用 tf.contrib.tpu.TPUEstimator
,请更改以下内容:
- 将
tf.estimator.RunConfig
更改为tf.contrib.tpu.RunConfig
。 - 设置
TPUConfig
(tf.contrib.tpu.RunConfig
的一部分)以指定iterations_per_loop
。iterations_per_loop
是要在 Cloud TPU 上针对一次session.run
调用(每个训练循环)运行的迭代次数。
Cloud TPU 会运行训练循环的指定迭代次数,然后再返回到主机。直到所有 Cloud TPU 迭代均已运行完毕,系统才会保存检查点或摘要。
在
model_fn
中,使用tf.contrib.tpu.CrossShardOptimizer
封装您的优化器。例如:optimizer = tf.contrib.tpu.CrossShardOptimizer( tf.train.GradientDescentOptimizer(learning_rate=learning_rate))
将
tf.estimator.Estimator
更改为tf.contrib.tpu.TPUEstimator
。
默认的 RunConfig
每 100 步保存一次 TensorBoard 摘要,每 10 分钟写入一次检查点。
常见问题解答
为什么输入流水线需要 tf.data
?
有两个原因:
您的应用代码在客户端上运行,而 TPU 计算是在
worker
上运行。输入流水线操作必须在远程工作器上运行以获得良好的性能。tf.data
在工作器上运行操作。为了分摊 TPU 启动费用,模型训练步封装在
tf.while_loop
中,其中每个Session.run
运行单个训练循环的多次迭代。目前只有tf.data
可以封装在tf.while_loop
中。
如何分析模型训练性能?
可以使用为 TensorBoard 提供的分析器来分析模型训练性能。