Como migrar da API Estimator para a API TPUEstimator

Neste tutorial, descrevemos como converter um programa modelo que use a API Estimator em um que use a API TPUEstimator.

Aviso A API Qwiklabs só é compatível com o TensorFlow 1.x. Se você estiver escrevendo o modelo com o TensorFlow 2.x, use o Keras.

Informações gerais

Os programas modelo que usam a API TPUEstimator podem aproveitar ao máximo as Unidades de Processamento de Tensor (TPUs) e continuar compatíveis com CPUs e GPUs.

Depois de concluir este tutorial, você saberá:

  • como converter seu código que usa a API Estimator para o uso com a API TPUEstimator;
  • como executar previsões no Cloud TPU.

Antes de começar

Antes de começar o tutorial, verifique se o projeto do Google Cloud foi configurado corretamente.

Este tutorial usa componentes faturáveis do Google Cloud. Consulte a página de preços da Cloud TPU para fazer uma estimativa dos custos. Para evitar cobranças desnecessárias, não se esqueça de apagar os recursos criados ao terminar de usá-los.

Como configurar os recursos

Nesta seção, você verá como configurar os recursos de armazenamento, a VM e a Cloud TPU do Cloud Storage para tutoriais.

Crie um bucket do Cloud Storage

É preciso um bucket do Cloud Storage para armazenar os dados usados para treinar o modelo e os resultados do treinamento. O comando gcloud usado neste tutorial configura permissões padrão para a conta de serviço da Cloud TPU. Caso queira permissões mais específicas, consulte as permissões de nível de acesso.

O local do bucket precisa estar na mesma região da máquina virtual (VM) e do nó da TPU. As VMs e os nós da TPU estão localizados em zonas específicas, que são subdivisões dentro de uma região.

  1. Acesse a página do Cloud Storage no console do Google Cloud.

    Acessar a página do Cloud Storage

  2. Crie um novo bucket especificando as opções a seguir:

    • Um nome exclusivo à sua escolha
    • Selecione Region para o tipo de local e us-central1 para o local (zona)
    • Classe de armazenamento padrão: Standard
    • Local: especifique um local para o bucket na mesma região em que você planeja criar seu nó da TPU. Consulte tipos e zonas de TPU para saber onde vários tipos de TPU estão disponíveis.

Criar uma TPU e uma VM

Os recursos da TPU são compostos por uma máquina virtual (VM, na sigla em inglês) e um Cloud TPU com o mesmo nome. Eles precisam residir na mesma região/zona que o bucket recém-criado.

É possível configurar os recursos de VM e TPU usando comandos gcloud ou o Console do Cloud. Para mais informações sobre como gerenciar recursos de TPU, consulte Como criar e excluir TPUs.

  1. Abra uma janela do Cloud Shell.

    Abra o Cloud Shell

  2. Configure o gcloud para usar seu projeto.

    $ gcloud config set project your-project
    
    Projeto em que você quer criar o Cloud TPU.

  3. Use o comando gcloud para iniciar uma VM do Compute Engine e a Cloud TPU.

    $ gcloud compute tpus execution-groups create \
     --name=tpu-name \
     --zone=europe-west4-a \
     --tf-version=2.12.0 \
     --machine-type=n1-standard-1 \
     --accelerator-type=v3-8
    

    Descrições de sinalizações de comando

    name
    O nome do Cloud TPU a ser criado.
    zone
    A zona em que você planeja criar a Cloud TPU.
    tf-version
    A versão do Tensorflow que o comando gcloud instala na sua VM.
    machine-type
    O tipo de máquina da VM do Compute Engine a ser criada.
    accelerator-type
    O tipo do Cloud TPU a ser criado.

    Para mais informações sobre o comando gcloud, consulte a referência da gcloud.

  4. Quando o comando gcloud compute tpus execution-groups terminar a execução, verifique se o prompt do shell foi alterado de username@projectname para username@vm-name. Essa alteração mostra que você fez login na VM do Compute Engine.

    gcloud compute ssh tpu-name --zone=europe-west4-a
    

Ao seguir essas instruções, execute cada comando iniciado por (vm)$ na janela de sessão da VM.

Instalar o pandas

Instale ou atualize o pandas com o seguinte comando:

pip install pandas

Definir hiperparâmetros

Nesta seção de código, você vai adicionar vários hiperparâmetros que as TPUs exigem. Adicione esses hiperparâmetros ao script de treinamento como sinalizadores, o que permite alterá-los no ambiente de execução.

Veja quais são esses parâmetros:

  • tpu: este parâmetro identifica o nome ou endereço IP do nó da TPU na qual o modelo será executado.
  • model_dir: o caminho para salvar os checkpoints do modelo. Esse caminho precisa ser um bucket do Cloud Storage.
  • iterations: o número de iterações por loop de treinamento.
  • use_tpu: especifica se você quer executar o modelo em TPUs ou GPUs/CPUs, com base na disponibilidade.

