Cómo migrar de la API de Estimator a la API de TPUEstimator

En este instructivo se describe cómo convertir un programa modelo que usa la API de Estimator en uno que use la API de TPUEstimator.

Advertencia: La API de TPUEstimator solo es compatible con Tensorflow 1.x. Si escribes tu modelo con Tensorflow 2.x, usa Keras en su lugar.

Descripción general

Los programas modelo que usan la API de TPUEstimator pueden aprovechar al máximo las unidades de procesamiento tensorial (TPU) y seguir siendo compatibles con las CPU y GPU.

Una vez que finalices este instructivo, sabrás:

  • Cómo convertir el código del programa que usa la API de Estimator en uno que use la API de TPUEstimator
  • Cómo ejecutar las predicciones en Cloud TPU

Antes de comenzar

Antes de comenzar este instructivo, verifica que tu proyecto de Google Cloud esté configurado correctamente.

En esta explicación, se usan componentes facturables de Google Cloud. Consulta la página de precios de Cloud TPU para calcular los costos. Asegúrate de limpiar los recursos que crees cuando hayas terminado de usarlos para evitar cargos innecesarios.

Configura tus recursos

En esta sección, se proporciona información sobre la configuración de los recursos de almacenamiento de Cloud Storage, VM y Cloud TPU para instructivos.

Cree un bucket de Cloud Storage

Necesitas un bucket de Cloud Storage a fin de almacenar los datos que usas para entrenar el modelo y los resultados del entrenamiento. El comando de gcloud que se usa en este instructivo configura los permisos predeterminados para la cuenta de servicio de Cloud TPU. Si quieres contar con permisos más detallados, revisa los permisos de nivel de acceso.

La ubicación del bucket debe estar en la misma región que tu máquina virtual (VM) y tu nodo TPU. Las VM y los nodos TPU se encuentran en zonas específicas, que son subdivisiones dentro de una región.

  1. Ve a la página de Cloud Storage en Google Cloud Console.

    Ve a la página de Cloud Storage

  2. Crea un bucket nuevo y especifica las siguientes opciones:

    • Un nombre único de tu elección
    • Selecciona Region en el tipo de ubicación y us-central1 en la ubicación (zona).
    • Clase de almacenamiento predeterminada: Standard
    • Ubicación: Especifica una ubicación de bucket en la misma región en la que planeas crear tu nodo TPU. Consulta la página sobre los tipos y zonas de TPU para saber dónde están disponibles distintos tipos de TPU

Crea una TPU y VM

Los recursos de TPU están compuestos por una máquina virtual (VM) y una Cloud TPU que tiene el mismo nombre. Estos recursos deben residir en la misma región o zona que el bucket que acabas de crear.

Puedes configurar tus recursos de VM y TPU con los comandos de gcloud o mediante Cloud Console. Si deseas obtener más información para administrar recursos de TPU, consulta Crea y borra TPU.

  1. Abre una ventana de Cloud Shell.

    Abra Cloud Shell

  2. Configura gcloud para usar tu proyecto.

    $ gcloud config set project your-project
    
    proyecto en el que deseas crear Cloud TPU.

  3. Inicia una VM de Compute Engine y Cloud TPU con el comando gcloud.

    $ gcloud compute tpus execution-groups create \
     --name=tpu-name \
     --zone=europe-west4-a \
     --tf-version=2.12.0 \
     --machine-type=n1-standard-1 \
     --accelerator-type=v3-8
    

    Descripciones de las marcas de comandos

    name
    El nombre de la Cloud TPU que se creará.
    zone
    Es la zona en la que deseas crear la Cloud TPU.
    tf-version
    La versión de Tensorflow el comando gcloud se instala en tu VM.
    machine-type
    El tipo de máquina de la VM de Compute Engine que se creará.
    accelerator-type
    El tipo de Cloud TPU que se creará.

    Para obtener más información sobre el comando de gcloud, consulta la Referencia de gcloud.

  4. Cuando el comando gcloud compute tpus execution-groups termine de ejecutarse, verifica que el indicador de shell haya cambiado de username@projectname a username@vm-name. Este cambio indica que accediste a tu VM de Compute Engine.

    gcloud compute ssh tpu-name --zone=europe-west4-a
    

Mientras sigues estas instrucciones, ejecuta cada comando que empiece con (vm)$ en la ventana de sesión de tu VM.

Instala Pandas

Escribe el siguiente comando para instalar o actualizar Pandas:

pip install pandas

Define los hiperparámetros

En esta sección de código, agrega los hiperparámetros que requieren las TPU. Agrega estos hiperparámetros como marcadores a la secuencia de comandos de entrenamiento, lo que te permite cambiarlos en el entorno de ejecución.

A continuación, se muestran los parámetros que debes agregar:

  • tpu. Este parámetro identifica el nombre o la dirección IP del nodo de TPU en el que se ejecuta el modelo.
  • model_dir. La ruta para guardar los puntos de control del modelo. Esta ruta debe ser un bucket de Cloud Storage.
  • iteraciones. La cantidad de iteraciones por ciclo de entrenamiento.
  • use_tpu. Especifica si deseas ejecutar el modelo en las TPU o GPU/CPU, según la disponibilidad.

API de Estimator

# Model specific parameters
tf.flags.DEFINE_integer("batch_size",
    default=50,
    help="Batch size.")
tf.flags.DEFINE_integer("train_steps",
    default=1000,
    help="Total number of training steps.")
FLAGS = tf.flags.FLAGS

API de TPUEstimator

# Cloud TPU Cluster Resolver flags
tf.flags.DEFINE_string(
    "tpu", default=None,
    help="The Cloud TPU to use for training. This should be the name used when "
    "creating the Cloud TPU. To find out the name of TPU, either use command "
    "'gcloud compute tpus list --zone=<zone-name>', or use "
    "'ctpu status --details' if you have created your Cloud TPU using 'ctpu up'.")

# Model specific parameters
tf.flags.DEFINE_string(
    "model_dir", default="",
    help="This should be the path of storage bucket which will be used as "
    "model_directory to export the checkpoints during training.")
tf.flags.DEFINE_integer(
    "batch_size", default=128,
    help="This is the global batch size and not the per-shard batch.")
tf.flags.DEFINE_integer(
    "train_steps", default=1000,
    help="Total number of training steps.")
tf.flags.DEFINE_integer(
    "eval_steps", default=4,
    help="Total number of evaluation steps. If `0`, evaluation "
    "after training is skipped.")

# TPU specific parameters.
tf.flags.DEFINE_bool(
    "use_tpu", default=True,
    help="True, if want to run the model on TPU. False, otherwise.")
tf.flags.DEFINE_integer(
    "iterations", default=500,
    help="Number of iterations per TPU training loop.")

Carga los datos

En esta sección de código, se especifica cómo leer y cargar los datos.

Las TPU admiten los siguientes tipos de datos:

  • tf.float32
  • tf.complex64
  • tf.int64
  • tf.bool
  • tf.bfloat64

API de Estimator

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0)
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0)
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

API de TPUEstimator

def load_data(y_name='Species'):
  """Returns the iris dataset as (train_x, train_y), (test_x, test_y)."""
  train_path, test_path = maybe_download()

  train = pd.read_csv(train_path, names=CSV_COLUMN_NAMES, header=0,
                      dtype={'SepalLength': pd.np.float32,
                             'SepalWidth': pd.np.float32,
                             'PetalLength': pd.np.float32,
                             'PetalWidth': pd.np.float32,
                             'Species': pd.np.int32})
  train_x, train_y = train, train.pop(y_name)

  test = pd.read_csv(test_path, names=CSV_COLUMN_NAMES, header=0,
                     dtype={'SepalLength': pd.np.float32,
                            'SepalWidth': pd.np.float32,
                            'PetalLength': pd.np.float32,
                            'PetalWidth': pd.np.float32,
                            'Species': pd.np.int32})
  test_x, test_y = test, test.pop(y_name)

  return (train_x, train_y), (test_x, test_y)

Define las funciones de entrada

La diferencia principal entre la API de Estimator y la API de TPUEstimator es la firma de función de las funciones de entrada. Con la API de Estimator, puedes escribir las funciones de entrada con cualquier cantidad de parámetros. Con la API de TPUEstimator, las funciones de entrada pueden usar un solo parámetro, params. Este params tiene todos los pares clave-valor del objeto TPUEstimator, junto con claves adicionales como batch_size.

Una forma de abordar esta diferencia es usar las funciones lambda cuando llamas a las funciones de entrada. Con las funciones lambda, solo debes realizar pequeños cambios a las funciones de entrada existentes.

En las siguientes secciones, se muestra cómo actualizar las funciones de entrada. Más adelante, verás cómo usar las funciones lambda para convertir estas funciones de entrada a fin de que funcionen con la API de TPUEstimator.

