Overview: Distributed training using TensorFlow Estimator APIs

By Quantiphi, Inc.


This article provides an overview of a system developed by Quantiphi that uses AI Platform on Google Cloud to implement distributed machine learning model training using the TensorFlow Estimator API. The article describes the creation of the input pipeline, the implementation of the tower-based replicated model graph, and the use of estimator and experiment classes using the Estimator API.

This description covers a specific implementation, but the same blocks can be adapted for other model training events as well.

Code blocks

The following code blocks are used as part of the solution:

  1. An input function (using the TensorFlow Dataset API).
  2. A model function that is run simultaneously on all available GPUs for synchronous training.
  3. An estimator class (with a model function, run configuration, and model parameters as input).
  4. The train_and_evaluate class (with evaluation and training input functions, the estimator class, training steps, and other parameters as input).

The input function

The methods for training, evaluating, and predicting all take an input function that produces features and labels. The TensorFlow Dataset API is used to create a multithreaded queued input pipeline.

Dataset objects can be created using different input types. For example, tf.data.Dataset.from_tensors can be used to create a dataset object from in-memory tensors, or TFRecord format can be used to save the data on disk and load it into dataset object using tf.data.TFRecordDataset. It's also possible to use CSV files, which is what was used in the implementation described in this article, using tf.data.TextLineDataset. The current solution uses the CSV format, because the dataset did not provide a constant feature length for each data point, which is what's needed for TFRecord.

Using the input function pipeline

The input pipeline consists of the following steps. The parenthetical labels correspond to the diagram that follows.

  1. (CSV Decoding) The training CSV file is loaded, and file locations and labels are decoded.
  2. (Buffered Shuffle) The pipeline takes a sliding window of given size and produces shuffled data from it.
  3. (Feature Extraction) The dataset object is mapped with a user-defined function using dataset.map, and the defined function loads the datafile from Cloud Storage and extracts features using the feature extractor class.
  4. (Repeat) The complete dataset is repeated for a given number of epochs using dataset.repeat(num_epochs). If num_epochs is set to None, the dataset is repeated indefinitely.
  5. (Padded Batching) A padded input batch is created using Dataset.padded_batch(Batch_Size,Padded_Shape). The Padded_Shape parameter ensures that each of the inputs in a batch is the same shape. If None is provided for any dimension in the Padded_Shape parameter, the maximum vector length among all the inputs in the batch and pads the other files in the batch accordingly is taken.
  6. (Iterator object). The iterator objects is created using dataset.make_one_shot_iterator to ultimately access elements from the dataset
  7. Batch features and labels are accessed and subsequently retrieved using iterator.get_next for each GPU.

The following figure illustrates this sequence.

The input function pipeline

The model function

The model function (model_fn) returns operations for training, evaluation, or prediction, depending on which graph is being requested. Depending on the calls made by estimators, the model function is called with different mode parameters. For example, if the training method is called, model_fn is called with the parameter mode=TRAIN. Therefore, different graphs for training, evaluation, and prediction can all be written in a single model_fn function in which different operations are executed depending on the mode parameter.

The training and evaluation graph in our model function follows a distributed protocol (in-graph replication). The pipeline fetches a number of batches equal to the number of GPUs per machine, and that number of batches are pushed to separate GPUs. The batches are then passed through Quantiphi's Deep Neural Network (DNN) architecture, which calculates respective loss and parameter gradients for each GPU.

The calculated parameter gradients from each GPU are averaged and these gradients are then used to update the model's parameters.

The following diagram describes this flow.

Updating the model parameters

The Estimator class

The Estimator class is called by passing a model function, a run configuration, and model parameters to the constructor. The syntax for instantiating the estimator class is the following:

classifier = tf.estimator.Estimator(model_fn=model_fn,config,params)

When one of the Estimator class methods is called (for example, a training, evaluation, or prediction method), the estimator creates a TensorFlow graph, sets up the input pipeline, and calls the model_fn with the appropriate mode parameter in order to create a graph representing our model.

The train_and_evaluate class

The core of TensorFlow distributed execution support is the train_and_evaluate class, which groups the estimator with different input functions for training and evaluation. The tf.estimator.train_and_evaluate function can be set up by passing it a classifier (which comes from the model_fn function), TrainSpec, and EvalSpec. TrainSpec and EvalSpec are defined by passing an input function to them, along with other parameters such as number of steps to run the input function for.

