从 Estimator API 迁移到 TPUEstimator API
本教程介绍如何将使用 Estimator API 的模型程序转换为使用 TPUEstimator API 的模型程序。
警告 只有 Tensorflow 1.x 支持 TPUEstimator API。如果您要使用 Tensorflow 2.x 编写模型,请改用 Keras。
概览
使用 TPUEstimator API 的模型程序可以充分利用张量处理单元 (TPU),同时保持与 CPU 和 GPU 兼容。
完成本教程后,您将了解:
- 如何将代码从使用 Estimator API 转换为使用 TPUEstimator API
- 如何在 Cloud TPU 上运行预测
准备工作
在开始学习本教程之前,请检查您的 Google Cloud 项目是否已正确设置。
本演示使用 Google Cloud 的收费组件。请查看 Cloud TPU 价格页面估算您的费用。请务必在使用完您创建的资源以后清理这些资源,以免产生不必要的费用。
设置资源
本部分介绍如何为教程设置 Cloud Storage 存储空间、虚拟机和 Cloud TPU 资源。
创建 Cloud Storage 存储桶
您需要一个 Cloud Storage 存储桶来存储模型的训练数据和训练结果。本教程中使用的 gcloud
命令会为 Cloud TPU 服务帐号设置默认权限。如果您需要更精细的权限,请查看访问级层权限。
存储桶位置必须与虚拟机 (VM) 和 TPU 节点位于同一区域。虚拟机和 TPU 节点位于特定地区,即区域内的细分。
转到 Google Cloud 控制台上的 Cloud Storage 页面。
创建一个新的存储桶,并指定以下选项:
- 您选择的唯一名称。
- 为“位置类型”选择
Region
,为“位置(地区)”选择us-central1
- 默认存储类别:
Standard
- 位置:在计划创建 TPU 节点的区域指定存储桶位置。请参阅 TPU 类型和地区,了解可以使用各种 TPU 类型的位置。
创建 TPU 和虚拟机
TPU 资源由具有相同名称的虚拟机 (VM) 和 Cloud TPU 组成。这些资源必须与您刚创建的存储桶位于同一区域/可用区中。
您可以使用 gcloud
命令或通过 Cloud 控制台设置虚拟机和 TPU 资源。如需详细了解如何管理 TPU 资源,请参阅创建和删除 TPU。
打开一个 Cloud Shell 窗口。
配置
gcloud
以使用您的项目。 要在其中创建 Cloud TPU 的项目。$ gcloud config set project your-project
使用
gcloud
命令启动 Compute Engine 虚拟机和 Cloud TPU。$ gcloud compute tpus execution-groups create \ --name=tpu-name \ --zone=europe-west4-a \ --tf-version=2.12.0 \ --machine-type=n1-standard-1 \ --accelerator-type=v3-8
命令标志说明
如需详细了解
gcloud
命令,请参阅 gcloud 参考文档。当
gcloud compute tpus execution-groups
命令执行完毕后,验证 shell 提示符已从username@projectname
更改为username@vm-name
。此变化表明您现已登录 Compute Engine 虚拟机。gcloud compute ssh tpu-name --zone=europe-west4-a
在您继续按照这些说明操作时,请在虚拟机会话窗口中运行以 (vm)$
开头的每个命令。
安装 pandas
输入以下命令可以安装或升级 pandas:
pip install pandas
定义超参数
在此代码部分中,您将添加 TPU 所需的一些超参数。您可以将这些超参数作为标志添加到训练脚本中,以便在运行时更改它们。
可以添加以下参数:
- tpu。此参数标识要在其上运行模型的 TPU 节点的名称或 IP 地址。
- model_dir。保存模型检查点的路径。此路径必须是 Cloud Storage 存储桶。
- iterations。每个训练循环的迭代次数。
- use_tpu。根据可用性指定在 TPU 还是 GPU/CPU 上运行模型。
Estimator API
# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
default=50,
help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
default=1000,
help="Total number of training steps.")
FLAGS = tf.flags.FLAGS
TPUEstimator API
# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
"tpu", default=None,
help="The Cloud TPU to use for training. This should be the name used when "
"creating the Cloud TPU. To find out the name of TPU, either use command "
"'gcloud compute tpus list --zone=<zone-name>', or use "
"'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")
# Model specific parameters
tf.flags.DEFINE_string(
"model_dir", default="",
help="This should be the path of storage bucket which will be used as "
"model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
"batch_size", default=128,
help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
"train_steps", default=1000,
help="Total number of training steps.")
tf.flags.DEFINE_integer(
"eval_steps", default=4,
help="Total number of evaluation steps. If `0`, evaluation "
"after training is skipped.")
# TPU specific parameters.
tf.flags.DEFINE_bool(
"use_tpu", default=True,
help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
"iterations", default=500,
help="Number of iterations per TPU training loop.")
加载数据
此代码部分指定如何读取和加载数据。
TPU 支持以下数据类型:
tf.float32
tf.complex64
tf.int64
tf.bool
tf.bfloat64
Estimator API
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
TPUEstimator API
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
定义输入函数
Estimator API 和 TPUEstimator API 之间的主要区别在于输入函数的函数签名。对于 Estimator API,您可以使用任意数量的参数编写输入函数。对于 TPUEstimator API,输入函数只能使用一个参数 params
。此 params
包含 TPUEstimator 对象中的所有键值对,以及 batch_size
等额外的键。
消除这种差异的一种方法是在调用输入函数时使用 lambda 函数。借助 lambda 函数,您只需对现有输入函数进行细微更改即可。
以下部分演示了如何更新输入函数。稍后,您将看到如何使用 lambda 函数转换这些输入函数以使用 TPUEstimator API。
训练输入函数
对于 TPUEstimator API,您的训练输入函数 train_input_fn
必须返回可按 Cloud TPU 核心的数量进行分片的大量输入样本。例如,如果您使用 8 个核心,则每个批量大小都必须可以被 8 整除。
为此,前面的代码使用 dataset.batch(batch_size, drop_remainder=True)
函数。此函数使用 batch_size
参数进行批处理并舍弃余数。
Estimator API
def train_input_fn(features, labels, batch_size):
"""An input function for training"""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat().batch(batch_size)
# Return the dataset.
return dataset
TPUEstimator API
def train_input_fn(features, labels, batch_size):
"""An input function for training."""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
评估输入函数
在此步骤中,您将更新评估输入函数 eval_input_fn
,以确保可以按 TPU 核心的数量对输入样本进行分片。为此,请使用 dataset.batch(batch_size, drop_remainder=True)
函数。
Estimator API
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation or prediction"""
features=dict(features)
if labels is None:
# No labels, use only features.
inputs = features
else:
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
# Batch the examples
assert batch_size is not None, "batch_size must not be None"
dataset = dataset.batch(batch_size)
# Return the dataset.
return dataset
TPUEstimator API
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation."""
features = dict(features)
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
预测输入函数
要在 TPUEstimator 中执行预测,输入数据集必须包含带有附加外部维度 batch_size
的张量。因此,您必须添加将 features
和 batch_size
作为参数的预测输入函数。此函数可使您的输入样本少于 batch_size
。
如果您使用 Estimator API,则预测输入函数是可选的。
Estimator API
预测输入函数对于 Estimator API 来说是可选的,因为评估函数 eval_input_fn 会执行此任务。
TPUEstimator API
def predict_input_fn(features, batch_size):
"""An input function for prediction."""
dataset = tf.data.Dataset.from_tensor_slices(features)
dataset = dataset.batch(batch_size)
return dataset
更新自定义模型函数
您的下一个任务是更新自定义模型函数:
- 替换
tf.estimator.EstimatorSpec
的实例以使用tf.contrib.tpu.TPUEstimatorSpec
。 - 移除
tf.summary
的所有实例。TPUEstimator API 不支持 tensorboard 的自定义摘要。但是,基本摘要会自动记录到模型目录中的事件文件中。 - 使用
tf.contrib.tpu.CrossShardOptimizer
封装优化器。CrossShardOptimizer
使用allreduce
来聚合梯度并将结果播送到每个分片。由于CrossShardOptimizer
与本地训练不兼容,因此您还必须检查use_tpu
标志。
Estimator API
def my_model(features, labels, mode, params):
"""DNN with three hidden layers, and dropout of 0.1 probability."""
# Create three fully connected layers each layer having a dropout
# probability of 0.1.
net = tf.feature_column.input_layer(features, params['feature_columns'])
for units in params['hidden_units']:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params['n_classes'], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
'class_ids': predicted_classes[:, tf.newaxis],
'probabilities': tf.nn.softmax(logits),
'logits': logits,
}
return tf.estimator.EstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
# Compute evaluation metrics.
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name='acc_op')
metrics = {'accuracy': accuracy}
tf.summary.scalar('accuracy', accuracy[1])
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode, loss=loss, eval_metric_ops=metrics)
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
TPUEstimator API
def my_model(features, labels, mode, params):
"""Deep Neural Network(DNN) model.
This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
10 neurons in each. And number of neurons in the last layer is equal to the
number of output classes. This is a densely connected network where each
neuron of previous layer is connected to each neuron of next layer.
Args:
features: Feature values for input samples.
labels: label/class assigned to the corresponding input sample.
mode: "TRAIN"/"EVAL"/"PREDICT"
params: Dictionary used to pass extra parameters to model function from
the main function.
Returns:
TPUEstimatorSpec object.
"""
# Create three fully connected layers.
net = tf.feature_column.input_layer(features, params["feature_columns"])
for units in params["hidden_units"]:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params["n_classes"], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
"class_ids": predicted_classes[:, tf.newaxis],
"probabilities": tf.nn.softmax(logits),
"logits": logits,
}
return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.contrib.tpu.TPUEstimatorSpec(
mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
if FLAGS.use_tpu:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)
添加评估指标函数
Estimator API 和 TPUEstimator API 之间的另一区别在于它们处理指标的方式。对于 Estimator API,您可以将指标作为标准字典传递。对于 TPUEstimator API,您必须改为使用函数。
Estimator API
可选。my_model
函数会生成指标。
TPUEstimator API
def metric_fn(labels, logits):
"""Function to return metrics for evaluation."""
predicted_classes = tf.argmax(logits, 1)
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name="acc_op")
return {"accuracy": accuracy}
更新主函数
配置 TPU
在此步骤中,您将配置 TPU 集群。
要配置集群,您可以使用分配给超参数的值。如需了解详情,请参阅定义超参数。此外,还必须设置以下值:
allow_soft_placement
。如果设置为“true”,此参数允许 TensorFlow 在 TPU 不可用的情况下使用 GPU 设备。如果 GPU 设备也不可用,则使用 CPU 设备。log_device_placement
。指示 TensorFlow 应记录设备展示位置。
Estimator API
不需要,因为此代码部分仅影响 TPU。
TPUEstimator API
# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
FLAGS.tpu)
run_config = tf.contrib.tpu.RunConfig(
model_dir=FLAGS.model_dir,
cluster=tpu_cluster_resolver,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=True),
tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)
将特定于 TPU 的参数添加到分类器
在此代码部分中,您将更新分类器变量以使用 TPUEstimator 类。此更改要求添加以下参数:
use_tpu
train_batch_size
eval_batch_size
predict_batch_size
config
Estimator API
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.estimator.Estimator(
model_fn=my_model,
params={
'feature_columns': my_feature_columns,
# Two hidden layers of 10 nodes each.
'hidden_units': [10, 10],
# The model must choose between 3 classes.
'n_classes': 3,
})
TPUEstimator API
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.contrib.tpu.TPUEstimator(
model_fn=my_model,
use_tpu=FLAGS.use_tpu,
train_batch_size=FLAGS.batch_size,
eval_batch_size=FLAGS.batch_size,
predict_batch_size=FLAGS.batch_size,
config=run_config,
params={
# Name of the feature columns in the input data.
"feature_columns": my_feature_columns,
# Two hidden layers of 10 nodes each.
"hidden_units": [10, 10],
# The model must choose between 3 classes.
"n_classes": 3,
"use_tpu": FLAGS.use_tpu,
})
调用训练方法
下一个更改是更新训练方法。请注意使用 lambda 函数来调用 train_input_fn
函数。此方法可让您更轻松地将现有函数用于 TPUEstimator API。
此外,您必须将 steps 参数更改为 max_steps
。在下一部分中,您将略微更改 steps 参数以指定评估步骤数。
Estimator API
# Train the Model.
classifier.train(
input_fn=lambda:iris_data.train_input_fn(
train_x, train_y, FLAGS.batch_size),
steps=FLAGS.train_steps)
TPUEstimator API
# Train the Model.
classifier.train(
input_fn=lambda params: iris_data.train_input_fn(
train_x, train_y, params["batch_size"]),
max_steps=FLAGS.train_steps)
调用评估方法
此更改类似于您对训练方法所做的更改。同样,借助 lambda 函数可更轻松地使用现有评估输入函数。
此外,还必须将 steps
参数更改为从 eval_steps
命令行标志中设置的值。
Estimator API
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda:iris_data.eval_input_fn(
test_x, test_y, FLAGS.batch_size))
print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))
TPUEstimator API
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda params: iris_data.eval_input_fn(
test_x, test_y, params["batch_size"]),
steps=FLAGS.eval_steps)
调用预测方法
与训练和评估方法一样,您必须更新预测方法。 同样,借助 lambda 函数可更轻松地使用现有评估输入函数。
Estimator API
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda: iris_data.eval_input_fn(
iris_data.PREDICTION_INPUT_DATA,
labels=None,
batch_size=FLAGS.batch_size))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')
class_id = pred_dict['class_ids'][0]
probability = pred_dict['probabilities'][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
TPUEstimator API
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda params: iris_data.predict_input_fn(
iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")
class_id = pred_dict["class_ids"][0]
probability = pred_dict["probabilities"][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
清理
为避免系统因本主题中使用的资源向您的 GCP 帐号收取费用,请执行以下操作:
与 Compute Engine 虚拟机断开连接:
(vm)$ exit
您的提示符现在应为
username@projectname
,表明您位于 Cloud Shell 中。在您的 Cloud Shell 中,使用您在设置 Cloud TPU 时所用的 --zone 标志运行
ctpu delete
,以删除 Compute Engine 虚拟机和 Cloud TPU:$ ctpu delete [optional: --zone]
运行
ctpu status
以确保未分配任何实例,避免产生不必要的 TPU 使用费。删除操作可能需要几分钟时间才能完成。 如下所示的响应表明不再有已分配的实例:$ ctpu status --zone=europe-west4-a
2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a" No instances currently exist. Compute Engine VM: -- Cloud TPU: --
如下所示运行
gsutil
,将 bucket-name 替换为您为本教程创建的 Cloud Storage 存储桶的名称:$ gsutil rm -r gs://bucket-name
后续步骤
如需详细了解 Estimator 和 TPUEstimator API,请参阅以下主题:
- Estimator API
- TPUEstimator API