Función de entrada de entrenamiento

Con la API de TPUEstimator, tu función de entrada de entrenamiento, train_input_fn, debe mostrar cierta cantidad de ejemplos de entrada que se puedan fragmentar según la cantidad de núcleos de Cloud TPU. Por ejemplo, si usas 8 núcleos, cada tamaño del lote debe poder dividirse por 8.

Para lograr esto, el código anterior usa la función dataset.batch(batch_size, drop_remainder=True). Esta función agrupa en lotes con el parámetro batch_size y descarta el resto.

API de Estimator

def train_input_fn(features, labels, batch_size):
  """An input function for training"""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat().batch(batch_size)

  # Return the dataset.
  return dataset

API de TPUEstimator

def train_input_fn(features, labels, batch_size):
  """An input function for training."""

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))

  # Shuffle, repeat, and batch the examples.
  dataset = dataset.shuffle(1000).repeat()

  dataset = dataset.batch(batch_size, drop_remainder=True)

  # Return the dataset.
  return dataset

Función de entrada de evaluación

En este paso, actualiza la función de entrada de evaluación, eval_input_fn, para asegurarte de que las ejemplos de entrada pueden fragmentarse por el número de núcleos de TPU. Para lograrlo, usa la función dataset.batch(batch_size, drop_remainder=True).

API de Estimator

def eval_input_fn(features, labels, batch_size):
  """An input function for evaluation or prediction"""
  features=dict(features)
  if labels is None:
      # No labels, use only features.
      inputs = features
  else:
      inputs = (features, labels)

  # Convert the inputs to a Dataset.
  dataset = tf.data.Dataset.from_tensor_slices(inputs)

  # Batch the examples
  assert batch_size is not None, "batch_size must not be None"
  dataset = dataset.batch(batch_size)

  # Return the dataset.
  return dataset

API de TPUEstimator

 def eval_input_fn(features, labels, batch_size):
    """An input function for evaluation."""
    features = dict(features)
    inputs = (features, labels)

    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices(inputs)
    dataset = dataset.shuffle(1000).repeat()

    dataset = dataset.batch(batch_size, drop_remainder=True)

    # Return the dataset.
    return dataset

Función de entrada de predicción

Para las predicciones en TPUEstimators, el conjunto de datos de entrada debe tener tensores con la dimensión externa adicional de batch_size. Como resultado, debes agregar una función de entrada de predicción, que toma features y batch_size como parámetros. Esta función te permite tener menos ejemplos de entrada que batch_size.

Si estás usando la API de Estimator, la función de entrada de predicción es opcional.

API de Estimator

La función de entrada de predicción es opcional para la API de Estimator, ya que la función de evaluación eval_input_fn realiza esta tarea.

API de TPUEstimator

  def predict_input_fn(features, batch_size):
    """An input function for prediction."""

    dataset = tf.data.Dataset.from_tensor_slices(features)
    dataset = dataset.batch(batch_size)
    return dataset

Actualiza la función de modelo personalizado

El siguiente paso es actualizar la función de modelo personalizado:

  • Reemplaza las instancias de tf.estimator.EstimatorSpec para usar tf.contrib.tpu.TPUEstimatorSpec.
  • Quita cualquier instancia de tf.summary. La API de TPUEstimator no admite resúmenes personalizados de TensorBoard. Sin embargo, los resúmenes básicos se registran de forma automática en los archivos de eventos dentro del directorio del modelo.
  • Une el optimizador mediante tf.contrib.tpu.CrossShardOptimizer. El CrossShardOptimizer usa un allreduce para agregar gradientes y transmitir el resultado a cada fragmentación. Como CrossShardOptimizer no es compatible con el entrenamiento local, también debes verificar la marca use_tpu.

API de Estimator

def my_model(features, labels, mode, params):
  """DNN with three hidden layers, and dropout of 0.1 probability."""

  # Create three fully connected layers each layer having a dropout
  # probability of 0.1.
  net = tf.feature_column.input_layer(features, params['feature_columns'])
  for units in params['hidden_units']:
      net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params['n_classes'], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
      predictions = {
          'class_ids': predicted_classes[:, tf.newaxis],
          'probabilities': tf.nn.softmax(logits),
          'logits': logits,
      }
      return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  # Compute evaluation metrics.
  accuracy = tf.metrics.accuracy(labels=labels,
                                 predictions=predicted_classes,
                                 name='acc_op')
  metrics = {'accuracy': accuracy}
  tf.summary.scalar('accuracy', accuracy[1])
  if mode == tf.estimator.ModeKeys.EVAL:
      return tf.estimator.EstimatorSpec(
          mode, loss=loss, eval_metric_ops=metrics)

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN
      optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

API de TPUEstimator

def my_model(features, labels, mode, params):
  """Deep Neural Network(DNN) model.

  This is a DNN Model with 3 hidden layers. First 2 hidden layers are having
  10 neurons in each. And number of neurons in the last layer is equal to the
  number of output classes. This is a densely connected network where each
  neuron of previous layer is connected to each neuron of next layer.

  Args:
    features: Feature values for input samples.
    labels: label/class assigned to the corresponding input sample.
    mode: "TRAIN"/"EVAL"/"PREDICT"
    params: Dictionary used to pass extra parameters to model function from
      the main function.

  Returns:
    TPUEstimatorSpec object.

  """

  # Create three fully connected layers.
  net = tf.feature_column.input_layer(features, params["feature_columns"])
  for units in params["hidden_units"]:
    net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

  # Compute logits (1 per class).
  logits = tf.layers.dense(net, params["n_classes"], activation=None)

  # Compute predictions.
  predicted_classes = tf.argmax(logits, 1)
  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "class_ids": predicted_classes[:, tf.newaxis],
        "probabilities": tf.nn.softmax(logits),
        "logits": logits,
    }
    return tf.contrib.tpu.TPUEstimatorSpec(mode, predictions=predictions)

  # Compute loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,
                                                logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode, loss=loss, eval_metrics=(metric_fn, [labels, logits]))

  # Create training op.
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    if FLAGS.use_tpu:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

Agrega una función de métrica de evaluación

Otra diferencia entre la API de Estimator y la API de TPUEstimator es la forma en que controlan las métricas. Con la API de Estimator, puedes pasar las métricas como un diccionario normal. En cambio, para la API de TPUEstimator, debes usar una función.

API de Estimator

Opcional. La función my_model genera las métricas.

API de TPUEstimator

  def metric_fn(labels, logits):
    """Function to return metrics for evaluation."""

    predicted_classes = tf.argmax(logits, 1)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predicted_classes,
                                   name="acc_op")
    return {"accuracy": accuracy}

Actualiza la función principal

Configura las TPU

En este paso, configura el clúster de TPU.

Para configurar el clúster, puedes usar los valores asignados a los hiperparámetros. Consulta Definir los hiperparámetros para obtener más información. Además, debes establecer los siguientes valores:

  • allow_soft_placement. Cuando se configura como “true”, este parámetro permite que TensorFlow use un dispositivo de GPU en el caso de que una TPU no esté disponible. Si un dispositivo de GPU no está disponible, se usa un dispositivo de CPU.
  • log_device_placement. Indica que TensorFlow debe registrar las posiciones del dispositivo.

API de Estimator

No se requiere, ya que este código solo afecta a las TPU.

API de TPUEstimator

# Resolve TPU cluster and runconfig for this.
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
    FLAGS.tpu)

run_config = tf.contrib.tpu.RunConfig(
    model_dir=FLAGS.model_dir,
    cluster=tpu_cluster_resolver,
    session_config=tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=True),
    tpu_config=tf.contrib.tpu.TPUConfig(FLAGS.iterations),
)

Agrega parámetros específicos de TPU al clasificador

En esta sección de código, actualiza la variable del clasificador para usar la clase TPUEstimator. Este cambio requiere que agregues los siguientes parámetros:

  • use_tpu
  • train_batch_size
  • eval_batch_size
  • predict_batch_size
  • config

API de Estimator

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.estimator.Estimator(
      model_fn=my_model,
      params={
          'feature_columns': my_feature_columns,
          # Two hidden layers of 10 nodes each.
          'hidden_units': [10, 10],
          # The model must choose between 3 classes.
          'n_classes': 3,
      })

API de TPUEstimator

  # Build 2 hidden layer DNN with 10, 10 units respectively.
  classifier = tf.contrib.tpu.TPUEstimator(
      model_fn=my_model,
      use_tpu=FLAGS.use_tpu,
      train_batch_size=FLAGS.batch_size,
      eval_batch_size=FLAGS.batch_size,
      predict_batch_size=FLAGS.batch_size,
      config=run_config,
      params={
          # Name of the feature columns in the input data.
          "feature_columns": my_feature_columns,
          # Two hidden layers of 10 nodes each.
          "hidden_units": [10, 10],
          # The model must choose between 3 classes.
          "n_classes": 3,
          "use_tpu": FLAGS.use_tpu,
      })