API Estimator

# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
    default=50,
    help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
    default=1000,
    help="Total number of training steps.")
FLAGS = tf.flags.FLAGS

API TPUEstimator

# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
    "tpu", default=None,
    help="The Cloud TPU to use for training. This should be the name used when "
    "creating the Cloud TPU. To find out the name of TPU, either use command "
    "'gcloud compute tpus list --zone=<zone-name>', or use "
    "'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")

# Model specific parameters
tf.flags.DEFINE_string(
    "model_dir", default="",
    help="This should be the path of storage bucket which will be used as "
    "model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
    "batch_size", default=128,
    help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
    "train_steps", default=1000,
    help="Total number of training steps.")
tf.flags.DEFINE_integer(
    "eval_steps", default=4,
    help="Total number of evaluation steps. If `0`, evaluation "
    "after training is skipped.")

# TPU specific parameters.
tf.flags.DEFINE_bool(
    "use_tpu", default=True,
    help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
    "iterations", default=500,
    help="Number of iterations per TPU training loop.")

Como carregar os dados

Esta seção de código especifica como ler e carregar os dados.

Os TPUs são compatíveis com os seguintes tipos de dados:

  • tf.float32
  • tf.complex64
  • tf.int64
  • tf.bool
  • tf.bfloat64

API Estimator

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

API TPUEstimator

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
                      dtype={'SepalLength': pd.np.float32,
                             'SepalWidth': pd.np.float32,
                             'PetalLength': pd.np.float32,
                             'PetalWidth': pd.np.float32,
                             'Species': pd.np.int32})
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
                     dtype={'SepalLength': pd.np.float32,
                            'SepalWidth': pd.np.float32,
                            'PetalLength': pd.np.float32,
                            'PetalWidth': pd.np.float32,
                            'Species': pd.np.int32})
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

Definir as funções de entrada

Uma diferença fundamental entre a API Estimator e a API TPUEstimator é a assinatura de função das funções de entrada. Com a API Estimator, você pode escrever funções de entrada com qualquer quantidade de parâmetros. Com a API TPUEstimator, as funções de entrada podem levar apenas um único parâmetro, params. Esse params tem todos os pares de chave-valor do objeto do TPUEstimator, além de chaves extras, como batch_size.

Uma maneira de resolver essa diferença é usar as funções lambda ao chamar as funções de entrada. Com as funções lambda, você precisa fazer apenas pequenas alterações nas funções de entrada existentes.

As seções a seguir demonstram como atualizar suas funções de entrada. Mais tarde, você verá como usar as funções lambda para converter essas funções de entrada para trabalhar com a API TPUEstimator.

Função de entrada de treinamento

Com a API TPUEstimator, sua função de entrada de treinamento, train_input_fn, precisa retornar um número de amostras de entrada que podem ser fragmentadas pelo número de núcleos do Cloud TPU. Por exemplo, se você estiver usando 8 núcleos, o tamanho de cada lote deverá ser divisível por 8.

Para fazer isso, o código anterior usa a função dataset.batch(batch_size, drop_remainder=True). Essa função cria lotes usando o parâmetro batch_size e descarta o restante.

API Estimator

def train_input_fn(features, labels, batch_size):
  """An input function for training"""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat().batch(batch_size)

  # Return the dataset.
  return dataset

API TPUEstimator

def train_input_fn(features, labels, batch_size):
  """An input function for training."""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat()

  dataset = dataset.batch(batch_size, drop_remainder=True)

  # Return the dataset.
  return dataset

Função de entrada de avaliação

Nesta etapa, você atualiza a função de entrada de avaliação, eval_input_fn, para garantir que as amostras de entrada possam ser fragmentadas pelo número de núcleos de TPU. Para fazer isso, use a função dataset.batch(batch_size, drop_remainder=True).

API Estimator

def eval_input_fn(features, labels, batch_size):
  """An input function for evaluation or prediction"""
  features=dict(features)
  if labels is None:
      # No labels, use only features.
      inputs = features
  else:
      inputs = (features, labels)

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices(inputs)

  # Batch the examples
  assert batch_size is not None, "batch_size must not be None"
  dataset = dataset.batch(batch_size)

  # Return the dataset.
  return dataset

API TPUEstimator

 def eval_input_fn(features, labels, batch_size):
    """An input function for evaluation."""
    features = dict(features)
    inputs = (features, labels)

    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices(inputs)
    dataset = dataset.shuffle(1000).repeat()

    dataset = dataset.batch(batch_size, drop_remainder=True)

    # Return the dataset.
    return dataset

