Como migrar da API Estimator para a API TPUEstimator
Neste tutorial, descrevemos como converter um programa modelo que use a API Estimator em um que use a API TPUEstimator.
Aviso A API Qwiklabs só é compatível com o TensorFlow 1.x. Se você estiver escrevendo o modelo com o TensorFlow 2.x, use o Keras.
Informações gerais
Os programas modelo que usam a API TPUEstimator podem aproveitar ao máximo as Unidades de Processamento de Tensor (TPUs) e continuar compatíveis com CPUs e GPUs.
Depois de concluir este tutorial, você saberá:
- como converter seu código que usa a API Estimator para o uso com a API TPUEstimator;
- como executar previsões no Cloud TPU.
Antes de começar
Antes de começar o tutorial, verifique se o projeto do Google Cloud foi configurado corretamente.
Este tutorial usa componentes faturáveis do Google Cloud. Consulte a página de preços da Cloud TPU para fazer uma estimativa dos custos. Para evitar cobranças desnecessárias, não se esqueça de apagar os recursos criados ao terminar de usá-los.
Como configurar os recursos
Nesta seção, você verá como configurar os recursos de armazenamento, a VM e a Cloud TPU do Cloud Storage para tutoriais.
Crie um bucket do Cloud Storage
É preciso um bucket do Cloud Storage para armazenar os dados usados para treinar o modelo e os resultados do treinamento. O comando gcloud
usado neste tutorial
configura permissões padrão para a conta de serviço da Cloud TPU. Caso queira permissões mais específicas, consulte as permissões de nível de acesso.
O local do bucket precisa estar na mesma região da máquina virtual (VM) e do nó da TPU. As VMs e os nós da TPU estão localizados em zonas específicas, que são subdivisões dentro de uma região.
Acesse a página do Cloud Storage no console do Google Cloud.
Crie um novo bucket especificando as opções a seguir:
- Um nome exclusivo à sua escolha
- Selecione
Region
para o tipo de local eus-central1
para o local (zona) - Classe de armazenamento padrão:
Standard
- Local: especifique um local para o bucket na mesma região em que você planeja criar seu nó da TPU. Consulte tipos e zonas de TPU para saber onde vários tipos de TPU estão disponíveis.
Criar uma TPU e uma VM
Os recursos da TPU são compostos por uma máquina virtual (VM, na sigla em inglês) e um Cloud TPU com o mesmo nome. Eles precisam residir na mesma região/zona que o bucket recém-criado.
É possível configurar os recursos de VM e TPU usando comandos gcloud
ou o Console do Cloud. Para mais informações sobre como gerenciar recursos de TPU, consulte
Como criar e excluir TPUs.
Abra uma janela do Cloud Shell.
Configure o
gcloud
para usar seu projeto.$ gcloud config set project your-project
Projeto em que você quer criar o Cloud TPU.Use o comando
gcloud
para iniciar uma VM do Compute Engine e a Cloud TPU.$ gcloud compute tpus execution-groups create \ --name=tpu-name \ --zone=europe-west4-a \ --tf-version=2.12.0 \ --machine-type=n1-standard-1 \ --accelerator-type=v3-8
Descrições de sinalizações de comando
name
- O nome do Cloud TPU a ser criado.
zone
- A zona em que você planeja criar a Cloud TPU.
tf-version
- A versão do Tensorflow que o comando
gcloud
instala na sua VM. machine-type
- O tipo de máquina da VM do Compute Engine a ser criada.
accelerator-type
- O tipo do Cloud TPU a ser criado.
Para mais informações sobre o comando
gcloud
, consulte a referência da gcloud.Quando o comando
gcloud compute tpus execution-groups
terminar a execução, verifique se o prompt do shell foi alterado deusername@projectname
parausername@vm-name
. Essa alteração mostra que você fez login na VM do Compute Engine.gcloud compute ssh tpu-name --zone=europe-west4-a
Ao seguir essas instruções, execute cada comando iniciado por (vm)$
na janela de sessão da VM.
Instalar o pandas
Instale ou atualize o pandas com o seguinte comando:
pip install pandas
Definir hiperparâmetros
Nesta seção de código, você vai adicionar vários hiperparâmetros que as TPUs exigem. Adicione esses hiperparâmetros ao script de treinamento como sinalizadores, o que permite alterá-los no ambiente de execução.
Veja quais são esses parâmetros:
- tpu: este parâmetro identifica o nome ou endereço IP do nó da TPU na qual o modelo será executado.
- model_dir: o caminho para salvar os checkpoints do modelo. Esse caminho precisa ser um bucket do Cloud Storage.
- iterations: o número de iterações por loop de treinamento.
- use_tpu: especifica se você quer executar o modelo em TPUs ou GPUs/CPUs, com base na disponibilidade.
API Estimator
# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
default=50,
help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
default=1000,
help="Total number of training steps.")
FLAGS = tf.flags.FLAGS
API TPUEstimator
# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
"tpu", default=None,
help="The Cloud TPU to use for training. This should be the name used when "
"creating the Cloud TPU. To find out the name of TPU, either use command "
"'gcloud compute tpus list --zone=<zone-name>', or use "
"'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")
# Model specific parameters
tf.flags.DEFINE_string(
"model_dir", default="",
help="This should be the path of storage bucket which will be used as "
"model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
"batch_size", default=128,
help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
"train_steps", default=1000,
help="Total number of training steps.")
tf.flags.DEFINE_integer(
"eval_steps", default=4,
help="Total number of evaluation steps. If `0`, evaluation "
"after training is skipped.")
# TPU specific parameters.
tf.flags.DEFINE_bool(
"use_tpu", default=True,
help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
"iterations", default=500,
help="Number of iterations per TPU training loop.")
Como carregar os dados
Esta seção de código especifica como ler e carregar os dados.
Os TPUs são compatíveis com os seguintes tipos de dados:
tf.float32
tf.complex64
tf.int64
tf.bool
tf.bfloat64
API Estimator
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
API TPUEstimator
def load_data(y_name='Species'):
"""Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
train_path, test_path = maybe_download()
train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
train_x, train_y = train, train.pop(y_name)
test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
dtype={'SepalLength': pd.np.float32,
'SepalWidth': pd.np.float32,
'PetalLength': pd.np.float32,
'PetalWidth': pd.np.float32,
'Species': pd.np.int32})
test_x, test_y = test, test.pop(y_name)
return (train_x, train_y), (test_x, test_y)
Definir as funções de entrada
Uma diferença fundamental entre a API Estimator e a API TPUEstimator é a assinatura de função das funções de entrada. Com a API Estimator, você pode escrever funções de entrada com qualquer quantidade de parâmetros. Com a API TPUEstimator, as funções de entrada podem levar apenas um único parâmetro, params
. Esse params
tem todos os pares de chave-valor do objeto do TPUEstimator, além de chaves extras, como batch_size
.
Uma maneira de resolver essa diferença é usar as funções lambda ao chamar as funções de entrada. Com as funções lambda, você precisa fazer apenas pequenas alterações nas funções de entrada existentes.
As seções a seguir demonstram como atualizar suas funções de entrada. Mais tarde, você verá como usar as funções lambda para converter essas funções de entrada para trabalhar com a API TPUEstimator.
Função de entrada de treinamento
Com a API TPUEstimator, sua função de entrada de treinamento, train_input_fn
, precisa retornar um número de amostras de entrada que podem ser fragmentadas pelo número de núcleos do Cloud TPU. Por exemplo, se você estiver usando 8 núcleos, o tamanho de cada lote deverá ser divisível por 8.
Para fazer isso, o código anterior usa a função dataset.batch(batch_size, drop_remainder=True)
. Essa função cria lotes usando o parâmetro batch_size
e descarta o restante.
API Estimator
def train_input_fn(features, labels, batch_size):
"""An input function for training"""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat().batch(batch_size)
# Return the dataset.
return dataset
API TPUEstimator
def train_input_fn(features, labels, batch_size):
"""An input function for training."""
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
# Shuffle, repeat, and batch the examples.
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
Função de entrada de avaliação
Nesta etapa, você atualiza a função de entrada de avaliação, eval_input_fn
, para garantir que as amostras de entrada possam ser fragmentadas pelo número de núcleos de TPU. Para fazer isso, use a função dataset.batch(batch_size, drop_remainder=True)
.
API Estimator
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation or prediction"""
features=dict(features)
if labels is None:
# No labels, use only features.
inputs = features
else:
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
# Batch the examples
assert batch_size is not None, "batch_size must not be None"
dataset = dataset.batch(batch_size)
# Return the dataset.
return dataset
API TPUEstimator
def eval_input_fn(features, labels, batch_size):
"""An input function for evaluation."""
features = dict(features)
inputs = (features, labels)
# Convert the inputs to a Dataset.
dataset = tf.data.Dataset.from_tensor_slices(inputs)
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.batch(batch_size, drop_remainder=True)
# Return the dataset.
return dataset
Função de entrada de previsão
Para previsões em TPUEstimators, o conjunto de dados de entrada precisa ter tensores com a dimensão externa extra de batch_size
. Como resultado, você precisa adicionar uma função de entrada de previsão, que usa features
e batch_size
como parâmetros. Essa função permite ter um número de amostras de entrada menor que batch_size
.
Uma função de entrada de previsão será opcional se você estiver usando a API Estimator.
API Estimator
Uma função de entrada de predição é opcional para a API Estimador, porque a função de avaliação, eval_input_fn, executa essa tarefa.
API TPUEstimator
def predict_input_fn(features, batch_size):
"""An input function for prediction."""
dataset = tf.data.Dataset.from_tensor_slices(features)
dataset = dataset.batch(batch_size)
return dataset
Atualizar a função do modelo personalizado
Sua próxima tarefa é atualizar a função do modelo personalizado:
- Substitua as instâncias de
tf.estimator.EstimatorSpec
para usartf.contrib.tpu.TPUEstimatorSpec
- Remova todas as instâncias de
tf.summary
. A API TPUEstimator não é compatível com resumos personalizados para o TensorBoard. No entanto, os resumos básicos são gravados automaticamente em arquivos de eventos no diretório do modelo. - Encerre o otimizador usando
tf.contrib.tpu.CrossShardOptimizer
. OCrossShardOptimizer
usa umallreduce
para agregar gradientes e transmitir o resultado para cada fragmento. Como oCrossShardOptimizer
não é compatível com treinamento local, você também precisa verificar a sinalizaçãouse_tpu
.
API Estimator
def my_model(features, labels, mode, params):
"""DNN with three hidden layers, and dropout of 0.1 probability."""
# Create three fully connected layers each layer having a dropout
# probability of 0.1.
net = tf.feature_column.input_layer(features, params['feature_columns'])
for units in params['hidden_units']:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params['n_classes'], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
'class_ids': predicted_classes[:, tf.newaxis],
'probabilities': tf.nn.softmax(logits),
'logits': logits,
}
return tf.estimator.EstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
# Compute evaluation metrics.
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name='acc_op')
metrics = {'accuracy': accuracy}
tf.summary.scalar('accuracy', accuracy[1])
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode, loss=loss, eval_metric_ops=metrics)
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
API TPUEstimator
def my_model(features, labels, mode, params):
"""Deep Neural Network(DNN) model.
This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
10 neurons in each. And number of neurons in the last layer is equal to the
number of output classes. This is a densely connected network where each
neuron of previous layer is connected to each neuron of next layer.
Args:
features: Feature values for input samples.
labels: label/class assigned to the corresponding input sample.
mode: "TRAIN"/"EVAL"/"PREDICT"
params: Dictionary used to pass extra parameters to model function from
the main function.
Returns:
TPUEstimatorSpec object.
"""
# Create three fully connected layers.
net = tf.feature_column.input_layer(features, params["feature_columns"])
for units in params["hidden_units"]:
net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
# Compute logits (1 per class).
logits = tf.layers.dense(net, params["n_classes"], activation=None)
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {
"class_ids": predicted_classes[:, tf.newaxis],
"probabilities": tf.nn.softmax(logits),
"logits": logits,
}
return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
logits=logits)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.contrib.tpu.TPUEstimatorSpec(
mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))
# Create training op.
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
if FLAGS.use_tpu:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)
Adicionar uma função métrica de avaliação
Outra diferença entre a API Estimator e a API TPUEstimator é como elas lidam com métricas. Com a API Estimator, você pode passar métricas como um dicionário normal. Com a API TPUEstimator, você precisa usar uma função.
API Estimator
Opcional. A função my_model
gera as métricas.
API TPUEstimator
def metric_fn(labels, logits):
"""Function to return metrics for evaluation."""
predicted_classes = tf.argmax(logits, 1)
accuracy = tf.metrics.accuracy(labels=labels,
predictions=predicted_classes,
name="acc_op")
return {"accuracy": accuracy}
Atualizar a função principal
Configurar TPUs
Nesta etapa, você configura o cluster da TPU.
Para configurar o cluster, use os valores atribuídos aos hiperparâmetros. Consulte Definir hiperparâmetros para ver mais informações. Além disso, é necessário definir os seguintes valores:
allow_soft_placement
. Quando definido como verdadeiro, esse parâmetro permite que o TensorFlow use um dispositivo de GPU se uma TPU não estiver disponível. Se um dispositivo de GPU também estiver indisponível, será usado um dispositivo de CPU.log_device_placement
. Indica que o TensorFlow deve registrar posicionamentos de dispositivo.
API Estimator
Não é obrigatória, já que essa seção de código afeta apenas as TPUs.
API TPUEstimator
# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
FLAGS.tpu)
run_config = tf.contrib.tpu.RunConfig(
model_dir=FLAGS.model_dir,
cluster=tpu_cluster_resolver,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=True),
tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)
Adicionar parâmetros específicos de TPU ao classificador
Nesta seção do código, atualize a variável do classificador para usar a classe TPUEstimator. Essa alteração exige que você adicione os seguintes parâmetros:
use_tpu
train_batch_size
eval_batch_size
predict_batch_size
config
API Estimator
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.estimator.Estimator(
model_fn=my_model,
params={
'feature_columns': my_feature_columns,
# Two hidden layers of 10 nodes each.
'hidden_units': [10, 10],
# The model must choose between 3 classes.
'n_classes': 3,
})
API TPUEstimator
# Build 2 hidden layer DNN with 10, 10 units respectively.
classifier = tf.contrib.tpu.TPUEstimator(
model_fn=my_model,
use_tpu=FLAGS.use_tpu,
train_batch_size=FLAGS.batch_size,
eval_batch_size=FLAGS.batch_size,
predict_batch_size=FLAGS.batch_size,
config=run_config,
params={
# Name of the feature columns in the input data.
"feature_columns": my_feature_columns,
# Two hidden layers of 10 nodes each.
"hidden_units": [10, 10],
# The model must choose between 3 classes.
"n_classes": 3,
"use_tpu": FLAGS.use_tpu,
})
Chamar o método de treinamento
A próxima mudança é atualizar o método de treinamento. Observe o uso de uma função lambda para chamar a função train_input_fn
. Essa metodologia facilita o uso de suas funções atuais com a API TPUEstimator.
Além disso, é preciso alterar o parâmetro de etapas para max_steps
. Na próxima seção, você redirecionará o parâmetro de etapas para especificar o número de etapas de avaliação.
API Estimator
# Train the Model.
classifier.train(
input_fn=lambda:iris_data.train_input_fn(
train_x, train_y, FLAGS.batch_size),
steps=FLAGS.train_steps)
API TPUEstimator
# Train the Model.
classifier.train(
input_fn=lambda params: iris_data.train_input_fn(
train_x, train_y, params["batch_size"]),
max_steps=FLAGS.train_steps)
Chamar o método de avaliação
Essa mudança é semelhante àquela que você fez no método de treinamento. Novamente, o uso de uma função lambda facilita o uso de uma função de entrada atual de avaliação.
Além disso, é preciso alterar o parâmetro steps
para o valor definido na sinalização de linha de comando eval_steps
.
API Estimator
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda:iris_data.eval_input_fn(
test_x, test_y, FLAGS.batch_size))
print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))
API TPUEstimator
# Evaluate the model.
eval_result = classifier.evaluate(
input_fn=lambda params: iris_data.eval_input_fn(
test_x, test_y, params["batch_size"]),
steps=FLAGS.eval_steps)
Chamar o método de predição
Assim como ocorre nos métodos de treinamento e avaliação, você precisa atualizar o método de previsão. Novamente, o uso de uma função lambda facilita o uso de uma função de entrada atual de avaliação.
API Estimator
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda: iris_data.eval_input_fn(
iris_data.PREDICTION_INPUT_DATA,
labels=None,
batch_size=FLAGS.batch_size))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')
class_id = pred_dict['class_ids'][0]
probability = pred_dict['probabilities'][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
API TPUEstimator
# Generate predictions from the model
predictions = classifier.predict(
input_fn=lambda params: iris_data.predict_input_fn(
iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))
for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")
class_id = pred_dict["class_ids"][0]
probability = pred_dict["probabilities"][class_id]
print(template.format(iris_data.SPECIES[class_id],
100 * probability, expec))
Limpar
As etapas abaixo mostram como evitar cobranças na sua conta do GCP pelo uso de recursos.
Encerre a conexão com a VM do Compute Engine:
(vm)$ exit
Agora, o prompt precisa ser
username@projectname
, mostrando que você está no Cloud Shell.No Cloud Shell, execute
ctpu delete
com a sinalização --zone usada ao configurar a Cloud TPU para excluir a VM do Compute Engine e a Cloud TPU:$ ctpu delete [optional: --zone]
Execute
ctpu status
para garantir que não haja instâncias alocadas e evitar cobranças desnecessárias no uso da TPU. A exclusão pode levar vários minutos. Uma resposta como esta indica que não há mais instâncias alocadas:$ ctpu status --zone=europe-west4-a
2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a" No instances currently exist. Compute Engine VM: -- Cloud TPU: --
Execute
gsutil
conforme mostrado. Substitua bucket-name pelo nome do bucket do Cloud Storage criado para este tutorial:$ gsutil rm -r gs://bucket-name
A seguir
Para saber mais sobre as APIs Estimator e TPUEstimator, consulte os seguintes tópicos:
- API Estimator
- API CRL