Estimator API から TPUEstimator API への移行

このチュートリアルでは、Estimator API を使用するモデル プログラムを、TPUEstimator API を使用するものに変換する方法について説明します。

警告: TPUEstimator API は、Tensorflow 1.x でのみサポートされています。TensorFlow 2.x でモデルを記述する場合は、代わりに Keras を使用してください。

概要

TPUEstimator API を使用するモデル プログラムは、CPU および GPU との互換性を保ちながら、Tensor Processing Unit(TPU)を最大限に活用できます。

このチュートリアルを終了すると、次のことを習得できます。

  • Estimator API を使用するコードを、TPUEstimator API を使用するコードに変換する方法
  • Cloud TPU で予測を実行する方法

準備

このチュートリアルを開始する前に、Google Cloud プロジェクトが正しく設定されていることを確認します。

このチュートリアルでは、Google Cloud の課金対象となるコンポーネントを使用します。費用を見積もるには、Cloud TPU の料金ページを確認してください。不要な課金を回避するために、このチュートリアルを完了したら、作成したリソースを必ずクリーンアップしてください。

リソースを設定する

このセクションでは、チュートリアルで使用する Cloud Storage のストレージ、VM、Cloud TPU の各リソースを設定する方法を説明します。

Cloud Storage バケットの作成

モデルのトレーニングに使用するデータとトレーニング結果を格納するには、Cloud Storage バケットが必要です。このチュートリアルで使用する gcloud コマンドは、Cloud TPU サービス アカウントのデフォルトの権限を設定します。権限の詳細な設定が必要な場合は、アクセスレベル権限をご覧ください。

バケットのロケーションは、仮想マシン(VM)および TPU ノードと同じリージョンにする必要があります。VM と TPU ノードは、リージョン内のサブディビジョンである特定のゾーンに配置されます。

  1. Google Cloud コンソールの [Cloud Storage] ページに移動します。

    [Cloud Storage] ページに移動

  2. 次のオプションを指定して新しいバケットを作成します。

    • 任意の一意な名前
    • ロケーション タイプに Region、ロケーション(ゾーン)に us-central1 を選択します。
    • デフォルトのストレージ クラス: Standard
    • ロケーション: TPU ノードを作成する予定のリージョンと同じリージョンにバケットのロケーションを指定します。各種の TPU タイプをどこで使用できるかについては、TPU タイプとゾーンをご覧ください。

TPU と VM を作成する

TPU リソースは、同じ名前が設定された仮想マシン(VM)と Cloud TPU で構成されます。これらのリソースは、作成したバケットと同じリージョン / ゾーンに存在する必要があります。

VM リソースと TPU リソースを設定するには、gcloud コマンドまたは Cloud コンソールを使用します。TPU リソースの管理の詳細については、TPU の作成と削除をご覧ください。

  1. Cloud Shell ウィンドウを開きます。

    Cloud Shell を開く

  2. プロジェクトを使用するように gcloud を構成します。

    $ gcloud config set project your-project
    Cloud TPU を作成するプロジェクト。

  3. gcloud コマンドを使用して Compute Engine VM と Cloud TPU を起動します。

    $ gcloud compute tpus execution-groups create \
     --name=tpu-name \
     --zone=europe-west4-a \
     --tf-version=2.12.0 \
     --machine-type=n1-standard-1 \
     --accelerator-type=v3-8

    コマンドフラグの説明

    name
    作成する Cloud TPU の名前。
    zone
    Cloud TPU を作成するゾーン
    tf-version
    gcloud コマンドが VM にインストールする TensorFlow のバージョン。
    machine-type
    作成する Compute Engine VM のマシンタイプ
    accelerator-type
    作成する Cloud TPU のタイプ

    gcloud コマンドの詳細については、gcloud リファレンスをご覧ください。

  4. gcloud compute tpus execution-groups コマンドの実行が終了したら、shell プロンプトが username@projectname から username@vm-name に変更されたことを確認します。変更されていれば、Compute Engine VM にログインしていることになります。

    gcloud compute ssh tpu-name --zone=europe-west4-a

これらの手順を続行する場合は、VM セッション ウィンドウで、(vm)$ で始まる各コマンドを実行します。

pandas をインストールする

次のコマンドを入力して、pandas をインストールまたはアップグレードします。

pip install pandas

ハイパーパラメータの定義

このコード セクションでは、TPU に必要ないくつかのハイパー パラメータを追加します。これらのハイパー パラメータをトレーニング スクリプトにフラグとして追加すると、実行時に変更できます。

追加するパラメータは次のとおりです。

  • tpu。このパラメータは、モデルを実行する TPU ノードの名前や IP アドレスを識別します。
  • model_dir。モデルのチェックポイントを保存するパス。このパスは Cloud Storage バケットである必要があります。
  • iterations。トレーニング ループごとのイテレーション回数。
  • use_tpu。TPU または GPU / CPU 上で可用性に基づいてモデルを実行するかどうかを指定します。

Estimator API

# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
    default=50,
    help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
    default=1000,
    help="Total number of training steps.")
FLAGS = tf.flags.FLAGS

TPUEstimator API

# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
    "tpu", default=None,
    help="The Cloud TPU to use for training. This should be the name used when "
    "creating the Cloud TPU. To find out the name of TPU, either use command "
    "'gcloud compute tpus list --zone=<zone-name>', or use "
    "'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")

# Model specific parameters
tf.flags.DEFINE_string(
    "model_dir", default="",
    help="This should be the path of storage bucket which will be used as "
    "model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
    "batch_size", default=128,
    help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
    "train_steps", default=1000,
    help="Total number of training steps.")
tf.flags.DEFINE_integer(
    "eval_steps", default=4,
    help="Total number of evaluation steps. If `0`, evaluation "
    "after training is skipped.")

# TPU specific parameters.
tf.flags.DEFINE_bool(
    "use_tpu", default=True,
    help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
    "iterations", default=500,
    help="Number of iterations per TPU training loop.")

データの読み込み

このコード セクションでは、データを読み取って読み込む方法を指定します。

TPU は、次のデータ型をサポートしています。

  • tf.float32
  • tf.complex64
  • tf.int64
  • tf.bool
  • tf.bfloat64

Estimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

TPUEstimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
                      dtype={'SepalLength': pd.np.float32,
                             'SepalWidth': pd.np.float32,
                             'PetalLength': pd.np.float32,
                             'PetalWidth': pd.np.float32,
                             'Species': pd.np.int32})
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
                     dtype={'SepalLength': pd.np.float32,
                            'SepalWidth': pd.np.float32,
                            'PetalLength': pd.np.float32,
                            'PetalWidth': pd.np.float32,
                            'Species': pd.np.int32})
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

入力関数の定義

Estimator API と TPUEstimator API では、主に入力関数の関数シグネチャの記述方法が異なります。Estimator API では、任意の数のパラメータで入力関数を作成できます。一方、TPUEstimator API では、入力関数に 1 つのパラメータ(params)しか指定できません。この params には、TPUEstimator オブジェクトのすべての Key-Value ペアに加え、batch_size などの追加のキーが含まれます。

この違いに対処する 1 つの方法は、入力関数を呼び出すときにラムダ関数を使用することです。ラムダ関数を使用すると、既存の入力関数にわずかな変更を加えるだけでその機能を利用できるようになります。

次のセクションでは、入力関数を更新する方法について説明します。ラムダ関数を使用してこれらの入力関数を変換し、TPUEstimator API で使用する方法については、後ほど説明します。

トレーニング用の入力関数

TPUEstimator API を使用する場合、トレーニング用の入力関数 train_input_fn は、Cloud TPU コア数でシャーディングできる入力サンプル数を返す必要があります。たとえば、8 つのコアを使用している場合、各バッチサイズは 8 で割り切れる必要があります。

そのため、前述のコードでは dataset.batch(batch_size, drop_remainder=True) 関数を使用しています。この関数は、batch_size パラメータを使用してバッチ処理を行い、残りの部分を破棄します。

Estimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training"""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat().batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training."""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat()

  dataset = dataset.batch(batch_size, drop_remainder=True)

  # Return the dataset.
  return dataset

評価入力関数

このステップでは、評価入力関数 eval_input_fn を更新して、入力サンプルが TPU コア数でシャーディングされるようにします。これは、dataset.batch(batch_size, drop_remainder=True) 関数で実現できます。

Estimator API

def eval_input_fn(features, labels, batch_size):
  """An input function for evaluation or prediction"""
  features=dict(features)
  if labels is None:
      # No labels, use only features.
      inputs = features
  else:
      inputs = (features, labels)

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices(inputs)

  # Batch the examples
  assert batch_size is not None, "batch_size must not be None"
  dataset = dataset.batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

 def eval_input_fn(features, labels, batch_size):
    """An input function for evaluation."""
    features = dict(features)
    inputs = (features, labels)

    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices(inputs)
    dataset = dataset.shuffle(1000).repeat()

    dataset = dataset.batch(batch_size, drop_remainder=True)

    # Return the dataset.
    return dataset