Função de entrada de previsão

Para previsões em TPUEstimators, o conjunto de dados de entrada precisa ter tensores com a dimensão externa extra de batch_size. Como resultado, você precisa adicionar uma função de entrada de previsão, que usa features e batch_size como parâmetros. Essa função permite ter um número de amostras de entrada menor que batch_size.

Uma função de entrada de previsão será opcional se você estiver usando a API Estimator.

API Estimator

Uma função de entrada de predição é opcional para a API Estimador, porque a função de avaliação, eval_input_fn, executa essa tarefa.

API TPUEstimator

  def predict_input_fn(features, batch_size):
    """An input function for prediction."""

    dataset = tf.data.Dataset.from_tensor_slices(features)
    dataset = dataset.batch(batch_size)
    return dataset

Atualizar a função do modelo personalizado

Sua próxima tarefa é atualizar a função do modelo personalizado:

  • Substitua as instâncias de tf.estimator.EstimatorSpec para usar tf.contrib.tpu.TPUEstimatorSpec
  • Remova todas as instâncias de tf.summary. A API TPUEstimator não é compatível com resumos personalizados para o TensorBoard. No entanto, os resumos básicos são gravados automaticamente em arquivos de eventos no diretório do modelo.
  • Encerre o otimizador usando tf.contrib.tpu.CrossShardOptimizer. O CrossShardOptimizer usa um allreduce para agregar gradientes e transmitir o resultado para cada fragmento. Como o CrossShardOptimizer não é compatível com treinamento local, você também precisa verificar a sinalização use_tpu.

API Estimator

def my_model(features, labels, mode, params):
  """DNN with three hidden layers, and dropout of 0.1 probability."""

  # Create three fully connected layers each layer having a dropout
  # probability of 0.1.
  net = tf.feature_column.input_layer(features, params['feature_columns'])
  for units in params['hidden_units']:
      net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params['n_classes'], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
      predictions = {
          'class_ids': predicted_classes[:, tf.newaxis],
          'probabilities': tf.nn.softmax(logits),
          'logits': logits,
      }
      return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  # Compute evaluation metrics.
  accuracy = tf.metrics.accuracy(labels=labels,
                                 predictions=predicted_classes,
                                 name='acc_op')
  metrics = {'accuracy': accuracy}
  tf.summary.scalar('accuracy', accuracy[1])
  if mode == tf.estimator.ModeKeys.EVAL:
      return tf.estimator.EstimatorSpec(
          mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN
      optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

API TPUEstimator

def my_model(features, labels, mode, params):
  """Deep Neural Network(DNN) model.

  This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
  10 neurons in each. And number of neurons in the last layer is equal to the
  number of output classes. This is a densely connected network where each
  neuron of previous layer is connected to each neuron of next layer.

  Args:
    features: Feature values for input samples.
    labels: label/class assigned to the corresponding input sample.
    mode: "TRAIN"/"EVAL"/"PREDICT"
    params: Dictionary used to pass extra parameters to model function from
      the main function.

  Returns:
    TPUEstimatorSpec object.

  """

  # Create three fully connected layers.
  net = tf.feature_column.input_layer(features, params["feature_columns"])
  for units in params["hidden_units"]:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params["n_classes"], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "class_ids": predicted_classes[:, tf.newaxis],
        "probabilities": tf.nn.softmax(logits),
        "logits": logits,
    }
    return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    if FLAGS.use_tpu:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

Adicionar uma função métrica de avaliação

Outra diferença entre a API Estimator e a API TPUEstimator é como elas lidam com métricas. Com a API Estimator, você pode passar métricas como um dicionário normal. Com a API TPUEstimator, você precisa usar uma função.

API Estimator

Opcional. A função my_model gera as métricas.

API TPUEstimator

  def metric_fn(labels, logits):
    """Function to return metrics for evaluation."""

    predicted_classes = tf.argmax(logits, 1)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predicted_classes,
                                   name="acc_op")
    return {"accuracy": accuracy}

Atualizar a função principal

Configurar TPUs

Nesta etapa, você configura o cluster da TPU.

Para configurar o cluster, use os valores atribuídos aos hiperparâmetros. Consulte Definir hiperparâmetros para ver mais informações. Além disso, é necessário definir os seguintes valores:

  • allow_soft_placement. Quando definido como verdadeiro, esse parâmetro permite que o TensorFlow use um dispositivo de GPU se uma TPU não estiver disponível. Se um dispositivo de GPU também estiver indisponível, será usado um dispositivo de CPU.
  • log_device_placement. Indica que o TensorFlow deve registrar posicionamentos de dispositivo.

API Estimator

Não é obrigatória, já que essa seção de código afeta apenas as TPUs.

