分散トレーニングの詳細情報に TF_CONFIG を使用する

AI Platform は、トレーニング インスタンスに TF_CONFIG という環境変数を設定します。サービスとアプリケーションは、実行中に TF_CONFIG にあるトレーニング ジョブの詳細情報にアクセスできます。

このガイドでは、TF_CONFIG 環境変数にあるトレーニング ジョブの詳細情報にアクセスする方法について示します。この手法は分散トレーニング ジョブとハイパーパラメータ調整ジョブで便利です。これらのジョブでは、トレーニング アプリケーションとサービスとの間に特別な通信が必要です。

TensorFlow と TF_CONFIG

TF_CONFIG 環境変数が存在する場合、TensorFlow の Estimator API はこの環境変数を解析し、TF_CONFIG に含まれる関連する詳細情報を使用して、分散トレーニングのプロパティ(クラスタ仕様、タスク ID など)を作成します。

アプリケーションで分散トレーニングに tf.estimator を使用している場合は、AI Platform が TF_CONFIG を自動的に設定するので、プロパティが自動的にクラスタ仕様に伝播されます。

同様に、カスタム コンテナを使用して AI Platform で分散トレーニング アプリケーションを実行する場合も、AI Platform が TF_CONFIG を設定し、各マシンの環境変数 CLUSTER_SPEC に値を取り込みます。

TF_CONFIG の形式

TF_CONFIG 環境変数は、次の形式の JSON 文字列です。

キー 説明
"cluster" TensorFlow クラスタの説明。このオブジェクトの形式は TensorFlow クラスタ仕様として設定されるので、これを tf.train.ClusterSpec のコンストラクタに渡すことができます。
"task" コードが実行されている特定のノードのタスクを記述します。この情報を使用して、分散ジョブ内の特定のワーカー用のコードを記述できます。このエントリは、次のキーを持つ辞書です。
"type" このノードで実行されるタスクのタイプ。使用可能な値は、masterworkerps です。
"index" ゼロから開始するタスクのインデックス。ほとんどの分散トレーニング ジョブには単一のマスタータスク、1 つ以上のパラメータ サーバー、1 つ以上のワーカーがあります。
"trial" 現在実行中のハイパーパラメータ調整トライアルの ID。ジョブのハイパーパラメータ調整を構成する際に、トレーニングするトライアル数を設定します。この値により、コードで実行されているトライアルを識別できます。ID は、トライアル番号を含む文字列値で、1 から始まります。
"job" ジョブを開始したときに使用したジョブ パラメータ。ほとんどの場合、このエントリはコマンドライン引数を通じてアプリケーションに渡されるデータの複製であるため、無視できます。

TF_CONFIG の取得と分散クラスタ仕様の設定

次の例は、アプリケーションで TF_CONFIG の内容を取得する方法を示しています。

また、この例では、分散トレーニング時に TF_CONFIG を使用して tf.train.ClusterSpec を設定する方法も示しています。注: コードで TensorFlow コア API を使用している場合、必要な作業は tf.train.ClusterSpecTF_CONFIG から構築することだけです。tf.estimator を使用している場合、TensorFlow はこの変数を解析してクラスタ仕様を作成します。詳しくは、このページの TensorFlow と TF_CONFIG に関するセクションをご覧ください。

def train_and_evaluate(args):
  """Parse TF_CONFIG to cluster_spec and call run() method.

  TF_CONFIG environment variable is available when running using
  gcloud either locally or on cloud. It has all the information required
  to create a ClusterSpec which is important for running distributed code.

  Args:
    args (args): Input arguments.
  """

  tf_config = os.environ.get('TF_CONFIG')
  # If TF_CONFIG is not available run local.
  if not tf_config:
    return run(target='', cluster_spec=None, is_chief=True, args=args)

  tf_config_json = json.loads(tf_config)
  cluster = tf_config_json.get('cluster')
  job_name = tf_config_json.get('task', {}).get('type')
  task_index = tf_config_json.get('task', {}).get('index')

  # If cluster information is empty run local.
  if job_name is None or task_index is None:
    return run(target='', cluster_spec=None, is_chief=True, args=args)

  cluster_spec = tf.train.ClusterSpec(cluster)
  server = tf.train.Server(cluster_spec,
                           job_name=job_name,
                           task_index=task_index)

  # Wait for incoming connections forever.
  # Worker ships the graph to the ps server.
  # The ps server manages the parameters of the model.
  #
  # See a detailed video on distributed TensorFlow
  # https://www.youtube.com/watch?v=la_M6bCV91M
  if job_name == 'ps':
    server.join()
    return
  elif job_name in ['master', 'worker']:
    return run(server.target, cluster_spec, is_chief=(job_name == 'master'),
               args=args)

分散トレーニング用の device_filters の設定

大規模な分散トレーニングでは、トレーニング アプリケーションがマシン障害に対する復元性を備えていて、マシン間の通信が信頼できるものであることを確認する必要があります。

このためには、トレーニング アプリケーションの端末フィルタで、一部のワーカーがアクティブでなくてもマスターが機能するように設定する必要があります。device_filters を設定すると、マスターマシンとワーカーマシンの通信相手をパラメータ サーバーのみにすることができます。

以下の例は、tf.estimator ライブラリを使用する場合の device_filters の設定方法を示しています。

def _get_session_config_from_env_var():
    """Returns a tf.ConfigProto instance that has appropriate device_filters
    set."""

    tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))

    if (tf_config and 'task' in tf_config and 'type' in tf_config['task'] and
            'index' in tf_config['task']):
        # Master should only communicate with itself and ps
        if tf_config['task']['type'] == 'master':
            return tf.ConfigProto(device_filters=['/job:ps', '/job:master'])
        # Worker should only communicate with itself and ps
        elif tf_config['task']['type'] == 'worker':
            return tf.ConfigProto(device_filters=[
                '/job:ps',
                '/job:worker/task:%d' % tf_config['task']['index']
            ])
    return None

次に、以下に示すように、この session_configtf.estimator.RunConfig に渡します。

config = tf.estimator.RunConfig(session_config=_get_session_config_from_env_var())

Tensorflow 1.10 以降では、これは tf.estimator ライブラリにおいてデフォルトで設定されています。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

TensorFlow 用 AI Platform