从 Estimator API 迁移到 TPUEstimator API

本教程介绍如何将使用 Estimator API 的模型程序转换为使用 TPUEstimator API 的模型程序。

警告 只有 Tensorflow 1.x 支持 TPUEstimator API。如果您要使用 Tensorflow 2.x 编写模型,请改用 Keras

概览

使用 TPUEstimator API 的模型程序可以充分利用张量处理单元 (TPU),同时保持与 CPU 和 GPU 兼容。

完成本教程后,您将了解:

  • 如何将代码从使用 Estimator API 转换为使用 TPUEstimator API
  • 如何在 Cloud TPU 上运行预测

准备工作

在开始学习本教程之前,请检查您的 Google Cloud 项目是否已正确设置。

本演示使用 Google Cloud 的收费组件。请查看 Cloud TPU 价格页面估算您的费用。请务必在使用完您创建的资源以后清理这些资源,以免产生不必要的费用。

设置资源

本部分介绍如何为教程设置 Cloud Storage 存储空间、虚拟机和 Cloud TPU 资源。

创建 Cloud Storage 存储桶

您需要一个 Cloud Storage 存储桶来存储模型的训练数据和训练结果。本教程中使用的 gcloud 命令会为 Cloud TPU 服务帐号设置默认权限。如果您需要更精细的权限,请查看访问级层权限

存储桶位置必须与虚拟机 (VM) 和 TPU 节点位于同一区域。虚拟机和 TPU 节点位于特定地区,即区域内的细分。

  1. 转到 Google Cloud 控制台上的 Cloud Storage 页面。

    转到 Cloud Storage 页面

  2. 创建一个新的存储桶,并指定以下选项:

    • 您选择的唯一名称。
    • 为“位置类型”选择 Region,为“位置(地区)”选择 us-central1
    • 默认存储类别:Standard
    • 位置:在计划创建 TPU 节点的区域指定存储桶位置。请参阅 TPU 类型和地区,了解可以使用各种 TPU 类型的位置。

创建 TPU 和虚拟机

TPU 资源由具有相同名称的虚拟机 (VM) 和 Cloud TPU 组成。这些资源必须与您刚创建的存储桶位于同一区域/可用区中

您可以使用 gcloud 命令或通过 Cloud 控制台设置虚拟机和 TPU 资源。如需详细了解如何管理 TPU 资源,请参阅创建和删除 TPU

  1. 打开一个 Cloud Shell 窗口。

    打开 Cloud Shell

  2. 配置 gcloud 以使用您的项目。

    $ gcloud config set project your-project
    
    要在其中创建 Cloud TPU 的项目。

  3. 使用 gcloud 命令启动 Compute Engine 虚拟机和 Cloud TPU。

    $ gcloud compute tpus execution-groups create \
     --name=tpu-name \
     --zone=europe-west4-a \
     --tf-version=2.12.0 \
     --machine-type=n1-standard-1 \
     --accelerator-type=v3-8
    

    命令标志说明

    name
    要创建的 Cloud TPU 的名称。
    zone
    拟在其中创建 Cloud TPU 的可用区
    tf-version
    gcloud 命令在您的虚拟机上安装的 Tensorflow 版本。
    machine-type
    要创建的 Compute Engine 虚拟机的 机器类型
    accelerator-type
    要创建的 Cloud TPU 的类型。

    如需详细了解 gcloud 命令,请参阅 gcloud 参考文档

  4. gcloud compute tpus execution-groups 命令执行完毕后,验证 shell 提示符已从 username@projectname 更改为 username@vm-name。此变化表明您现已登录 Compute Engine 虚拟机。

    gcloud compute ssh tpu-name --zone=europe-west4-a
    

在您继续按照这些说明操作时,请在虚拟机会话窗口中运行以 (vm)$ 开头的每个命令。

安装 pandas

输入以下命令可以安装或升级 pandas:

pip install pandas

定义超参数

在此代码部分中,您将添加 TPU 所需的一些超参数。您可以将这些超参数作为标志添加到训练脚本中,以便在运行时更改它们。