予測入力関数

TPUEstimator の予測では、入力データセットに、batch_size の外側の次元が追加されたテンソルが含まれている必要があります。このため、予測入力関数を追加する必要があり、この関数は、featuresbatch_size をパラメータとして取得します。この関数を使用すると、batch_size よりも少ない入力サンプルで済みます。

予測入力関数は、Estimator API を使用している場合は省略可能です。

Estimator API

Estimator API では予測入力関数は省略可能です。評価関数 eval_input_fn がこのタスクを実行します。

TPUEstimator API

  def predict_input_fn(features, batch_size):
    """An input function for prediction."""

    dataset = tf.data.Dataset.from_tensor_slices(features)
    dataset = dataset.batch(batch_size)
    return dataset

カスタムモデル関数の更新

次のタスクは、カスタムモデル関数の更新です。

  • tf.estimator.EstimatorSpec のインスタンスを置き換えて、tf.contrib.tpu.TPUEstimatorSpec を使用します。
  • tf.summary のインスタンスをすべて削除します。TPUEstimator API は TensorBoard のカスタム サマリーをサポートしていません。ただし、基本サマリーは、モデル ディレクトリのイベント ファイルに自動的に記録されます。
  • tf.contrib.tpu.CrossShardOptimizer を使用して、オプティマイザーをラップします。CrossShardOptimizer は、allreduce を使用して勾配を集約し、結果を各シャードにブロードキャストします。CrossShardOptimizer はローカル トレーニングと互換性がないため、use_tpu フラグも確認する必要があります。

Estimator API

def my_model(features, labels, mode, params):
  """DNN with three hidden layers, and dropout of 0.1 probability."""

  # Create three fully connected layers each layer having a dropout
  # probability of 0.1.
  net = tf.feature_column.input_layer(features, params['feature_columns'])
  for units in params['hidden_units']:
      net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params['n_classes'], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
      predictions = {
          'class_ids': predicted_classes[:, tf.newaxis],
          'probabilities': tf.nn.softmax(logits),
          'logits': logits,
      }
      return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  # Compute evaluation metrics.
  accuracy = tf.metrics.accuracy(labels=labels,
                                 predictions=predicted_classes,
                                 name='acc_op')
  metrics = {'accuracy': accuracy}
  tf.summary.scalar('accuracy', accuracy[1])
  if mode == tf.estimator.ModeKeys.EVAL:
      return tf.estimator.EstimatorSpec(
          mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN
      optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

TPUEstimator API

def my_model(features, labels, mode, params):
  """Deep Neural Network(DNN) model.

  This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
  10 neurons in each. And number of neurons in the last layer is equal to the
  number of output classes. This is a densely connected network where each
  neuron of previous layer is connected to each neuron of next layer.

  Args:
    features: Feature values for input samples.
    labels: label/class assigned to the corresponding input sample.
    mode: "TRAIN"/"EVAL"/"PREDICT"
    params: Dictionary used to pass extra parameters to model function from
      the main function.

  Returns:
    TPUEstimatorSpec object.

  """

  # Create three fully connected layers.
  net = tf.feature_column.input_layer(features, params["feature_columns"])
  for units in params["hidden_units"]:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params["n_classes"], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "class_ids": predicted_classes[:, tf.newaxis],
        "probabilities": tf.nn.softmax(logits),
        "logits": logits,
    }
    return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    if FLAGS.use_tpu:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

評価指標関数の追加

Estimator API と TPUEstimator API では、指標の処理方法も異なります。Estimator API では、指標を通常の辞書として渡すことができます。TPUEstimator API では、代わりに関数を使用する必要があります。

Estimator API

省略可。my_model 関数は、指標を生成します。

TPUEstimator API

  def metric_fn(labels, logits):
    """Function to return metrics for evaluation."""

    predicted_classes = tf.argmax(logits, 1)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predicted_classes,
                                   name="acc_op")
    return {"accuracy": accuracy}

メイン関数の更新

TPU を構成する

このステップでは、TPU クラスタを構成します。

クラスタを構成するには、ハイパーパラメータに割り当てられた値を使用します。詳細については、ハイパー パラメータを定義するをご覧ください。また、次の値も設定する必要があります。

  • allow_soft_placement。true に設定すると、TPU を使用できない場合に TensorFlow は GPU デバイスを使用できます。GPU デバイスも使用できない場合は、CPU デバイスが使用されます。
  • log_device_placement。TensorFlow がデバイスの割り当てをログに記録する必要があることを示します。