Llama al método de entrenamiento

El siguiente cambio es actualizar el método de entrenamiento. Ten en cuenta el uso de una función lambda para llamar a la función train_input_fn. Esta metodología facilita el uso de las funciones existentes con la API de TPUEstimator.

Además, debes cambiar el parámetro de pasos por max_steps. En la siguiente sección, volverás a usar el parámetro de pasos para especificar la cantidad de pasos de evaluación.

API de Estimator

  # Train the Model.
  classifier.train(
      input_fn=lambda:iris_data.train_input_fn(
          train_x, train_y, FLAGS.batch_size),
      steps=FLAGS.train_steps)

API de TPUEstimator

  # Train the Model.
  classifier.train(
      input_fn=lambda params: iris_data.train_input_fn(
          train_x, train_y, params["batch_size"]),
      max_steps=FLAGS.train_steps)

Llama al método de evaluación

Este cambio es similar al que realizaste en el método de entrenamiento. Una vez más, el uso de una función lambda facilita el uso de una función de entrada de evaluación existente.

Además, debes cambiar el parámetro steps al valor configurado de la marca de línea de comandos eval_steps.

API de Estimator

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda:iris_data.eval_input_fn(
          test_x, test_y, FLAGS.batch_size))

  print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))

API de TPUEstimator

  # Evaluate the model.
  eval_result = classifier.evaluate(
      input_fn=lambda params: iris_data.eval_input_fn(
          test_x, test_y, params["batch_size"]),
      steps=FLAGS.eval_steps)

Llama al método de predicción

Al igual que con los métodos de entrenamiento y evaluación, debes actualizar el método de predicción. Una vez más, el uso de una función lambda facilita el uso de una función de entrada de evaluación existente.

API de Estimator

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda: iris_data.eval_input_fn(
          iris_data.PREDICTION_INPUT_DATA,
          labels=None,
          batch_size=FLAGS.batch_size))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
      template = ('\nPrediction is "{}" ({:.1f}%), expected "{}"')

      class_id = pred_dict['class_ids'][0]
      probability = pred_dict['probabilities'][class_id]

      print(template.format(iris_data.SPECIES[class_id],
                            100 * probability, expec))

API de TPUEstimator

  # Generate predictions from the model
  predictions = classifier.predict(
      input_fn=lambda params: iris_data.predict_input_fn(
          iris_data.PREDICTION_INPUT_DATA, params["batch_size"]))

  for pred_dict, expec in zip(predictions, iris_data.PREDICTION_OUTPUT_DATA):
    template = ("\nPrediction is \"{}\" ({:.1f}%), expected \"{}\"")

    class_id = pred_dict["class_ids"][0]
    probability = pred_dict["probabilities"][class_id]

    print(template.format(iris_data.SPECIES[class_id],
                          100 * probability, expec))

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de GCP por los recursos usados en este tema, realiza los siguientes pasos:

  1. Desconéctate de la VM de Compute Engine:

    (vm)$ exit
    

    El mensaje ahora debería mostrar username@projectname, que indica que estás en Cloud Shell.

  2. En Cloud Shell, ejecuta ctpu delete con la marca --zone que usaste cuando configuraste la Cloud TPU para borrar la VM de Compute Engine y la Cloud TPU:

    $ ctpu delete [optional: --zone]
    
  3. Ejecuta ctpu status para asegurarte de no tener instancias asignadas y así evitar cargos innecesarios por el uso de TPU. La eliminación puede tomar varios minutos. Una respuesta como la que se muestra a continuación indica que no hay más instancias asignadas:

    $ ctpu status --zone=europe-west4-a
    
    2018/04/28 16:16:23 WARNING: Setting zone to "--zone=europe-west4-a"
    No instances currently exist.
        Compute Engine VM:     --
        Cloud TPU:             --
    
  4. Ejecuta gsutil como se muestra y reemplaza bucket-name por el nombre del bucket de Cloud Storage que creaste para este instructivo:

    $ gsutil rm -r gs://bucket-name
    

Próximos pasos

Para obtener más información sobre las API de Estimator y TPUEstimator, consulta los siguientes temas: