Estimator API から TPUEstimator API への移行
このチュートリアルでは、Estimator API を使用するモデル プログラムを、TPUEstimator API を使用するものに変換する方法について説明します。
警告: TPUEstimator API は、Tensorflow 1.x でのみサポートされています。TensorFlow 2.x でモデルを記述する場合は、代わりに Keras を使用してください。
概要
TPUEstimator API を使用するモデル プログラムは、CPU および GPU との互換性を保ちながら、Tensor Processing Unit(TPU)を最大限に活用できます。
このチュートリアルを終了すると、次のことを習得できます。
- Estimator API を使用するコードを、TPUEstimator API を使用するコードに変換する方法
- Cloud TPU で予測を実行する方法
準備
このチュートリアルを開始する前に、Google Cloud プロジェクトが正しく設定されていることを確認します。
このチュートリアルでは、Google Cloud の課金対象となるコンポーネントを使用します。費用を見積もるには、Cloud TPU の料金ページを確認してください。不要な課金を回避するために、このチュートリアルを完了したら、作成したリソースを必ずクリーンアップしてください。
リソースを設定する
このセクションでは、チュートリアルで使用する Cloud Storage のストレージ、VM、Cloud TPU の各リソースを設定する方法を説明します。
Cloud Storage バケットの作成
モデルのトレーニングに使用するデータとトレーニング結果を格納するには、Cloud Storage バケットが必要です。このチュートリアルで使用する gcloud
コマンドは、Cloud TPU サービス アカウントのデフォルトの権限を設定します。権限の詳細な設定が必要な場合は、アクセスレベル権限をご覧ください。
バケットのロケーションは、仮想マシン(VM)および TPU ノードと同じリージョンにする必要があります。VM と TPU ノードは、リージョン内のサブディビジョンである特定のゾーンに配置されます。
Google Cloud コンソールの [Cloud Storage] ページに移動します。
次のオプションを指定して新しいバケットを作成します。
- 任意の一意な名前
- ロケーション タイプに
Region
、ロケーション(ゾーン)にus-central1
を選択します。 - デフォルトのストレージ クラス:
Standard
- ロケーション: TPU ノードを作成する予定のリージョンと同じリージョンにバケットのロケーションを指定します。各種の TPU タイプをどこで使用できるかについては、TPU タイプとゾーンをご覧ください。
TPU と VM を作成する
TPU リソースは、同じ名前が設定された仮想マシン(VM)と Cloud TPU で構成されます。これらのリソースは、作成したバケットと同じリージョン / ゾーンに存在する必要があります。
VM リソースと TPU リソースを設定するには、gcloud
コマンドまたは Cloud コンソールを使用します。TPU リソースの管理の詳細については、TPU の作成と削除をご覧ください。
Cloud Shell ウィンドウを開きます。
プロジェクトを使用するように
gcloud
を構成します。 Cloud TPU を作成するプロジェクト。$ gcloud config set project your-project
gcloud
コマンドを使用して Compute Engine VM と Cloud TPU を起動します。$ gcloud compute tpus execution-groups create \ --name=tpu-name \ --zone=europe-west4-a \ --tf-version=2.12.0 \ --machine-type=n1-standard-1 \ --accelerator-type=v3-8
コマンドフラグの説明
gcloud
コマンドの詳細については、gcloud リファレンスをご覧ください。gcloud compute tpus execution-groups
コマンドの実行が終了したら、shell プロンプトがusername@projectname
からusername@vm-name
に変更されたことを確認します。変更されていれば、Compute Engine VM にログインしていることになります。gcloud compute ssh tpu-name --zone=europe-west4-a
これらの手順を続行する場合は、VM セッション ウィンドウで、(vm)$
で始まる各コマンドを実行します。
pandas をインストールする
次のコマンドを入力して、pandas をインストールまたはアップグレードします。
pip install pandas
ハイパーパラメータの定義
このコード セクションでは、TPU に必要ないくつかのハイパー パラメータを追加します。これらのハイパー パラメータをトレーニング スクリプトにフラグとして追加すると、実行時に変更できます。
追加するパラメータは次のとおりです。
- tpu。このパラメータは、モデルを実行する TPU ノードの名前や IP アドレスを識別します。
- model_dir。モデルのチェックポイントを保存するパス。このパスは Cloud Storage バケットである必要があります。
- iterations。トレーニング ループごとのイテレーション回数。
- use_tpu。TPU または GPU / CPU 上で可用性に基づいてモデルを実行するかどうかを指定します。
Estimator API
# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
default=50,
help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
default=1000,
help="Total number of training steps.")
FLAGS = tf.flags.FLAGS
TPUEstimator API
# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
"tpu", default=None,
help="The Cloud TPU to use for training. This should be the name used when "
"creating the Cloud TPU. To find out the name of TPU, either use command "
"'gcloud compute tpus list --zone=<zone-name>', or use "
"'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")
# Model specific parameters
tf.flags.DEFINE_string(
"model_dir", default="",
help="This should be the path of storage bucket which will be used as "
"model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
"batch_size", default=128,
help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
"train_steps", default=1000,
help="Total number of training steps.")
tf.flags.DEFINE_integer(
"eval_steps", default=4,
help="Total number of evaluation steps. If `0`, evaluation "
"after training is skipped.")
# TPU specific parameters.
tf.flags.DEFINE_bool(
"use_tpu", default=True,
help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
"iterations", default=500,
help="Number of iterations per TPU training loop.")
データの読み込み
このコード セクションでは、データを読み取って読み込む方法を指定します。
TPU は、次のデータ型をサポートしています。
tf.float32
tf.complex64
tf.int64
tf.bool
tf.bfloat64
Estimator API
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
TPUEstimator API
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
入力関数の定義
Estimator API と TPUEstimator API では、主に入力関数の関数シグネチャの記述方法が異なります。Estimator API では、任意の数のパラメータで入力関数を作成できます。一方、TPUEstimator API では、入力関数に 1 つのパラメータ(params
)しか指定できません。この params
には、TPUEstimator オブジェクトのすべての Key-Value ペアに加え、batch_size
などの追加のキーが含まれます。
この違いに対処する 1 つの方法は、入力関数を呼び出すときにラムダ関数を使用することです。ラムダ関数を使用すると、既存の入力関数にわずかな変更を加えるだけでその機能を利用できるようになります。
次のセクションでは、入力関数を更新する方法について説明します。ラムダ関数を使用してこれらの入力関数を変換し、TPUEstimator API で使用する方法については、後ほど説明します。
トレーニング用の入力関数
TPUEstimator API を使用する場合、トレーニング用の入力関数 train_input_fn
は、Cloud TPU コア数でシャーディングできる入力サンプル数を返す必要があります。たとえば、8 つのコアを使用している場合、各バッチサイズは 8 で割り切れる必要があります。
そのため、前述のコードでは dataset.batch(batch_size, drop_remainder=True)
関数を使用しています。この関数は、batch_size
パラメータを使用してバッチ処理を行い、残りの部分を破棄します。
Estimator API
def train_input_fn(features, labels, batch_size):
"""An input function for training"""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat().batch(batch_size)
# Return the dataset.
return dataset
TPUEstimator API
def train_input_fn(features, labels, batch_size):
"""An input function for training."""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
評価入力関数
このステップでは、評価入力関数 eval_input_fn
を更新して、入力サンプルが TPU コア数でシャーディングされるようにします。これは、dataset.batch(batch_size, drop_remainder=True)
関数で実現できます。
Estimator API
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation or prediction"""
features=dict(features)
if labels is None:
# No labels, use only features.
inputs = features
else:
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
# Batch the examples
assert batch_size is not None, "batch_size must not be None"
dataset = dataset.batch(batch_size)
# Return the dataset.
return dataset
TPUEstimator API
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation."""
features = dict(features)
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
予測入力関数
TPUEstimator の予測では、入力データセットに、batch_size
の外側の次元が追加されたテンソルが含まれている必要があります。このため、予測入力関数を追加する必要があり、この関数は、features
と batch_size
をパラメータとして取得します。この関数を使用すると、batch_size
よりも少ない入力サンプルで済みます。
予測入力関数は、Estimator API を使用している場合は省略可能です。
Estimator API
Estimator API では予測入力関数は省略可能です。評価関数 eval_input_fn がこのタスクを実行します。
TPUEstimator API
def predict_input_fn(features, batch_size):
"""An input function for prediction."""
dataset = tf.data.Dataset.from_tensor_slices(features)
dataset = dataset.batch(batch_size)
return dataset
カスタムモデル関数の更新
次のタスクは、カスタムモデル関数の更新です。
tf.estimator.EstimatorSpec
のインスタンスを置き換えて、tf.contrib.tpu.TPUEstimatorSpec
を使用します。tf.summary
のインスタンスをすべて削除します。TPUEstimator API は TensorBoard のカスタム サマリーをサポートしていません。ただし、基本サマリーは、モデル ディレクトリのイベント ファイルに自動的に記録されます。tf.contrib.tpu.CrossShardOptimizer
を使用して、オプティマイザーをラップします。CrossShardOptimizer
は、allreduce
を使用して勾配を集約し、結果を各シャードにブロードキャストします。CrossShardOptimizer
はローカル トレーニングと互換性がないため、use_tpu
フラグも確認する必要があります。
Estimator API
def my_model(features, labels, mode, params):
"""DNN with three hidden layers, and dropout of 0.1 probability."""
# Create three fully connected layers each layer having a dropout
# probability of 0.1.
net = tf.feature_column.input_layer(features, params['feature_columns'])
for units in params['hidden_units']:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params['n_classes'], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
'class_ids': predicted_classes[:, tf.newaxis],
'probabilities': tf.nn.softmax(logits),
'logits': logits,
}
return tf.estimator.EstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
# Compute evaluation metrics.
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name='acc_op')
metrics = {'accuracy': accuracy}
tf.summary.scalar('accuracy', accuracy[1])
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode, loss=loss, eval_metric_ops=metrics)
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
TPUEstimator API
def my_model(features, labels, mode, params):
"""Deep Neural Network(DNN) model.
This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
10 neurons in each. And number of neurons in the last layer is equal to the
number of output classes. This is a densely connected network where each
neuron of previous layer is connected to each neuron of next layer.
Args:
features: Feature values for input samples.
labels: label/class assigned to the corresponding input sample.
mode: "TRAIN"/"EVAL"/"PREDICT"
params: Dictionary used to pass extra parameters to model function from
the main function.
Returns:
TPUEstimatorSpec object.
"""
# Create three fully connected layers.
net = tf.feature_column.input_layer(features, params["feature_columns"])
for units in params["hidden_units"]:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params["n_classes"], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
"class_ids": predicted_classes[:, tf.newaxis],
"probabilities": tf.nn.softmax(logits),
"logits": logits,
}
return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.contrib.tpu.TPUEstimatorSpec(
mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
if FLAGS.use_tpu:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)
評価指標関数の追加
Estimator API と TPUEstimator API では、指標の処理方法も異なります。Estimator API では、指標を通常の辞書として渡すことができます。TPUEstimator API では、代わりに関数を使用する必要があります。
Estimator API
省略可。my_model
関数は、指標を生成します。
TPUEstimator API
def metric_fn(labels, logits):
"""Function to return metrics for evaluation."""
predicted_classes = tf.argmax(logits, 1)
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name="acc_op")
return {"accuracy": accuracy}
メイン関数の更新
TPU を構成する
このステップでは、TPU クラスタを構成します。
クラスタを構成するには、ハイパーパラメータに割り当てられた値を使用します。詳細については、ハイパー パラメータを定義するをご覧ください。また、次の値も設定する必要があります。
allow_soft_placement
。true に設定すると、TPU を使用できない場合に TensorFlow は GPU デバイスを使用できます。GPU デバイスも使用できない場合は、CPU デバイスが使用されます。log_device_placement
。TensorFlow がデバイスの割り当てをログに記録する必要があることを示します。
Estimator API
このコード セクションは TPU にのみ影響するため、必須ではありません。
TPUEstimator API
# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
FLAGS.tpu)
run_config = tf.contrib.tpu.RunConfig(
model_dir=FLAGS.model_dir,
cluster=tpu_cluster_resolver,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=True),
tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)
TPU 固有のパラメータを分類に追加する
コードのこのセクションでは、分類変数を更新して TPUEstimator クラスを使用します。この変更では、次のパラメータを追加する必要があります。
use_tpu
train_batch_size
eval_batch_size
predict_batch_size
config
Estimator API
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.estimator.Estimator(
model_fn=my_model,
params={
'feature_columns': my_feature_columns,
# Two hidden layers of 10 nodes each.
'hidden_units': [10, 10],
# The model must choose between 3 classes.
'n_classes': 3,
})
TPUEstimator API
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.contrib.tpu.TPUEstimator(
model_fn=my_model,
use_tpu=FLAGS.use_tpu,
train_batch_size=FLAGS.batch_size,
eval_batch_size=FLAGS.batch_size,
predict_batch_size=FLAGS.batch_size,
config=run_config,
params={
# Name of the feature columns in the input data.
"feature_columns": my_feature_columns,
# Two hidden layers of 10 nodes each.
"hidden_units": [10, 10],
# The model must choose between 3 classes.
"n_classes": 3,
"use_tpu": FLAGS.use_tpu,
})
train メソッドを呼び出す
次の変更は、train メソッドの更新です。ラムダ関数を使用して、train_input_fn
関数を呼び出すことに注意してください。この方法論では、TPUEstimator API で既存の関数を簡単に使用できます。
また、steps パラメータを max_steps
に変更する必要があります。次のセクションでは、steps パラメータの用途を変えて、評価ステップの数を指定します。
Estimator API
# Train the Model.
classifier.train(
input_fn=lambda:iris_data.train_input_fn(
train_x, train_y, FLAGS.batch_size),
steps=FLAGS.train_steps)
TPUEstimator API
# Train the Model.
classifier.train(
input_fn=lambda params: iris_data.train_input_fn(
train_x, train_y, params["batch_size"]),
max_steps=FLAGS.train_steps)
評価メソッドを呼び出す
この変更は、train メソッドに加えた変更と似ています。ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。
また、steps
パラメータを、eval_steps
コマンドライン フラグで設定した値に変更する必要があります。
Estimator API
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda:iris_data.eval_input_fn(
test_x, test_y, FLAGS.batch_size))
print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))
TPUEstimator API
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda params: iris_data.eval_input_fn(
test_x, test_y, params["batch_size"]),
steps=FLAGS.eval_steps)
予測メソッドを呼び出す
train および evaluate メソッドと同様に、predict メソッドを更新する必要があります。 ここでも、ラムダ関数を使用して、既存の評価入力関数を簡単に使用できるようにします。
Estimator API
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda: iris_data.eval_input_fn(
iris_data.PREDICTION_INPUT_DATA,
labels=None,
batch_size=FLAGS.batch_size))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')
class_id = pred_dict['class_ids'][0]
probability = pred_dict['probabilities'][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
TPUEstimator API
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda params: iris_data.predict_input_fn(
iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")
class_id = pred_dict["class_ids"][0]
probability = pred_dict["probabilities"][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
クリーンアップ
このトピックで使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。
Compute Engine VM との接続を解除します。
(vm)$ exit
プロンプトが
username@projectname
に変わります。これは、現在、Cloud Shell 内にいることを示しています。Cloud Shell で、Cloud TPU の設定時に使用した --zone フラグを指定して
ctpu delete
を実行し、Compute Engine VM と Cloud TPU を削除します。$ ctpu delete [optional: --zone]
TPU の使用に対して不要な料金が発生しないように、
ctpu status
を実行してインスタンスが割り当てられていないことを確認します。削除には数分かかることがあります。次のようなレスポンスは、割り当てられたインスタンスがないことを示します。$ ctpu status --zone=europe-west4-a
2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a" No instances currently exist. Compute Engine VM: -- Cloud TPU: --
次に示すように
gsutil
を実行します。bucket-name の部分は、このチュートリアルで作成した Cloud Storage バケット名に置き換えてください。$ gsutil rm -r gs://bucket-name
次のステップ
Estimator API と TPUEstimator API の詳細については、次のトピックをご覧ください。
- Estimator API
- TPUEstimator API