Estimator API

このコード セクションは TPU にのみ影響するため、必須ではありません。

TPUEstimator API

# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
    FLAGS.tpu)

run_config = tf.contrib.tpu.RunConfig(
    model_dir=FLAGS.model_dir,
    cluster=tpu_cluster_resolver,
    session_config=tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=True),
    tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)

TPU 固有のパラメータを分類に追加する

コードのこのセクションでは、分類変数を更新して TPUEstimator クラスを使用します。この変更では、次のパラメータを追加する必要があります。

  • use_tpu
  • train_batch_size
  • eval_batch_size
  • predict_batch_size
  • config

Estimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # Two hidden layers of 10 nodes each.
          'hidden_units': [10, 10],
          # The model must choose between 3 classes.
          'n_classes': 3,
      })

TPUEstimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.contrib.tpu.TPUEstimator(
      model_fn=my_model,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      predict_batch_size=FLAGS.batch_size,
      config=run_config,
      params={
          # Name of the feature columns in the input data.
          "feature_columns": my_feature_columns,
          # Two hidden layers of 10 nodes each.
          "hidden_units": [10, 10],
          # The model must choose between 3 classes.
          "n_classes": 3,
          "use_tpu": FLAGS.use_tpu,
      })

train メソッドを呼び出す

次の変更は、train メソッドの更新です。ラムダ関数を使用して、train_input_fn 関数を呼び出すことに注意してください。この方法論では、TPUEstimator API で既存の関数を簡単に使用できます。

また、steps パラメータを max_steps に変更する必要があります。次のセクションでは、steps パラメータの用途を変えて、評価ステップの数を指定します。

Estimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda:iris_data.train_input_fn(
          train_x, train_y, FLAGS.batch_size),
      steps=FLAGS.train_steps)

TPUEstimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda params: iris_data.train_input_fn(
          train_x, train_y, params["batch_size"]),
      max_steps=FLAGS.train_steps)

評価メソッドを呼び出す

この変更は、train メソッドに加えた変更と似ています。ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。

また、steps パラメータを、eval_steps コマンドライン フラグで設定した値に変更する必要があります。

Estimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda:iris_data.eval_input_fn(
          test_x, test_y, FLAGS.batch_size))

  print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

TPUEstimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda params: iris_data.eval_input_fn(
          test_x, test_y, params["batch_size"]),
      steps=FLAGS.eval_steps)

予測メソッドを呼び出す

train および evaluate メソッドと同様に、predict メソッドを更新する必要があります。 ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。

Estimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda: iris_data.eval_input_fn(
          iris_data.PREDICTION_INPUT_DATA,
          labels=None,
          batch_size=FLAGS.batch_size))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
      template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')

      class_id = pred_dict['class_ids'][0]
      probability = pred_dict['probabilities'][class_id]

      print(template.format(iris_data.SPECIES[class_id],
                            100 * probability, expec))

TPUEstimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda params: iris_data.predict_input_fn(
          iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
    template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")

    class_id = pred_dict["class_ids"][0]
    probability = pred_dict["probabilities"][class_id]

    print(template.format(iris_data.SPECIES[class_id],
                          100 * probability, expec))

クリーンアップ

このトピックで使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。

  1. Compute Engine VM との接続を解除します。

    (vm)$ exit

    プロンプトが username@projectname に変わります。これは、現在、Cloud Shell 内にいることを示しています。

  2. Cloud Shell で、Cloud TPU の設定時に使用した --zone フラグを指定して ctpu delete を実行し、Compute Engine VM と Cloud TPU を削除します。

    $ ctpu delete [optional: --zone]
  3. TPU の使用に対して不要な料金が発生しないように、ctpu status を実行してインスタンスが割り当てられていないことを確認します。削除には数分かかることがあります。次のようなレスポンスは、割り当てられたインスタンスがないことを示します。

    $ ctpu status --zone=europe-west4-a
    2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a"
    No instances currently exist.
        Compute Engine VM:     --
        Cloud TPU:             --
  4. 次に示すように gsutil を実行します。bucket-name の部分は、このチュートリアルで作成した Cloud Storage バケット名に置き換えてください。

    $ gsutil rm -r gs://bucket-name

次のステップ

Estimator API と TPUEstimator API の詳細については、次のトピックをご覧ください。