可以添加以下参数:

  • tpu。此参数标识要在其上运行模型的 TPU 节点的名称或 IP 地址。
  • model_dir。保存模型检查点的路径。此路径必须是 Cloud Storage 存储桶。
  • iterations。每个训练循环的迭代次数。
  • use_tpu。根据可用性指定在 TPU 还是 GPU/CPU 上运行模型。

Estimator API

# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
    default=50,
    help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
    default=1000,
    help="Total number of training steps.")
FLAGS = tf.flags.FLAGS

TPUEstimator API

# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
    "tpu", default=None,
    help="The Cloud TPU to use for training. This should be the name used when "
    "creating the Cloud TPU. To find out the name of TPU, either use command "
    "'gcloud compute tpus list --zone=<zone-name>', or use "
    "'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")

# Model specific parameters
tf.flags.DEFINE_string(
    "model_dir", default="",
    help="This should be the path of storage bucket which will be used as "
    "model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
    "batch_size", default=128,
    help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
    "train_steps", default=1000,
    help="Total number of training steps.")
tf.flags.DEFINE_integer(
    "eval_steps", default=4,
    help="Total number of evaluation steps. If `0`, evaluation "
    "after training is skipped.")

# TPU specific parameters.
tf.flags.DEFINE_bool(
    "use_tpu", default=True,
    help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
    "iterations", default=500,
    help="Number of iterations per TPU training loop.")

加载数据

此代码部分指定如何读取和加载数据。

TPU 支持以下数据类型:

  • tf.float32
  • tf.complex64
  • tf.int64
  • tf.bool
  • tf.bfloat64

Estimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

TPUEstimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
                      dtype={'SepalLength': pd.np.float32,
                             'SepalWidth': pd.np.float32,
                             'PetalLength': pd.np.float32,
                             'PetalWidth': pd.np.float32,
                             'Species': pd.np.int32})
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
                     dtype={'SepalLength': pd.np.float32,
                            'SepalWidth': pd.np.float32,
                            'PetalLength': pd.np.float32,
                            'PetalWidth': pd.np.float32,
                            'Species': pd.np.int32})
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

定义输入函数

Estimator API 和 TPUEstimator API 之间的主要区别在于输入函数的函数签名。对于 Estimator API,您可以使用任意数量的参数编写输入函数。对于 TPUEstimator API,输入函数只能使用一个参数 params。此 params 包含 TPUEstimator 对象中的所有键值对,以及 batch_size 等额外的键。

消除这种差异的一种方法是在调用输入函数时使用 lambda 函数。借助 lambda 函数,您只需对现有输入函数进行细微更改即可。

以下部分演示了如何更新输入函数。稍后,您将看到如何使用 lambda 函数转换这些输入函数以使用 TPUEstimator API。

训练输入函数

对于 TPUEstimator API,您的训练输入函数 train_input_fn 必须返回可按 Cloud TPU 核心的数量进行分片的大量输入样本。例如,如果您使用 8 个核心,则每个批量大小都必须可以被 8 整除。

为此,前面的代码使用 dataset.batch(batch_size, drop_remainder=True) 函数。此函数使用 batch_size 参数进行批处理并舍弃余数。

Estimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training"""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat().batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training."""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat()

  dataset = dataset.batch(batch_size, drop_remainder=True)

  # Return the dataset.
  return dataset

评估输入函数

在此步骤中,您将更新评估输入函数 eval_input_fn,以确保可以按 TPU 核心的数量对输入样本进行分片。为此,请使用 dataset.batch(batch_size, drop_remainder=True) 函数。

Estimator API

def eval_input_fn(features, labels, batch_size):
  """An input function for evaluation or prediction"""
  features=dict(features)
  if labels is None:
      # No labels, use only features.
      inputs = features
  else:
      inputs = (features, labels)

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices(inputs)

  # Batch the examples
  assert batch_size is not None, "batch_size must not be None"
  dataset = dataset.batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

 def eval_input_fn(features, labels, batch_size):
    """An input function for evaluation."""
    features = dict(features)
    inputs = (features, labels)

    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices(inputs)
    dataset = dataset.shuffle(1000).repeat()

    dataset = dataset.batch(batch_size, drop_remainder=True)

    # Return the dataset.
    return dataset

预测输入函数

要在 TPUEstimator 中执行预测,输入数据集必须包含带有附加外部维度 batch_size 的张量。因此,您必须添加将 featuresbatch_size 作为参数的预测输入函数。此函数可使您的输入样本少于 batch_size

如果您使用 Estimator API,则预测输入函数是可选的。

Estimator API

预测输入函数对于 Estimator API 来说是可选的,因为评估函数 eval_input_fn 会执行此任务。

TPUEstimator API

  def predict_input_fn(features, batch_size):
    """An input function for prediction."""

    dataset = tf.data.Dataset.from_tensor_slices(features)
    dataset = dataset.batch(batch_size)
    return dataset

更新自定义模型函数

您的下一个任务是更新自定义模型函数:

  • 替换 tf.estimator.EstimatorSpec 的实例以使用 tf.contrib.tpu.TPUEstimatorSpec
  • 移除 tf.summary 的所有实例。TPUEstimator API 不支持 tensorboard 的自定义摘要。但是,基本摘要会自动记录到模型目录中的事件文件中。
  • 使用 tf.contrib.tpu.CrossShardOptimizer 封装优化器。CrossShardOptimizer 使用 allreduce 来聚合梯度并将结果播送到每个分片。由于 CrossShardOptimizer 与本地训练不兼容,因此您还必须检查 use_tpu 标志。

Estimator API

def my_model(features, labels, mode, params):
  """DNN with three hidden layers, and dropout of 0.1 probability."""

  # Create three fully connected layers each layer having a dropout
  # probability of 0.1.
  net = tf.feature_column.input_layer(features, params['feature_columns'])
  for units in params['hidden_units']:
      net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params['n_classes'], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
      predictions = {
          'class_ids': predicted_classes[:, tf.newaxis],
          'probabilities': tf.nn.softmax(logits),
          'logits': logits,
      }
      return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  # Compute evaluation metrics.
  accuracy = tf.metrics.accuracy(labels=labels,
                                 predictions=predicted_classes,
                                 name='acc_op')
  metrics = {'accuracy': accuracy}
  tf.summary.scalar('accuracy', accuracy[1])
  if mode == tf.estimator.ModeKeys.EVAL:
      return tf.estimator.EstimatorSpec(
          mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN
      optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

TPUEstimator API

def my_model(features, labels, mode, params):
  """Deep Neural Network(DNN) model.

  This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
  10 neurons in each. And number of neurons in the last layer is equal to the
  number of output classes. This is a densely connected network where each
  neuron of previous layer is connected to each neuron of next layer.

  Args:
    features: Feature values for input samples.
    labels: label/class assigned to the corresponding input sample.
    mode: "TRAIN"/"EVAL"/"PREDICT"
    params: Dictionary used to pass extra parameters to model function from
      the main function.

  Returns:
    TPUEstimatorSpec object.

  """

  # Create three fully connected layers.
  net = tf.feature_column.input_layer(features, params["feature_columns"])
  for units in params["hidden_units"]:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params["n_classes"], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "class_ids": predicted_classes[:, tf.newaxis],
        "probabilities": tf.nn.softmax(logits),
        "logits": logits,
    }
    return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    if FLAGS.use_tpu:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

添加评估指标函数

Estimator API 和 TPUEstimator API 之间的另一区别在于它们处理指标的方式。对于 Estimator API,您可以将指标作为标准字典传递。对于 TPUEstimator API,您必须改为使用函数。

Estimator API

可选。my_model 函数会生成指标。

TPUEstimator API

  def metric_fn(labels, logits):
    """Function to return metrics for evaluation."""

    predicted_classes = tf.argmax(logits, 1)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predicted_classes,
                                   name="acc_op")
    return {"accuracy": accuracy}

更新主函数

配置 TPU

在此步骤中,您将配置 TPU 集群。

要配置集群,您可以使用分配给超参数的值。如需了解详情,请参阅定义超参数。此外,还必须设置以下值:

  • allow_soft_placement。如果设置为“true”,此参数允许 TensorFlow 在 TPU 不可用的情况下使用 GPU 设备。如果 GPU 设备也不可用,则使用 CPU 设备。
  • log_device_placement。指示 TensorFlow 应记录设备展示位置。

Estimator API

不需要,因为此代码部分仅影响 TPU。

TPUEstimator API

# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
    FLAGS.tpu)

run_config = tf.contrib.tpu.RunConfig(
    model_dir=FLAGS.model_dir,
    cluster=tpu_cluster_resolver,
    session_config=tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=True),
    tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)

将特定于 TPU 的参数添加到分类器

在此代码部分中,您将更新分类器变量以使用 TPUEstimator 类。此更改要求添加以下参数:

  • use_tpu
  • train_batch_size
  • eval_batch_size
  • predict_batch_size
  • config

Estimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # Two hidden layers of 10 nodes each.
          'hidden_units': [10, 10],
          # The model must choose between 3 classes.
          'n_classes': 3,
      })

TPUEstimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.contrib.tpu.TPUEstimator(
      model_fn=my_model,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      predict_batch_size=FLAGS.batch_size,
      config=run_config,
      params={
          # Name of the feature columns in the input data.
          "feature_columns": my_feature_columns,
          # Two hidden layers of 10 nodes each.
          "hidden_units": [10, 10],
          # The model must choose between 3 classes.
          "n_classes": 3,
          "use_tpu": FLAGS.use_tpu,
      })

调用训练方法

下一个更改是更新训练方法。请注意使用 lambda 函数来调用 train_input_fn 函数。此方法可让您更轻松地将现有函数用于 TPUEstimator API。

此外,您必须将 steps 参数更改为 max_steps。在下一部分中,您将略微更改 steps 参数以指定评估步骤数。

Estimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda:iris_data.train_input_fn(
          train_x, train_y, FLAGS.batch_size),
      steps=FLAGS.train_steps)

TPUEstimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda params: iris_data.train_input_fn(
          train_x, train_y, params["batch_size"]),
      max_steps=FLAGS.train_steps)

调用评估方法

此更改类似于您对训练方法所做的更改。同样,借助 lambda 函数可更轻松地使用现有评估输入函数。

此外,还必须将 steps 参数更改为从 eval_steps 命令行标志中设置的值。

Estimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda:iris_data.eval_input_fn(
          test_x, test_y, FLAGS.batch_size))

  print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

TPUEstimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda params: iris_data.eval_input_fn(
          test_x, test_y, params["batch_size"]),
      steps=FLAGS.eval_steps)

调用预测方法

与训练和评估方法一样,您必须更新预测方法。 同样,借助 lambda 函数可更轻松地使用现有评估输入函数。

Estimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda: iris_data.eval_input_fn(
          iris_data.PREDICTION_INPUT_DATA,
          labels=None,
          batch_size=FLAGS.batch_size))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
      template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')

      class_id = pred_dict['class_ids'][0]
      probability = pred_dict['probabilities'][class_id]

      print(template.format(iris_data.SPECIES[class_id],
                            100 * probability, expec))

TPUEstimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda params: iris_data.predict_input_fn(
          iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
    template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")

    class_id = pred_dict["class_ids"][0]
    probability = pred_dict["probabilities"][class_id]

    print(template.format(iris_data.SPECIES[class_id],
                          100 * probability, expec))

清理

为避免系统因本主题中使用的资源向您的 GCP 帐号收取费用,请执行以下操作:

  1. 与 Compute Engine 虚拟机断开连接:

    (vm)$ exit
    

    您的提示符现在应为 username@projectname,表明您位于 Cloud Shell 中。

  2. 在您的 Cloud Shell 中,使用您在设置 Cloud TPU 时所用的 --zone 标志运行 ctpu delete,以删除 Compute Engine 虚拟机和 Cloud TPU:

    $ ctpu delete [optional: --zone]
    
  3. 运行 ctpu status 以确保未分配任何实例,避免产生不必要的 TPU 使用费。删除操作可能需要几分钟时间才能完成。 如下所示的响应表明不再有已分配的实例:

    $ ctpu status --zone=europe-west4-a
    
    2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a"
    No instances currently exist.
        Compute Engine VM:     --
        Cloud TPU:             --
    
  4. 如下所示运行 gsutil,将 bucket-name 替换为您为本教程创建的 Cloud Storage 存储桶的名称:

    $ gsutil rm -r gs://bucket-name
    

后续步骤

如需详细了解 Estimator 和 TPUEstimator API,请参阅以下主题: