Estimator API から TPUEstimator API への移行

このチュートリアルでは、Estimator API を使用するモデル プログラムを、TPUEstimator API を使用するものに変換する方法について説明します。

概要

TPUEstimator API を使用するモデル プログラムは、CPU および GPU との互換性を保ちながら、Tensor Processing Unit(TPU)を最大限に活用できます。

このチュートリアルを終了すると、次のことを習得できます。

  • Estimator API を使用するコードを、TPUEstimator API を使用するコードに変換する方法
  • Cloud TPU で予測を実行する方法

始める前に

このチュートリアルを開始する前に、Google Cloud Platform プロジェクトが正しく設定されていることを確認してください。

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Google Cloud Platform プロジェクトを選択または作成します。

    [リソースの管理] ページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。

    課金を有効にする方法について

  4. このチュートリアルでは、Google Cloud Platform の課金対象となるコンポーネントを使用します。費用を見積もるには、Cloud TPU の料金ページを確認してください。不要な課金を回避できるよう、このチュートリアルを完了したら、作成したリソースを必ずクリーンアップしてください。

リソースを設定する

このセクションでは、チュートリアルで使用する Cloud Storage のストレージ、VM、Cloud TPU の各リソースを設定する方法を説明します。

Cloud Storage バケットを作成する

モデルのトレーニングに使用するデータとトレーニング結果を格納するには、Cloud Storage バケットが必要です。このチュートリアルで使用する ctpu up ツールは、Cloud TPU サービス アカウントのデフォルトの権限を設定します。権限の詳細な設定が必要な場合は、アクセスレベル権限をご覧ください。

作成するバケットは、使用する仮想マシン(VM)や Cloud TPU デバイス(複数の TPU デバイスの場合は Cloud TPU スライス)と同じリージョンに配置する必要があります。

  1. GCP Console の Cloud Storage ページに移動します。

    Cloud Storage ページに移動

  2. 次のオプションを指定して新しいバケットを作成します。

    • 任意の一意な名前
    • デフォルトのストレージ クラス: Regional
    • ロケーション: 単一の Cloud TPU デバイスを使用する場合は、表示されるデフォルト ロケーションを受け入れます。Cloud TPU Pod スライスを使用する場合、Cloud TPU Pod を利用できるリージョンを選択する必要があります。

ctpu ツールを使用する

このセクションでは、Cloud TPU プロビジョニング ツールctpu)を使用して Cloud TPU プロジェクトのリソースを作成、管理する方法を説明します。リソースは、同じ名前が付けられた仮想マシン(VM)と Cloud TPU リソースで構成されます。これらのリソースは、作成したバケットと同じリージョン / ゾーンに存在する必要があります。

VM リソースと TPU リソースを設定するには、gcloud コマンドまたは Cloud Console を使用することもできます。Compute Engine VM と Cloud TPU の設定から管理までのすべての手順については、VM と TPU のリソース管理のページをご覧ください。

ctpu up を実行してリソースを作成する

  1. Cloud Shell ウィンドウを開きます。

    Cloud Shell を開く

  2. gcloud config set project <Your-Project> を実行して、Cloud TPU の作成に使用するプロジェクトを設定します。

  3. Cloud TPU デバイスまたは Pod スライスのいずれかを示すフラグを指定して ctpu up を実行します。フラグのオプションと説明については、CTPU リファレンスをご覧ください。

  4. Cloud TPU デバイスを設定します。

    $ ctpu up 

    構成に関する次のメッセージが表示されます。

    ctpu will use the following configuration:
    
    Name: [your TPU's name]
    Zone: [your project's zone]
    GCP Project: [your project's name]
    TensorFlow Version: 1.14
    VM:
     Machine Type: [your machine type]
     Disk Size: [your disk size]
     Preemptible: [true or false]
    Cloud TPU:
     Size: [your TPU size]
     Preemptible: [true or false]
    
    OK to create your Cloud TPU resources with the above configuration? [Yn]:
    

    y キーを押して、Cloud TPU リソースを作成します。

ctpu up コマンドにより、仮想マシン(VM)と Cloud TPU サービスが作成されます。

これ以降、接頭辞 (vm)$ は Compute Engine VM インスタンスでコマンドを実行する必要があることを意味します。

Compute Engine VM を確認する

ctpu up コマンドの実行が終了したら、シェル プロンプトが username@project から username@tpuname に変更されたことを確認します。変更されていれば、Compute Engine VM にログインしていることになります。

pandas をインストールする

次のコマンドを入力して、pandas をインストールまたはアップグレードします。

pip install pandas

その他の TensorFlow のコンセプト

TensorFlow の次のコンセプトも理解しておく必要があります。

  • Estimator。 詳細については、TensorFlow ガイドをご覧ください。
  • Custom Estimator。 Custom Estimator の詳細については、TensorFlow ガイドの Custom Estimator の作成をご覧ください。

ハイパーパラメータの定義

このコード セクションでは、TPU に必要ないくつかのハイパーパラメータを追加します。これらのハイパーパラメータをフラグとして追加すると、実行時に変更できます。

追加するパラメータは次のとおりです。

  • tpu。 このパラメータは、モデルを実行する TPU ノードの名前や IP アドレスを識別します。
  • model_dir。 モデルのチェックポイントを保存するパス。このパスは Cloud Storage バケットである必要があります。
  • iterations。 トレーニング ループごとのイテレーション回数。
  • use_tpu。 TPU または GPU / CPU 上で可用性に基づいてモデルを実行するかどうかを指定します。

Estimator API

# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
    default=50,
    help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
    default=1000,
    help="Total number of training steps.")
FLAGS = tf.flags.FLAGS

TPUEstimator API

# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
    "tpu", default=None,
    help="The Cloud TPU to use for training. This should be the name used when "
    "creating the Cloud TPU. To find out the name of TPU, either use command "
    "'gcloud compute tpus list --zone=<zone-name>', or use "
    "'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")

# Model specific parameters
tf.flags.DEFINE_string(
    "model_dir", default="",
    help="This should be the path of storage bucket which will be used as "
    "model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
    "batch_size", default=128,
    help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
    "train_steps", default=1000,
    help="Total number of training steps.")
tf.flags.DEFINE_integer(
    "eval_steps", default=4,
    help="Total number of evaluation steps. If `0`, evaluation "
    "after training is skipped.")

# TPU specific parameters.
tf.flags.DEFINE_bool(
    "use_tpu", default=True,
    help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
    "iterations", default=500,
    help="Number of iterations per TPU training loop.")

データの読み込み

このコード セクションでは、データを読み取って読み込む方法を指定します。

TPU は、次のデータ型をサポートしています。

  • tf.float32
  • tf.complex64
  • tf.int64
  • tf.bool
  • tf.bfloat64

Estimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

TPUEstimator API

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
                      dtype={'SepalLength': pd.np.float32,
                             'SepalWidth': pd.np.float32,
                             'PetalLength': pd.np.float32,
                             'PetalWidth': pd.np.float32,
                             'Species': pd.np.int32})
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
                     dtype={'SepalLength': pd.np.float32,
                            'SepalWidth': pd.np.float32,
                            'PetalLength': pd.np.float32,
                            'PetalWidth': pd.np.float32,
                            'Species': pd.np.int32})
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

入力関数の定義

Estimator API と TPUEstimator API では、主に入力関数の関数シグネチャの記述方法が異なります。Estimator API では、任意の数のパラメータで入力関数を作成できます。一方、TPUEstimator API では、入力関数に 1 つのパラメータ(params)しか指定できません。この params には、TPUEstimator オブジェクトのすべての Key-Value ペアに加え、batch_size などの追加のキーが含まれます。

この違いに対処する 1 つの方法は、入力関数を呼び出すときにラムダ関数を使用することです。ラムダ関数を使用すると、既存の入力関数にわずかな変更を加えるだけでその機能を利用できるようになります。

次のセクションでは、入力関数を更新する方法について説明します。後で、ラムダ関数を使用してこれらの入力関数を変換し、TPUEstimator API で使用する方法について説明します。

トレーニング入力関数

TPUEstimator API では、トレーニング入力関数 train_input_fn は、Cloud TPU のコア数で分割できる複数の入力サンプルを返す必要があります。たとえば、8 つのコアを使用している場合、各バッチサイズは 8 で割り切れる必要があります。

そのために、前述のコードでは dataset.batch(batch_size, drop_remainder=True) 関数を使用しています。この関数は、batch_size パラメータを使用してバッチ処理を行い、残りの部分を破棄します。

Estimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training"""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat().batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

def train_input_fn(features, labels, batch_size):
  """An input function for training."""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat()

  dataset = dataset.batch(batch_size, drop_remainder=True)

  # Return the dataset.
  return dataset

評価入力関数

このステップでは、評価入力関数 eval_input_fn を更新して、入力サンプルを TPU のコア数で分割できるようにします。そのために、dataset.batch(batch_size, drop_remainder=True) 関数を使用します。

Estimator API

def eval_input_fn(features, labels, batch_size):
  """An input function for evaluation or prediction"""
  features=dict(features)
  if labels is None:
      # No labels, use only features.
      inputs = features
  else:
      inputs = (features, labels)

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices(inputs)

  # Batch the examples
  assert batch_size is not None, "batch_size must not be None"
  dataset = dataset.batch(batch_size)

  # Return the dataset.
  return dataset

TPUEstimator API

 def eval_input_fn(features, labels, batch_size):
    """An input function for evaluation."""
    features = dict(features)
    inputs = (features, labels)

    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices(inputs)
    dataset = dataset.shuffle(1000).repeat()

    dataset = dataset.batch(batch_size, drop_remainder=True)

    # Return the dataset.
    return dataset

予測入力関数

TPUEstimator の予測では、入力データセットに、batch_size の外側の次元が追加されたテンソルが含まれている必要があります。このため、予測入力関数を追加する必要があり、この関数は、featuresbatch_size をパラメータとして取得します。この関数を使用すると、batch_size よりも入力サンプルを少なくできます。

予測入力関数は、Estimator API を使用している場合は省略可能です。

Estimator API

Estimator API では予測入力関数は省略可能です。評価関数 eval_input_fn がこのタスクを実行します。

TPUEstimator API

  def predict_input_fn(features, batch_size):
    """An input function for prediction."""

    dataset = tf.data.Dataset.from_tensor_slices(features)
    dataset = dataset.batch(batch_size)
    return dataset

カスタムモデル関数の更新

次のタスクは、カスタムモデル関数を更新することです。

  • tf.estimator.EstimatorSpec のインスタンスを置き換えて tf.contrib.tpu.TPUEstimatorSpec を使用します。
  • tf.summary のインスタンスをすべて削除します。TPUEstimator API は TensorBoard のカスタム サマリーをサポートしていません。ただし、基本サマリーは、モデル ディレクトリのイベント ファイルに自動的に記録されます。
  • tf.contrib.tpu.CrossShardOptimizer を使用してオプティマイザーをラップします。CrossShardOptimizer は、allreduce を使用して勾配を集約し、結果を各シャードにブロードキャストします。CrossShardOptimizer はローカル トレーニングと互換性がないため、use_tpu フラグも確認する必要があります。

Estimator API

def my_model(features, labels, mode, params):
  """DNN with three hidden layers, and dropout of 0.1 probability."""

  # Create three fully connected layers each layer having a dropout
  # probability of 0.1.
  net = tf.feature_column.input_layer(features, params['feature_columns'])
  for units in params['hidden_units']:
      net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params['n_classes'], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
      predictions = {
          'class_ids': predicted_classes[:, tf.newaxis],
          'probabilities': tf.nn.softmax(logits),
          'logits': logits,
      }
      return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  # Compute evaluation metrics.
  accuracy = tf.metrics.accuracy(labels=labels,
                                 predictions=predicted_classes,
                                 name='acc_op')
  metrics = {'accuracy': accuracy}
  tf.summary.scalar('accuracy', accuracy[1])
  if mode == tf.estimator.ModeKeys.EVAL:
      return tf.estimator.EstimatorSpec(
          mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN
      optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

TPUEstimator API

def my_model(features, labels, mode, params):
  """Deep Neural Network(DNN) model.

  This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
  10 neurons in each. And number of neurons in the last layer is equal to the
  number of output classes. This is a densely connected network where each
  neuron of previous layer is connected to each neuron of next layer.

  Args:
    features: Feature values for input samples.
    labels: label/class assigned to the corresponding input sample.
    mode: "TRAIN"/"EVAL"/"PREDICT"
    params: Dictionary used to pass extra parameters to model function from
      the main function.

  Returns:
    TPUEstimatorSpec object.

  """

  # Create three fully connected layers.
  net = tf.feature_column.input_layer(features, params["feature_columns"])
  for units in params["hidden_units"]:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params["n_classes"], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "class_ids": predicted_classes[:, tf.newaxis],
        "probabilities": tf.nn.softmax(logits),
        "logits": logits,
    }
    return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    if FLAGS.use_tpu:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

評価指標関数の追加

Estimator API と TPUEstimator API では、指標の処理方法も異なります。Estimator API では、指標を通常の辞書として渡すことができます。TPUEstimator API では、代わりに関数を使用する必要があります。

Estimator API

省略可。my_model 関数は指標を生成します。

TPUEstimator API

  def metric_fn(labels, logits):
    """Function to return metrics for evaluation."""

    predicted_classes = tf.argmax(logits, 1)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predicted_classes,
                                   name="acc_op")
    return {"accuracy": accuracy}

メイン関数の更新

TPU を構成する

このステップでは、TPU クラスタを構成します。

クラスタを構成するには、ハイパーパラメータに割り当てられた値を使用します。詳細については、ハイパーパラメータを定義するをご覧ください。また、次の値も設定する必要があります。

  • allow_soft_placement。「true」に設定すると、TPU を使用できない場合に TensorFlow は GPU デバイスを使用できます。GPU デバイスも使用できない場合は、CPU デバイスが使用されます。
  • log_device_placement。TensorFlow がデバイスの割り当てをログに記録する必要があることを示します。

Estimator API

このコード セクションは TPU にのみ影響するため、必須ではありません。

TPUEstimator API

# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
    FLAGS.tpu)

run_config = tf.contrib.tpu.RunConfig(
    model_dir=FLAGS.model_dir,
    cluster=tpu_cluster_resolver,
    session_config=tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=True),
    tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)

TPU 固有のパラメータを分類に追加する

コードのこのセクションでは、TPUEstimator クラスを使用するように分類変数を更新します。この変更では、次のパラメータを追加する必要があります。

  • use_tpu
  • train_batch_size
  • eval_batch_size
  • predict_batch_size
  • config

Estimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # Two hidden layers of 10 nodes each.
          'hidden_units': [10, 10],
          # The model must choose between 3 classes.
          'n_classes': 3,
      })

TPUEstimator API

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.contrib.tpu.TPUEstimator(
      model_fn=my_model,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      predict_batch_size=FLAGS.batch_size,
      config=run_config,
      params={
          # Name of the feature columns in the input data.
          "feature_columns": my_feature_columns,
          # Two hidden layers of 10 nodes each.
          "hidden_units": [10, 10],
          # The model must choose between 3 classes.
          "n_classes": 3,
          "use_tpu": FLAGS.use_tpu,
      })

train メソッドを呼び出す

次の変更は、train メソッドの更新です。ラムダ関数を使用して、train_input_fn 関数を呼び出すことに注意してください。この方法では、TPUEstimator API で既存の関数を簡単に使用できます。

また、steps パラメータを max_steps に変更する必要があります。次のセクションでは、steps パラメータの用途を変えて、評価ステップの数を指定します。

Estimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda:iris_data.train_input_fn(
          train_x, train_y, FLAGS.batch_size),
      steps=FLAGS.train_steps)

TPUEstimator API

  # Train the Model.
  classifier.train(
      input_fn=lambda params: iris_data.train_input_fn(
          train_x, train_y, params["batch_size"]),
      max_steps=FLAGS.train_steps)

評価メソッドを呼び出す

この変更は、train メソッドに加えた変更と似ています。ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。

また、steps パラメータを、eval_steps コマンドライン フラグから設定された値に変更する必要があります。

Estimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda:iris_data.eval_input_fn(
          test_x, test_y, FLAGS.batch_size))

  print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

TPUEstimator API

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda params: iris_data.eval_input_fn(
          test_x, test_y, params["batch_size"]),
      steps=FLAGS.eval_steps)

予測メソッドを呼び出す

train および evaluate メソッドと同様に、predict メソッドを更新する必要があります。ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。

Estimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda: iris_data.eval_input_fn(
          iris_data.PREDICTION_INPUT_DATA,
          labels=None,
          batch_size=FLAGS.batch_size))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
      template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')

      class_id = pred_dict['class_ids'][0]
      probability = pred_dict['probabilities'][class_id]

      print(template.format(iris_data.SPECIES[class_id],
                            100 * probability, expec))

TPUEstimator API

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda params: iris_data.predict_input_fn(
          iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
    template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")

    class_id = pred_dict["class_ids"][0]
    probability = pred_dict["probabilities"][class_id]

    print(template.format(iris_data.SPECIES[class_id],
                          100 * probability, expec))

クリーンアップ

このトピックで使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。

  1. Compute Engine VM との接続を解除します。

    (vm)$ exit
    

    プロンプトが user@projectname と表示され、Cloud Shell 内にいることが示されます。

  2. Cloud Shell で、Cloud TPU の設定時に使用した --zone フラグを指定して ctpu delete を実行し、Compute Engine VM と Cloud TPU を削除します。

    $ ctpu delete [optional: --zone]
    
  3. TPU の使用に対する不要な料金が発生しないように、ctpu status を実行して、インスタンスの割り当てがないことを確認します。削除には数分かかることがあります。次のようなレスポンスは、割り当てられたインスタンスがないことを示します。

    2018/04/28 16:16:23 WARNING: Setting zone to "us-central1-b"
    No instances currently exist.
            Compute Engine VM:     --
            Cloud TPU:             --
    
  4. 次に示す gsutil を実行します。YOUR-BUCKET-NAME の部分は、このチュートリアルで作成した Cloud Storage バケットの名前に置き換えてください。

    $ gsutil rm -r gs://YOUR-BUCKET-NAME
    

次のステップ

Estimator API と TPUEstimator API の詳細については、次のトピックをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...