train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=train_steps)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, steps=steps)
tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)

The tf.estimator.train_and_evalute picks up the cluster configuration automatically and launches the model training based on the cluster specification it infers.

Training pipelines

This section outlines pipelines for different approaches to training.

Hybrid training approach

In the hybrid training approach, tower-based average gradients and loss are calculated at each node, as shown in the following figure. The model architecture defined in the model function section follows this hybrid training approach.

Hybrid training approach

Synchronous training approach

The approach used in this solution uses the synchronous training approach for training the model. The following figure illustrates the process.

Synchronous training approach

The optimizer operation is wrapped into a SyncReplicasOptimizer class with the number of replicas_to_aggregate parameter. This parameter defines how many workers are present in the cluster, which defines how many nodes to aggregate the parameters from.

The following code snippet shows this call is made.

if params.sync:
    optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=num_workers)
    sync_replicas_hook = optimizer.make_session_run_hook(params.is_chief)

Code testing (local version)

When all of the pieces of the code blocks are assembled, a test run should be performed over the local machine using the following gcloud command. The values in brackets (for example, [TRAIN_FILES_LOCATION]) are placeholders for values that vary by context.

gcloud ml-engine local train \
    --package-path trainer/ \    ## defining the code directory
    --module-name trainer.main \ ## the main script to start training
    --job-dir [JOB_DIR] \  ##job output path
    -- \
    --train_files [TRAIN_FILES_LOCATION] \
    --eval_files [EVAL_FILES_LOCATION] \
    --train_batch_size 8 \
    --eval_batch_size 8 \
    --model_dir [CHECKPOINT_DIR_LOCATION] \
    --train_steps 20000

The command has two parts—arguments for cloud machine learning, and the arguments for the model. The two parts are separated by -- \. The model-specific arguments generally contain the location of train and eval files, batch size, and other arguments. The values in the example for train_step and batch sizes are just examples; they are not intended to be taken as ideal numbers for any experiment.

Job submission on AI Platform

After a local run has completed successfully, the job can be submitted over AI Platform by doing the following:

  1. Defining the AI Platform cluster definition in the config.yaml file. For example:

    scaleTier: CUSTOM
    masterType: standard_p100
    workerType: standard_p100
    parameterServerType: complex_model_m
    workerCount: 3
    parameterServerCount: 1

    In this configuration:

    • scaleTier is set to CUSTOM to specify a custom cluster configuration instead of a predefined scale.
    • masterType and workerType specify the controller (master) and worker machine types. The value standard_p100 assigns a machine with 1 p100 GPU.
    • workerCount specifies the number of workers to add into training.
    • parameterServerCount specifies the number of parameter servers to add into training.
  2. Defining a setup.py file that specifies the code dependencies. The following figure shows the setup file used in this solution.

    Listing of setup.py file

    The setup file must contain all of the code dependencies, such as included Python libraries and Ubuntu packages, so that the dependencies are installed when each AI Platform node starts.

  3. Submitting a job. The following gcloud command shows how to submit a job, with the parameters pointing to the training scripts, batch size, scale tier, and cluster configuration.

    gcloud ml-engine jobs submit training $JOB_NAME \
        --package-path trainer \
        --module-name trainer.main \
        --region us-central1 \
        --job-dir [JOB_DIR] \
        --scale-tier CUSTOM \
        --config config.yaml \
        --runtime-version 1.4 \
        -- \
        --train_files [TRAIN_FILES_LOCATION] \
        --eval_files [EVAL_FILES_LOCATION] \
        --train_batch_size 16 \
        --eval_batch_size 16 \
        --model_dir [CHECKPOINT_DIR_LOCATION] \
        --train_steps 100000 \
        --data_dir [DATA_LOCATION] \
        --num_gpus 4 \
        --num_workers 3

    In this command:

    • --region us-central-1 specifies the Google Cloud region where machine training starts.
    • --scale-tier Custom and --config config.yaml specify the scale tier and configuration file created earlier.
    • --runtime-version 1.4 specifies the TensorFlow version to use.
    • The parameters that follow the -- \ line are passed to the main script.

What's next