This article provides an overview of a system developed by Quantiphi that uses AI Platform on Google Cloud to implement distributed machine learning model training using the TensorFlow Estimator API. The article describes the creation of the input pipeline, the implementation of the tower-based replicated model graph, and the use of the Estimator class and the train_and_evaluate function from the Estimator API.
This description covers a specific implementation, but the same blocks can be adapted to other model training scenarios as well.
The following code blocks are used as part of the solution:
- An input function (using the TensorFlow Dataset API).
- A model function that is run simultaneously on all available GPUs for synchronous training.
- An estimator class (with a model function, run configuration, and model parameters as input).
- The train_and_evaluate function (with evaluation and training input functions, the estimator instance, training steps, and other parameters as input).
The input function
The methods for training, evaluating, and predicting all take an input function that produces features and labels. The TensorFlow Dataset API is used to create a multithreaded queued input pipeline.
Dataset objects can be created using different input types. For example, tf.data.Dataset.from_tensors can be used to create a dataset object from in-memory tensors, or the TFRecord format can be used to save the data on disk and load it into a dataset object using tf.data.TFRecordDataset. It's also possible to use CSV files, which is what was used in the implementation described in this article, using tf.data.TextLineDataset. The current solution uses the CSV format, because the dataset did not provide a constant feature length for each data point, which is what the TFRecord format requires.
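For illustration, the following minimal sketch shows these three creation options; the file names are placeholders, not paths from the actual solution.

```python
import tensorflow as tf

# From in-memory tensors.
in_memory_dataset = tf.data.Dataset.from_tensors(([1.0, 2.0, 3.0], 1))

# From TFRecord files saved on disk (placeholder file name).
tfrecord_dataset = tf.data.TFRecordDataset(['data.tfrecord'])

# From CSV (text line) files, as used in this solution (placeholder file name).
csv_dataset = tf.data.TextLineDataset(['train.csv'])
```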
Using the input function pipeline
The input pipeline consists of the following steps. The parenthetical labels correspond to the diagram that follows, and a minimal code sketch of the pipeline appears after the figure.
- (CSV Decoding) The training CSV file is loaded, and file locations and labels are decoded.
- (Buffered Shuffle) The pipeline takes a sliding window of a given size and produces shuffled data from it.
- (Feature Extraction) The dataset object is mapped with a user-defined function using dataset.map, and the defined function loads the data file from Cloud Storage and extracts features using the feature extractor class.
- (Repeat) The complete dataset is repeated for a given number of epochs using dataset.repeat. If num_epochs is set to None, the dataset is repeated indefinitely.
- (Padded Batching) A padded input batch is created using dataset.padded_batch. The padded_shapes parameter ensures that each of the inputs in a batch is the same shape. If None is provided for any dimension in the padded_shapes parameter, the maximum vector length among all the inputs in the batch is used, and the other inputs in the batch are padded accordingly.
- (Iterator object) The iterator object is created using dataset.make_one_shot_iterator to ultimately access elements from the dataset.
- Batch features and labels are accessed and subsequently retrieved using iterator.get_next for each GPU.
The following figure illustrates this sequence.
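For reference, here is a minimal sketch of an input function that follows these steps. It assumes a CSV file whose rows contain a Cloud Storage file location and an integer label, and a hypothetical extract_features helper that stands in for the solution's feature extractor class.

```python
import tensorflow as tf

def input_fn(csv_file, batch_size, num_epochs=None, shuffle_buffer=1000):
    def decode_csv(line):
        # CSV Decoding: split each row into a file location and a label.
        file_location, label = tf.decode_csv(line, record_defaults=[[''], [0]])
        return file_location, label

    def load_and_extract(file_location, label):
        # Feature Extraction: load the data file from Cloud Storage and
        # compute features (extract_features is a hypothetical helper).
        features = tf.py_func(extract_features, [file_location], tf.float32)
        features.set_shape([None])  # variable-length feature vector
        return features, label

    dataset = tf.data.TextLineDataset(csv_file)
    dataset = dataset.map(decode_csv)                # CSV Decoding
    dataset = dataset.shuffle(shuffle_buffer)        # Buffered Shuffle
    dataset = dataset.map(load_and_extract)          # Feature Extraction
    dataset = dataset.repeat(num_epochs)             # Repeat (None = indefinitely)
    dataset = dataset.padded_batch(                  # Padded Batching
        batch_size,
        padded_shapes=(tf.TensorShape([None]), tf.TensorShape([])))
    iterator = dataset.make_one_shot_iterator()      # Iterator object
    features, labels = iterator.get_next()
    return features, labels
```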
The model function
The model function (
model_fn) returns operations for training, evaluation, or
prediction, depending on which graph is being requested. Depending on which Estimator method is called, the model function is invoked with a different mode parameter. For example, if the training method is called, model_fn is called with mode=TRAIN. Therefore, different graphs for training, evaluation,
and prediction can all be written in a single
model_fn function in which
different operations are executed depending on the mode parameter.
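The following is a minimal sketch of such a model_fn (not the solution's actual DNN). Here, build_network is a hypothetical model builder, and the loss and optimizer choices are illustrative assumptions.

```python
import tensorflow as tf

def model_fn(features, labels, mode, params):
    logits = build_network(features, params)  # hypothetical model builder

    if mode == tf.estimator.ModeKeys.PREDICT:
        # Prediction graph: only the forward pass is needed.
        return tf.estimator.EstimatorSpec(mode, predictions={'logits': logits})

    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.EVAL:
        # Evaluation graph: loss (and optionally metrics) only.
        return tf.estimator.EstimatorSpec(mode, loss=loss)

    # Training graph (mode == TRAIN): attach an optimizer and a train op.
    optimizer = tf.train.AdamOptimizer(learning_rate=0.001)  # assumed optimizer/LR
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
```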
The training and evaluation graph in our model function follows a distributed protocol (in-graph replication). The pipeline fetches a number of batches equal to the number of GPUs per machine, and that number of batches are pushed to separate GPUs. The batches are then passed through Quantiphi's Deep Neural Network (DNN) architecture, which calculates respective loss and parameter gradients for each GPU.
The calculated parameter gradients from each GPU are averaged and these gradients are then used to update the model's parameters.
The following diagram describes this flow.
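In addition to the diagram, the following is a minimal sketch of tower-based gradient averaging inside a model function. The build_dnn_loss helper is a hypothetical stand-in for Quantiphi's DNN, and the variable-scope layout is an assumption.

```python
import tensorflow as tf

def tower_gradients(features_list, labels_list, optimizer):
    tower_grads = []
    for gpu_id, (features, labels) in enumerate(zip(features_list, labels_list)):
        # Place each tower's forward and backward pass on its own GPU,
        # while sharing variables across towers.
        with tf.device('/gpu:%d' % gpu_id), \
             tf.variable_scope('model', reuse=(gpu_id > 0)):
            loss = build_dnn_loss(features, labels)  # hypothetical model builder
            tower_grads.append(optimizer.compute_gradients(loss))

    # Average the gradients of each variable across all towers.
    averaged = []
    for grads_and_vars in zip(*tower_grads):
        grads = [g for g, _ in grads_and_vars if g is not None]
        var = grads_and_vars[0][1]
        averaged.append((tf.reduce_mean(tf.stack(grads), axis=0), var))

    # A single apply_gradients op updates the shared parameters.
    return optimizer.apply_gradients(
        averaged, global_step=tf.train.get_global_step())
```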
The Estimator class
The Estimator class is instantiated by passing a model function, a run
configuration, and model parameters to the constructor. The syntax for
instantiating the estimator class is the following:
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config, params=params)
When one of the
Estimator class methods is called (for example, a training,
evaluation, or prediction method), the estimator creates a TensorFlow
graph, sets up the input pipeline, and calls the
model_fn with the appropriate
mode parameter in order to create a graph representing our model.
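For example, a hypothetical training call (with placeholder arguments) causes the estimator to invoke model_fn with mode=TRAIN:

```python
# Build the training graph and run 1,000 steps; the estimator calls model_fn
# internally with mode=TRAIN. The input function and step count are placeholders.
classifier.train(
    input_fn=lambda: input_fn('gs://[BUCKET]/train.csv', batch_size=8),
    steps=1000)
```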
The train_and_evaluate function
The core of TensorFlow distributed execution support is the train_and_evaluate function, which groups the estimator with different input functions for training and evaluation. The function is set up by passing it a classifier (which comes from the Estimator class), a TrainSpec, and an EvalSpec. The TrainSpec and EvalSpec objects are defined by passing an input function to them, along with other parameters such as the number of steps to run the input function for.
```python
train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=train_steps)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, steps=steps)
tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)
```
tf.estimator.train_and_evaluate picks up the cluster configuration automatically and launches the model training based on the cluster specification.
This section outlines pipelines for different approaches to training.
Hybrid training approach
In the hybrid training approach, tower-based average gradients and loss are calculated at each node, as shown in the following figure. The model architecture defined in the model function section follows this hybrid training approach.
Synchronous training approach
This solution uses the synchronous training approach for training the model. The following figure illustrates the process.
The optimizer operation is wrapped in the SyncReplicasOptimizer class with the replicas_to_aggregate parameter. This parameter defines how many workers are present in the cluster, which in turn defines how many nodes to aggregate the parameters from.
The following code snippet shows how this call is made.
```python
if params.sync:
    optimizer = tf.train.SyncReplicasOptimizer(
        optimizer, replicas_to_aggregate=num_workers)
    sync_replicas_hook = optimizer.make_session_run_hook(params.is_chief)
    train_hooks.append(sync_replicas_hook)
```
Code testing (local version)
When all of the code blocks are assembled, a test run should be performed on the local machine using the following
gcloud command. The
values in brackets (for example,
[TRAIN_FILES_LOCATION]) are placeholders for
values that vary by context.
```sh
# --package-path: the code directory
# --module-name:  the main script to start training
# --job-dir:      the job output path
gcloud ml-engine local train \
    --package-path trainer/ \
    --module-name trainer.main \
    --job-dir [JOB_DIR] \
    -- \
    --train_files [TRAIN_FILES_LOCATION] \
    --eval_files [EVAL_FILES_LOCATION] \
    --train_batch_size 8 \
    --eval_batch_size 8 \
    --model_dir [CHECKPOINT_DIR_LOCATION] \
    --train_steps 20000
```
The command has two parts: arguments for AI Platform, and arguments for the model. The two parts are separated by an empty -- flag. The model-specific arguments generally contain the location of train and eval
files, batch size, and other arguments. The values in the example for
train_step and batch sizes are just examples; they are not
intended to be taken as ideal numbers for any experiment.
Job submission on AI Platform
After a local run has completed successfully, the job can be submitted to AI Platform by doing the following:
- Defining the AI Platform cluster configuration in the config.yaml file. For example:
```yaml
trainingInput:
  scaleTier: CUSTOM
  masterType: standard_p100
  workerType: standard_p100
  parameterServerType: complex_model_m
  workerCount: 3
  parameterServerCount: 1
```
In this configuration:
- scaleTier is set to CUSTOM to specify a custom cluster configuration instead of a predefined scale tier.
- masterType and workerType specify the controller (master) and worker machine types. The value standard_p100 assigns a machine with one P100 GPU.
- workerCount specifies the number of workers to add to the training job.
- parameterServerCount specifies the number of parameter servers to add to the training job.
- Creating a setup.py file that specifies the code dependencies (a sample sketch follows).
The setup file must contain all of the code dependencies, such as included Python libraries and Ubuntu packages, so that the dependencies are installed when each AI Platform node starts.
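The actual setup file isn't reproduced here; the following is a minimal sketch of what a trainer package's setup.py might look like, with an assumed (not actual) dependency list.

```python
from setuptools import find_packages, setup

setup(
    name='trainer',
    version='0.1',
    packages=find_packages(),
    # Assumed Python dependencies; the real solution may need different
    # libraries and additional system (Ubuntu) packages.
    install_requires=[
        'numpy',
        'pandas',
    ],
    description='Trainer package for distributed training on AI Platform.',
)
```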
- Submitting the job. The following gcloud command shows how to submit a job, with the parameters pointing to the training scripts, batch size, scale tier, and cluster configuration.
```sh
gcloud ml-engine jobs submit training $JOB_NAME \
    --package-path trainer \
    --module-name trainer.main \
    --region us-central1 \
    --job-dir [JOB_DIR] \
    --scale-tier CUSTOM \
    --config config.yaml \
    --runtime-version 1.4 \
    -- \
    --train_files [TRAIN_FILES_LOCATION] \
    --eval_files [EVAL_FILES_LOCATION] \
    --train_batch_size 16 \
    --eval_batch_size 16 \
    --model_dir [CHECKPOINT_DIR_LOCATION] \
    --train_steps 100000 \
    --data_dir [DATA_LOCATION] \
    --num_gpus 4 \
    --num_workers 3
```
In this command:
- --region us-central1 specifies the Google Cloud region where the training runs.
- --config config.yaml specifies the scale tier and the cluster configuration file created earlier.
- --runtime-version 1.4 specifies the TensorFlow version to use.
- The parameters that follow the -- \ line are passed to the main script.
- Quantiphi case study: A machine learning pipeline for virtual material testing
- Quantiphi case study: Automating metadata extraction for video commercials