このページは Cloud Translation API によって翻訳されました。
Switch to English

Estimatorを使用したマルチワーカートレーニング

TensorFlow.orgで見る Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

概観

このチュートリアルでは、 tf.distribute.Strategyを使用してマルチワーカーの分散トレーニングにtf.estimator使用する方法を示します。 tf.estimatorを使用してコードをtf.estimator 、高性能の単一マシンを超えたスケーリングに興味がある場合は、このチュートリアルがtf.estimatorます。

始める前に、 配布戦略ガイドをお読みください。このチュートリアルでは同じモデルを使用しているため、 マルチGPUトレーニングチュートリアルも関連しています。

セットアップ

まず、TensorFlowと必要なインポートをセットアップします。

 import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json
 

入力機能

このチュートリアルでは、 TensorFlow Datasetsの MNISTデータセットを使用します。ここのコードはマルチGPUトレーニングチュートリアルに似ていますが、重要な違いが1つあります。マルチワーカートレーニングにEstimatorを使用する場合、モデルの収束を確実にするために、データセットをワーカー数でシャーディングする必要があります。入力データはワーカーインデックスによって分割されるため、各ワーカーはデータセットの1/num_workers異なる部分を処理します。

 BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
 

収束を達成するための別の合理的なアプローチは、各ワーカーで異なるシードを使用してデータセットをシャッフルすることです。

マルチワーカー構成

このチュートリアルの主な違いの1つ( マルチGPUトレーニングチュートリアルと比較)は、マルチワーカーのセットアップです。 TF_CONFIG環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定する標準的な方法です。

TF_CONFIG clustertask 2つのコンポーネントがありtaskclusterは、クラスター全体、つまりクラスター内のワーカーとパラメーターサーバーに関する情報を提供します。 taskは、現在のタスクに関する情報を提供します。最初のコンポーネントclusterclusterすべてのワーカーとパラメーターサーバーで同じであり、2番目のコンポーネントtaskは各ワーカーとパラメーターサーバーで異なり、独自のtypeindexを指定しindex 。この例では、タスクtypeworkerで、タスクindex0です。

説明のために、このチュートリアルでは、 localhostで2つのワーカーをTF_CONFIGしてTF_CONFIGを設定する方法を示します。実際には、外部IPアドレスとポートに複数のワーカーを作成し、各ワーカーにTF_CONFIGを適切に設定しindex 。つまり、タスクindex変更します。

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

モデルを定義する

レイヤー、オプティマイザ、およびトレーニング用の損失関数を記述します。このチュートリアルでは、 マルチGPUトレーニングチュートリアルと同様に、Kerasレイヤーを使用してモデルを定義します。

 LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))
 

MultiWorkerMirroredStrategy

モデルをトレーニングするには、 tf.distribute.experimental.MultiWorkerMirroredStrategyインスタンスを使用します。 MultiWorkerMirroredStrategyは、すべてのワーカーの各デバイスのモデルのレイヤーにすべての変数のコピーを作成します。集団通信のためのTensorFlow演算であるCollectiveOps使用して、勾配を集約し、変数の同期を維持します。 tf.distribute.Strategyガイドには、この戦略の詳細が記載されています。

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

モデルのトレーニングと評価

次に、で流通戦略を指定しRunConfig推定のために、と起動することによって、訓練し、評価tf.estimator.train_and_evaluate 。このチュートリアルでは、 train_distributeを使用して戦略を指定することにより、トレーニングのみをtrain_distributeます。 eval_distributeeval_distributeて評価を配布することもできます。

 config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
 
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f7f002eedd8>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/ops/resource_variable_ops.py:1666: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/ops/resource_variable_ops.py:1666: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f7f00111b70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f7f00111b70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f7f00111b70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3159704, step = 0

INFO:tensorflow:loss = 2.3159704, step = 0

INFO:tensorflow:global_step/sec: 182.998

INFO:tensorflow:global_step/sec: 182.998

INFO:tensorflow:loss = 2.3200626, step = 100 (0.549 sec)

INFO:tensorflow:loss = 2.3200626, step = 100 (0.549 sec)

INFO:tensorflow:global_step/sec: 198.007

INFO:tensorflow:global_step/sec: 198.007

INFO:tensorflow:loss = 2.2902875, step = 200 (0.505 sec)

INFO:tensorflow:loss = 2.2902875, step = 200 (0.505 sec)

INFO:tensorflow:global_step/sec: 200.163

INFO:tensorflow:global_step/sec: 200.163

INFO:tensorflow:loss = 2.2996945, step = 300 (0.500 sec)

INFO:tensorflow:loss = 2.2996945, step = 300 (0.500 sec)

INFO:tensorflow:global_step/sec: 199.776

INFO:tensorflow:global_step/sec: 199.776

INFO:tensorflow:loss = 2.3171215, step = 400 (0.501 sec)

INFO:tensorflow:loss = 2.3171215, step = 400 (0.501 sec)

INFO:tensorflow:global_step/sec: 198.755

INFO:tensorflow:global_step/sec: 198.755

INFO:tensorflow:loss = 2.2927647, step = 500 (0.503 sec)

INFO:tensorflow:loss = 2.2927647, step = 500 (0.503 sec)

INFO:tensorflow:global_step/sec: 192.239

INFO:tensorflow:global_step/sec: 192.239

INFO:tensorflow:loss = 2.3039918, step = 600 (0.520 sec)

INFO:tensorflow:loss = 2.3039918, step = 600 (0.520 sec)

INFO:tensorflow:global_step/sec: 193.256

INFO:tensorflow:global_step/sec: 193.256

INFO:tensorflow:loss = 2.2618923, step = 700 (0.517 sec)

INFO:tensorflow:loss = 2.2618923, step = 700 (0.517 sec)

INFO:tensorflow:global_step/sec: 228.038

INFO:tensorflow:global_step/sec: 228.038

INFO:tensorflow:loss = 2.2896879, step = 800 (0.438 sec)

INFO:tensorflow:loss = 2.2896879, step = 800 (0.438 sec)

INFO:tensorflow:global_step/sec: 709.957

INFO:tensorflow:global_step/sec: 709.957

INFO:tensorflow:loss = 2.2736495, step = 900 (0.141 sec)

INFO:tensorflow:loss = 2.2736495, step = 900 (0.141 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-07-23T01:31:43Z

INFO:tensorflow:Starting evaluation at 2020-07-23T01:31:43Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.08720s

INFO:tensorflow:Inference Time : 1.08720s

INFO:tensorflow:Finished evaluation at 2020-07-23-01:31:44

INFO:tensorflow:Finished evaluation at 2020-07-23-01:31:44

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2841425

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2841425

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1250379.

INFO:tensorflow:Loss for final step: 1.1250379.

({'loss': 2.2841425, 'global_step': 938}, [])

トレーニングパフォーマンスを最適化する

これで、 tf.distribute.Strategy利用したモデルとマルチワーカー対応のEstimatorができtf.distribute.Strategy 。次の手法を試して、マルチワーカートレーニングのパフォーマンスを最適化できます。

  • バッチサイズを増やす:ここで指定するバッチサイズはGPUごとです。一般に、GPUメモリに適合する最大のバッチサイズをお勧めします。
  • 変数のキャスト: tf.float 、変数をtf.floatキャストします。公式のResNetモデルにはこれを行う方法のが含まれています。
  • 集団通信の使用: MultiWorkerMirroredStrategyは、複数の集団通信の実装を提供します。

    • RINGは、gRPCをクロスホスト通信レイヤーとして使用して、リングベースの集合を実装します。
    • NCCLNvidiaのNCCLを使用して集合を実装します。
    • AUTOは選択をランタイムに委ねます。

    集合的な実装の最良の選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続に依存します。自動選択をオーバーライドするには、 MultiWorkerMirroredStrategyのコンストラクターのcommunicationパラメーターに有効な値を指定します(例: communication=tf.distribute.experimental.CollectiveCommunication.NCCL

TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールの詳細については、ガイドのパフォーマンスセクションをご覧ください。

その他のコード例

  1. Kubernetesテンプレートを使用したtensorflow / ecosystemでのマルチワーカートレーニングのエンドツーエンドの例 。この例は、 tf.keras.estimator.model_to_estimatorモデルから始まり、 tf.keras.estimator.model_to_estimator APIを使用してそれをEstimatorに変換します。
  2. 公式モデル 。その多くは、複数の配布戦略を実行するように構成できます。