Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Обучение нескольких сотрудников с помощью Оценщика

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом руководстве показано, как tf.distribute.Strategy можно использовать для распределенного обучения нескольких сотрудников с помощью tf.estimator . Если вы пишете код с помощью tf.estimator и заинтересованы в масштабировании за пределы одной машины с высокой производительностью, это руководство для вас.

Перед началом работы прочтите руководство по стратегии распространения . Учебное пособие по обучению работе с несколькими графическими процессорами также актуально, поскольку в этом руководстве используется та же модель.

Настроить

Сначала настройте TensorFlow и необходимый импорт.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json

Функция ввода

В этом руководстве используется набор данных MNIST из наборов данных TensorFlow . Код здесь аналогичен учебному руководству по обучению с несколькими графическими процессорами с одним ключевым отличием: при использовании Оценщика для обучения нескольких рабочих необходимо сегментировать набор данных по количеству рабочих, чтобы обеспечить сходимость модели. Входные данные сегментируются по рабочему индексу, так что каждый рабочий обрабатывает 1/num_workers отдельных частей набора данных.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Еще один разумный подход к достижению сходимости - перемешать набор данных с отдельными начальными числами для каждого рабочего.

Многопользовательская конфигурация

Одно из ключевых отличий в этом руководстве (по сравнению с учебным пособием с несколькими графическими процессорами ) - это настройка с несколькими рабочими. TF_CONFIG среды TF_CONFIG - это стандартный способ указать конфигурацию кластера для каждого рабочего, который является частью кластера.

TF_CONFIG из двух компонентов: cluster и task . cluster предоставляет информацию обо всем кластере, а именно о рабочих и серверах параметров в кластере. task предоставляет информацию о текущей задаче. Первый компонентный cluster и тот же для всех рабочих процессов и серверов параметров в кластере, а task второго компонента различна для каждого рабочего сервера и сервера параметров и задает свой собственный type и index . В этом примере type задачи - worker а index задачи - 0 .

В целях иллюстрации в этом руководстве показано, как установить TF_CONFIG с двумя TF_CONFIG на localhost TF_CONFIG . На практике вы должны создать несколько рабочих процессов на внешнем IP-адресе и порту и соответствующим образом установить TF_CONFIG для каждого рабочего, то есть изменить index задачи.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Определите модель

Напишите слои, оптимизатор и функцию потерь для обучения. В этом руководстве модель определяется со слоями Keras, как и в учебном пособии по работе с несколькими GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Для обучения модели используйте экземпляр tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих MultiWorkerMirroredStrategy . Он использует CollectiveOps , операцию TensorFlow для коллективного общения, чтобы агрегировать градиенты и синхронизировать переменные. В руководстве tf.distribute.Strategy есть более подробная информация об этой стратегии.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

Обучите и оцените модель

Затем укажите стратегию распространения в RunConfig для оценщика, а затем tf.estimator.train_and_evaluate и оцените, вызвав tf.estimator.train_and_evaluate . В этом руководстве распространяется только обучение, указав стратегию через train_distribute . Также возможно распространять оценку через eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4c6c18af98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1389045.

INFO:tensorflow:Loss for final step: 1.1389045.

({'loss': 2.276255, 'global_step': 938}, [])

Оптимизируйте тренировку

Теперь у вас есть модель и tf.distribute.Strategy способный работать с несколькими сотрудниками на базе tf.distribute.Strategy . Вы можете попробовать следующие методы, чтобы оптимизировать производительность обучения нескольких сотрудников:

  • Увеличьте размер пакета: здесь указан размер пакета для каждого графического процессора. В общем, рекомендуется использовать самый большой размер пакета, который подходит для памяти графического процессора.
  • tf.float переменных: если возможно, tf.float переменные к tf.float . Официальная модель ResNet включает пример того, как это можно сделать.
  • Используйте коллективное общение: MultiWorkerMirroredStrategy предоставляет несколько реализаций коллективного взаимодействия .

    • RING реализует группы на основе кольца, используя gRPC в качестве уровня связи между хостами.
    • NCCL использует NCCL Nvidia для реализации коллективов.
    • AUTO откладывает выбор на время выполнения.

    Наилучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также от сетевого соединения в кластере. Чтобы переопределить автоматический выбор, укажите допустимое значение параметра communication конструктора MultiWorkerMirroredStrategy , например, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Посетите раздел «Производительность» в руководстве, чтобы узнать больше о других стратегиях и инструментах, которые вы можете использовать для оптимизации производительности своих моделей TensorFlow.

Другие примеры кода

  1. Сквозной пример обучения нескольких сотрудников тензорному потоку / экосистеме с использованием шаблонов Kubernetes. Этот пример начинается с модели tf.keras.estimator.model_to_estimator и преобразует ее в tf.keras.estimator.model_to_estimator API tf.keras.estimator.model_to_estimator .
  2. Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.