Using distributed training

This page describes how to run distributed training jobs on Vertex AI.

Code requirements

Use an ML framework that supports distributed training. In your training code, you can use the CLUSTER_SPEC or TF_CONFIG environment variables to reference specific parts of your training cluster.
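
For example, your training code can read either variable at startup. The following is a minimal sketch that only loads the JSON values; later sections of this page describe the fields in detail:

import json
import os

# Vertex AI sets CLUSTER_SPEC on every replica of a distributed job; TF_CONFIG
# is also set for distributed jobs, in TensorFlow's format. Both are JSON
# strings; defaulting to '{}' keeps this code working if a variable is absent.
cluster_spec = json.loads(os.environ.get('CLUSTER_SPEC', '{}'))
tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))

print(cluster_spec.get('task'), tf_config.get('task'))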

Structure of the training cluster

When you run a distributed training job with Vertex AI, you specify multiple machines (nodes) in a training cluster. The training service allocates the resources for the machine types that you specify. Your running job on a given node is called a replica. A group of replicas with the same configuration is called a worker pool.

Each replica in the training cluster is given a single role or task in distributed training. For example:

  • Primary replica: Exactly one replica is designated the primary replica. This task manages the others and reports status for the job as a whole.

  • Worker(s): One or more replicas may be designated as workers. These replicas do their portion of the work as you designate in your job configuration.

  • Parameter server(s): If supported by your ML framework, one or more replicas may be designated as parameter servers. These replicas store model parameters and coordinate shared model state between the workers.

  • Evaluator(s): If supported by your ML framework, one or more replicas may be designated as evaluators. These replicas can be used to evaluate your model. If you are using TensorFlow, note that TensorFlow generally expects that you use no more than one evaluator.

Configuring a distributed training job

You can configure any custom training job as a distributed training job by defining multiple worker pools. You can also run distributed training within a training pipeline or a hyperparameter tuning job.

To configure a distributed training job, define your list of worker pools (workerPoolSpecs[]), designating one WorkerPoolSpec for each type of task:

  • First (workerPoolSpecs[0]): Primary, chief, scheduler, or "master"
  • Second (workerPoolSpecs[1]): Secondary, replicas, workers
  • Third (workerPoolSpecs[2]): Parameter servers
  • Fourth (workerPoolSpecs[3]): Evaluators

You must specify a primary replica, which coordinates the work done by all the other replicas. Use the first worker pool specification only for your primary replica, and set its replicaCount to 1:

{
  "workerPoolSpecs": [
     // `WorkerPoolSpec` for worker pool 0, primary replica, required
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 1, optional
     {},
     // `WorkerPoolSpec` for worker pool 2, optional
     {},
     // `WorkerPoolSpec` for worker pool 3, optional
     {}
   ]
   ...
}
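
The same structure can also be expressed as a list of dictionaries if you create the job with the Vertex AI SDK for Python. The following is a minimal sketch that assumes the google-cloud-aiplatform package; the project, bucket, image URI, and machine types are placeholder values, and the snake_case keys follow the form the SDK accepts for worker_pool_specs:

from google.cloud import aiplatform

# Placeholder project, region, and staging bucket.
aiplatform.init(
    project="my-project",
    location="us-central1",
    staging_bucket="gs://my-bucket",
)

# Worker pool 0 holds the required primary replica; worker pool 1 holds two workers.
worker_pool_specs = [
    {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "container_spec": {"image_uri": "gcr.io/my-project/my-training-image"},
    },
    {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 2,
        "container_spec": {"image_uri": "gcr.io/my-project/my-training-image"},
    },
]

job = aiplatform.CustomJob(
    display_name="my-distributed-job",
    worker_pool_specs=worker_pool_specs,
)
job.run()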

Specifying additional worker pools

Depending on your ML framework, you may specify additional worker pools for other purposes. For example, if you are using TensorFlow, you could specify worker pools to configure worker replicas, parameter server replicas, and evaluator replicas.

The order of the worker pools in the workerPoolSpecs[] list determines each pool's role. To use a later worker pool while skipping an earlier one, set the earlier worker pool to an empty value ({}) so that the pools you do want keep their positions in the list. For example:

If you want to specify a job that has only a primary replica and a parameter server worker pool, you must set an empty value for worker pool 1:

{
  "workerPoolSpecs": [
     // `WorkerPoolSpec` for worker pool 0, required
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 1, optional
     {},
     // `WorkerPoolSpec` for worker pool 2, optional
     {
       "machineSpec": {...},
       "replicaCount": 1,
       "diskSpec": {...},
       ...
     },
     // `WorkerPoolSpec` for worker pool 3, optional
     {}
   ]
   ...
}

Environment variables for your cluster

Vertex AI populates an environment variable, CLUSTER_SPEC, on every replica to describe how the overall cluster is set up. Like TensorFlow's TF_CONFIG, CLUSTER_SPEC describes every replica in the cluster, including its index and role (primary replica, worker, parameter server, or evaluator).

When you run distributed training with TensorFlow, TF_CONFIG is parsed to build tf.train.ClusterSpec. Similarly, when you run distributed training with other ML frameworks, you must parse CLUSTER_SPEC to populate any environment variables or settings required by the framework.
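
For example, the following sketch shows one way to translate CLUSTER_SPEC into the rank, world size, and primary address that many frameworks expect. The environment variable names set at the end (WORLD_SIZE, RANK, MASTER_ADDR, MASTER_PORT) are illustrative choices, not values that Vertex AI sets for you:

import json
import os

cluster_spec = json.loads(os.environ['CLUSTER_SPEC'])
cluster = cluster_spec['cluster']
task = cluster_spec['task']

# This sketch assumes the replica runs in workerpool0 (primary) or workerpool1 (workers).
pools = ['workerpool0', 'workerpool1']
world_size = sum(len(cluster.get(pool, [])) for pool in pools)
if task['type'] == 'workerpool0':
    rank = task['index']
else:
    rank = len(cluster['workerpool0']) + task['index']

# The primary replica's address, often used as the rendezvous point.
primary_addr, primary_port = cluster['workerpool0'][0].rsplit(':', 1)

# Illustrative variable names; set whatever your framework actually requires.
os.environ['WORLD_SIZE'] = str(world_size)
os.environ['RANK'] = str(rank)
os.environ['MASTER_ADDR'] = primary_addr
os.environ['MASTER_PORT'] = primary_port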

The format of CLUSTER_SPEC

The CLUSTER_SPEC environment variable is a JSON string with the following format:

  • "cluster": The cluster description for your custom container. As with TF_CONFIG, this object is formatted as a TensorFlow cluster specification and can be passed to the constructor of tf.train.ClusterSpec. The cluster description contains a list of replica names for each worker pool that you specify:

      • "workerpool0": All distributed training jobs have one primary replica in the first worker pool.

      • "workerpool1": This worker pool contains worker replicas, if you specified them when creating your job.

      • "workerpool2": This worker pool contains parameter servers, if you specified them when creating your job.

      • "workerpool3": This worker pool contains evaluators, if you specified them when creating your job.

  • "environment": The string cloud.

  • "task": Describes the task of the particular node on which your code is running. You can use this information to write code for specific workers in a distributed job. This entry is a dictionary with the following keys:

      • "type": The type of worker pool this task is running in. For example, "workerpool0" refers to the primary replica.

      • "index": The zero-based index of the task. For example, if your training job includes two workers, this value is set to 0 on one of them and 1 on the other.

      • "trial": The identifier of the hyperparameter tuning trial currently running. When you configure hyperparameter tuning for your job, you set a number of trials to train. This value gives you a way to differentiate in your code between the trials that are running. The identifier is a string value containing the trial number, starting at 1.

  • "job": The CustomJobSpec that you provided to create the current training job, represented as a dictionary.

CLUSTER_SPEC example

Here is an example value:


{
   "cluster":{
      "workerpool0":[
         "cmle-training-workerpool0-ab-0:2222"
      ],
      "workerpool1":[
         "cmle-training-workerpool1-ab-0:2222",
         "cmle-training-workerpool1-ab-1:2222"
      ],
      "workerpool2":[
         "cmle-training-workerpool2-ab-0:2222",
         "cmle-training-workerpool2-ab-1:2222"
      ],
      "workerpool3":[
         "cmle-training-workerpool3-ab-0:2222",
         "cmle-training-workerpool3-ab-1:2222",
         "cmle-training-workerpool3-ab-2:2222"
      ]
   },
   "environment":"cloud",
   "task":{
      "type":"workerpool0",
      "index":0,
      "trial":"TRIAL_ID"
   },
   "job": {
      ...
   }
}
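
In your training code, you can branch on the task entry so that, for example, only the primary replica exports the final model. The following is a minimal sketch; export_model is a hypothetical placeholder for your own logic:

import json
import os

def export_model():
    """Hypothetical placeholder for your own model-export logic."""
    print("Exporting the final model from the primary replica.")

task = json.loads(os.environ['CLUSTER_SPEC'])['task']

if task['type'] == 'workerpool0':
    # Only the single primary replica performs cluster-wide duties such as export.
    export_model()
else:
    print(f"Replica {task['index']} in {task['type']} skips the export step.")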

The format of TF_CONFIG

In addition to CLUSTER_SPEC, Vertex AI sets the TF_CONFIG environment variable on each replica of all distributed training jobs. Vertex AI does not set TF_CONFIG for single-replica training jobs.

CLUSTER_SPEC and TF_CONFIG share some values, but they have different formats. Both environment variables include additional fields beyond what TensorFlow requires.

Distributed training with TensorFlow works the same way when you use custom containers as when you use a pre-built container.

The TF_CONFIG environment variable is a JSON string with the following format:

  • cluster: The TensorFlow cluster description. A dictionary mapping one or more task names (chief, worker, ps, or master) to lists of network addresses where these tasks are running. For a given training job, this dictionary is the same on every VM. This is a valid first argument for the tf.train.ClusterSpec constructor. Note that this dictionary never contains evaluator as a key, since evaluators are not considered part of the training cluster even if you use them for your job.

  • task: The task description of the VM where this environment variable is set. For a given training job, this dictionary is different on every VM. You can use this information to customize what code runs on each VM in a distributed training job. You can also use it to change the behavior of your training code for different trials of a hyperparameter tuning job. This dictionary includes the following key-value pairs:

      • type: The type of task that this VM is performing. This value is set to worker on workers, ps on parameter servers, and evaluator on evaluators. On your job's master worker, the value is set to either chief or master; learn more about the difference between the two in the chief versus master section of this document.

      • index: The zero-based index of the task. For example, if your training job includes two workers, this value is set to 0 on one of them and 1 on the other.

      • trial: The ID of the hyperparameter tuning trial currently running on this VM. This field is only set if the current training job is a hyperparameter tuning job. For hyperparameter tuning jobs, Vertex AI runs your training code repeatedly in many trials with different hyperparameters each time. This field contains the current trial number, starting at 1 for the first trial.

      • cloud: An ID used internally by Vertex AI. You can ignore this field.

  • job: The CustomJobSpec that you provided to create the current training job, represented as a dictionary.

  • environment: The string cloud.

TF_CONFIG example

The following example code prints the TF_CONFIG environment variable to your training logs:

import json
import os

# TF_CONFIG is not set for single-replica jobs, so fall back to an empty object.
tf_config_str = os.environ.get('TF_CONFIG', '{}')
tf_config_dict = json.loads(tf_config_str)

# Convert back to a string just for pretty printing
print(json.dumps(tf_config_dict, indent=2))

In a hyperparameter tuning job that runs in runtime version 2.1 or later and uses a master worker, two workers, and a parameter server, this code produces the following log for one of the workers during the first hyperparameter tuning trial. The example output hides the job field for conciseness and replaces some IDs with generic values.

{
  "cluster": {
    "chief": [
      "training-workerpool0-[ID_STRING_1]-0:2222"
    ],
    "ps": [
      "training-workerpool2-[ID_STRING_1]-0:2222"
    ],
    "worker": [
      "training-workerpool1-[ID_STRING_1]-0:2222",
      "training-workerpool1-[ID_STRING_1]-1:2222"
    ]
  },
  "environment": "cloud",
  "job": {
    ...
  },
  "task": {
    "cloud": "[ID_STRING_2]",
    "index": 0,
    "trial": "1",
    "type": "worker"
  }
}

When to use TF_CONFIG

TF_CONFIG is set only for distributed training jobs.

You likely don't need to interact with the TF_CONFIG environment variable directly in your training code. Access the TF_CONFIG environment variable only if TensorFlow's distribution strategies and Vertex AI's standard hyperparameter tuning workflow, both described in the next sections, do not work for your job.

Distributed training

Vertex AI sets the TF_CONFIG environment variable to extend the specifications that TensorFlow requires for distributed training.

To perform distributed training with TensorFlow, use the tf.distribute.Strategy API. In particular, we recommend that you use the Keras API together with the MultiWorkerMirroredStrategy or, if you specify parameter servers for your job, the ParameterServerStrategy. However, note that TensorFlow currently only provides experimental support for these strategies.

These distribution strategies use the TF_CONFIG environment variable to assign roles to each VM in your training job and to facilitate communication between the VMs. You do not need to access the TF_CONFIG environment variable directly in your training code, because TensorFlow handles it for you.

Only parse the TF_CONFIG environment variable directly if you want to customize how the different VMs running your training job behave.
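
For example, a Keras training script that relies on the strategy to handle TF_CONFIG might look like the following sketch. The model and dataset are placeholders; depending on your TensorFlow version, the strategy may be available as tf.distribute.experimental.MultiWorkerMirroredStrategy instead:

import tensorflow as tf

# The strategy reads TF_CONFIG itself to discover the other replicas.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Placeholder model; build your real model inside the strategy scope.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(10,)),
        tf.keras.layers.Dense(1),
    ])
    model.compile(optimizer='adam', loss='mse')

# Placeholder dataset; in a real job, each worker reads its shard of the data.
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random.normal([256, 10]), tf.random.normal([256, 1]))
).batch(32)

model.fit(dataset, epochs=5)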

Hyperparameter tuning

When you run a hyperparameter tuning job, Vertex AI provides different arguments to your training code for each trial. Your training code does not necessarily need to be aware of what trial is currently running. In addition, you can monitor the progress of hyperparameter tuning jobs in Cloud Console.

If needed, your code can read the current trial number from the trial field of the task field of the TF_CONFIG environment variable.
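
The following minimal sketch shows one way to do that:

import json
import os

tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))

# The "trial" field is present only when the job is a hyperparameter tuning job.
trial_id = tf_config.get('task', {}).get('trial', '')

if trial_id:
    print(f'Running hyperparameter tuning trial {trial_id}')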

What's next