API TPUEstimator

# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
    FLAGS.tpu)

run_config = tf.contrib.tpu.RunConfig(
    model_dir=FLAGS.model_dir,
    cluster=tpu_cluster_resolver,
    session_config=tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=True),
    tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)

Adicionar parâmetros específicos de TPU ao classificador

Nesta seção do código, atualize a variável do classificador para usar a classe TPUEstimator. Essa alteração exige que você adicione os seguintes parâmetros:

  • use_tpu
  • train_batch_size
  • eval_batch_size
  • predict_batch_size
  • config

API Estimator

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # Two hidden layers of 10 nodes each.
          'hidden_units': [10, 10],
          # The model must choose between 3 classes.
          'n_classes': 3,
      })

API TPUEstimator

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.contrib.tpu.TPUEstimator(
      model_fn=my_model,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      predict_batch_size=FLAGS.batch_size,
      config=run_config,
      params={
          # Name of the feature columns in the input data.
          "feature_columns": my_feature_columns,
          # Two hidden layers of 10 nodes each.
          "hidden_units": [10, 10],
          # The model must choose between 3 classes.
          "n_classes": 3,
          "use_tpu": FLAGS.use_tpu,
      })

Chamar o método de treinamento

A próxima mudança é atualizar o método de treinamento. Observe o uso de uma função lambda para chamar a função train_input_fn. Essa metodologia facilita o uso de suas funções atuais com a API TPUEstimator.

Além disso, é preciso alterar o parâmetro de etapas para max_steps. Na próxima seção, você redirecionará o parâmetro de etapas para especificar o número de etapas de avaliação.

API Estimator

  # Train the Model.
  classifier.train(
      input_fn=lambda:iris_data.train_input_fn(
          train_x, train_y, FLAGS.batch_size),
      steps=FLAGS.train_steps)

API TPUEstimator

  # Train the Model.
  classifier.train(
      input_fn=lambda params: iris_data.train_input_fn(
          train_x, train_y, params["batch_size"]),
      max_steps=FLAGS.train_steps)

Chamar o método de avaliação

Essa mudança é semelhante àquela que você fez no método de treinamento. Novamente, o uso de uma função lambda facilita o uso de uma função de entrada atual de avaliação.

Além disso, é preciso alterar o parâmetro steps para o valor definido na sinalização de linha de comando eval_steps.

API Estimator

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda:iris_data.eval_input_fn(
          test_x, test_y, FLAGS.batch_size))

  print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

API TPUEstimator

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda params: iris_data.eval_input_fn(
          test_x, test_y, params["batch_size"]),
      steps=FLAGS.eval_steps)

Chamar o método de predição

Assim como ocorre nos métodos de treinamento e avaliação, você precisa atualizar o método de previsão. Novamente, o uso de uma função lambda facilita o uso de uma função de entrada atual de avaliação.

API Estimator

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda: iris_data.eval_input_fn(
          iris_data.PREDICTION_INPUT_DATA,
          labels=None,
          batch_size=FLAGS.batch_size))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
      template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')

      class_id = pred_dict['class_ids'][0]
      probability = pred_dict['probabilities'][class_id]

      print(template.format(iris_data.SPECIES[class_id],
                            100 * probability, expec))

API TPUEstimator

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda params: iris_data.predict_input_fn(
          iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
    template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")

    class_id = pred_dict["class_ids"][0]
    probability = pred_dict["probabilities"][class_id]

    print(template.format(iris_data.SPECIES[class_id],
                          100 * probability, expec))

Limpar

As etapas abaixo mostram como evitar cobranças na sua conta do GCP pelo uso de recursos.

  1. Encerre a conexão com a VM do Compute Engine:

    (vm)$ exit
    

    Agora, o prompt precisa ser username@projectname, mostrando que você está no Cloud Shell.

  2. No Cloud Shell, execute ctpu delete com a sinalização --zone usada ao configurar a Cloud TPU para excluir a VM do Compute Engine e a Cloud TPU:

    $ ctpu delete [optional: --zone]
    
  3. Execute ctpu status para garantir que não haja instâncias alocadas e evitar cobranças desnecessárias no uso da TPU. A exclusão pode levar vários minutos. Uma resposta como esta indica que não há mais instâncias alocadas:

    $ ctpu status --zone=europe-west4-a
    
    2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a"
    No instances currently exist.
        Compute Engine VM:     --
        Cloud TPU:             --
    
  4. Execute gsutil conforme mostrado. Substitua bucket-name pelo nome do bucket do Cloud Storage criado para este tutorial:

    $ gsutil rm -r gs://bucket-name
    

A seguir

Para saber mais sobre as APIs Estimator e TPUEstimator, consulte os seguintes tópicos: