Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Обучение нескольких рабочих с Keras

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть источник на GitHub Скачать блокнот

обзор

В этом учебном пособии демонстрируется многоуровневое распределенное обучение с использованием модели tf.distribute.Strategy с использованием API tf.distribute.Strategy , в частности tf.distribute.experimental.MultiWorkerMirroredStrategy . С помощью этой стратегии модель Keras, которая была разработана для работы на одном работнике, может беспрепятственно работать на нескольких работниках с минимальным изменением кода.

Руководство по распределенному обучению в TensorFlow доступно для ознакомления со стратегиями распространения, которые поддерживает TensorFlow, для тех, кто заинтересован в более глубоком понимании API-интерфейсов tf.distribute.Strategy .

Настроить

Сначала настройте TensorFlow и необходимый импорт.

 import os
import tensorflow as tf
import numpy as np
 

Подготовка набора данных

Теперь давайте подготовим набор данных MNIST. Набор данных MNIST содержит 60 000 обучающих примеров и 10 000 тестовых примеров рукописных цифр 0–9, отформатированных как монохромные изображения размером 28x28 пикселей. В этом примере мы возьмем обучающую часть наборов данных для демонстрации.

 def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset
 

Построить модель Keras

Здесь мы используем tf.keras.Sequential API для построения и компиляции простой модели сверточных нейронных сетей Keras для обучения с нашим набором данных MNIST.

 def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
 

Давайте сначала попробуем обучить модель для небольшого числа эпох и проследить результаты в одном работнике, чтобы убедиться, что все работает правильно. Вы должны ожидать снижения потерь и точности, приближающейся к 1,0, по мере продвижения эпохи.

 per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 0s 2ms/step - loss: 2.2701 - accuracy: 0.2451
Epoch 2/3
70/70 [==============================] - 0s 2ms/step - loss: 2.1827 - accuracy: 0.4777
Epoch 3/3
70/70 [==============================] - 0s 2ms/step - loss: 2.0865 - accuracy: 0.5955

<tensorflow.python.keras.callbacks.History at 0x7fc59381ac50>

Конфигурация нескольких рабочих

Теперь давайте войдем в мир обучения нескольких работников. В TF_CONFIG переменная окружения TF_CONFIG требуется для обучения на нескольких машинах, каждая из которых, возможно, имеет различную роль. TF_CONFIG - это строка JSON, используемая для указания конфигурации кластера на каждом работнике, который является частью кластера.

Есть два компонента TF_CONFIG : cluster и task . cluster предоставляет информацию об обучающем кластере, который является диктатом, состоящим из различных типов заданий, таких как worker . При обучении нескольких работников с помощью MultiWorkerMirroredStrategy обычно есть один worker который берет на себя немного больше ответственности, например, сохраняет контрольную точку и записывает сводный файл для TensorBoard в дополнение к тому, что делает обычный worker . Такой работник называется chief работником, и обычно главным worker назначается worker с index 0 (фактически так tf.distribute.Strategy ). task с другой стороны, предоставляет информацию о текущей задаче. Первый cluster компонентов одинаков для всех работников, а task второго компонента различна для каждого работника и определяет type и index этого работника.

В этом примере мы устанавливаем type задачи "worker" а index задачи - 0 . Это означает, что машина с такой настройкой является первым рабочим, который назначается главным рабочим и выполняет больше работы, чем другие рабочие. Обратите внимание, что на других машинах также должна быть TF_CONFIG переменная окружения TF_CONFIG , и она должна иметь такой же cluster dict, но другой type задачи или index задачи в зависимости от роли этих машин.

В целях иллюстрации этот учебник показывает, как можно установить TF_CONFIG с двумя рабочими на localhost TF_CONFIG . На практике пользователи будут создавать несколько рабочих на внешних IP-адресах / портах и ​​устанавливать TF_CONFIG для каждого рабочего соответствующим образом.

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

Обратите внимание, что хотя скорость обучения в этом примере является фиксированной, в общем случае может потребоваться настроить скорость обучения на основе глобального размера пакета.

Выберите правильную стратегию

В TensorFlow распределенное обучение состоит из синхронного обучения, когда этапы обучения синхронизируются между рабочими и репликами, и асинхронного обучения, где этапы обучения строго не синхронизированы.

MultiWorkerMirroredStrategy , которая является рекомендуемой стратегией для синхронного обучения нескольких работников, будет продемонстрирована в этом руководстве. Для обучения модели используйте экземпляр tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих. Он использует CollectiveOps , опцию TensorFlow для коллективного общения, для агрегирования градиентов и синхронизации переменных. Руководство tf.distribute.Strategy содержит более подробную информацию об этой стратегии.

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy предоставляет несколько реализаций через параметр CollectiveCommunication . RING реализует кольцевые коллективы, используя gRPC в качестве уровня связи между хостами. NCCL использует Nvidia NCCL для реализации коллективов. AUTO откладывает выбор на время выполнения. Лучший выбор коллективной реализации зависит от количества и типа графических процессоров и сетевого взаимодействия в кластере.

Обучите модель с помощью MultiWorkerMirroredStrategy

Благодаря интеграции API-интерфейса tf.distribute.Strategy в tf.keras единственное изменение, которое вы сделаете, чтобы распространить обучение на нескольких сотрудников, - это включение построения модели и model.compile() внутри strategy.scope() . Область действия стратегии распространения определяет, как и где создаются переменные, а в случае MultiWorkerMirroredStrategy создаваемые переменные являются MirroredVariable , и они реплицируются на каждого из работников.

 num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
 
Epoch 1/3
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
70/70 [==============================] - 0s 3ms/step - loss: 2.2682 - accuracy: 0.2265
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1714 - accuracy: 0.4954
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.0638 - accuracy: 0.6232

<tensorflow.python.keras.callbacks.History at 0x7fc5f4f062e8>

Шардинг набора данных и размер партии

При обучении нескольких рабочих с помощью MultiWorkerMirroredStrategy набора данных для обеспечения конвергенции и производительности. Однако обратите внимание, что в приведенном выше фрагменте кода наборы данных напрямую передаются в model.fit() без необходимости сегментирования; это связано с тем, tf.distribute.Strategy API tf.distribute.Strategy позаботится о разбиении набора данных автоматически. Он разделяет набор данных на уровне файлов, что может привести к перекосу осколков. В крайних случаях, когда имеется только один файл, только первый фрагмент (т. Е. Работник) получит данные обучения или оценки, и в результате все работники получат ошибки.

Если для обучения вы предпочитаете ручной шардинг, его можно отключить с помощью API tf.data.experimental.DistributeOptions . В частности,

 options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
 

Еще одна вещь, на которую стоит обратить внимание, это размер пакета для datasets В приведенном выше фрагменте кода мы используем global_batch_size = per_worker_batch_size * num_workers , который в num_workers разы больше, чем в случае с одним рабочим, потому что эффективный размер пакета для каждого рабочего - это глобальный размер пакета (параметр, передаваемый в tf.data.Dataset.batch() ) делится на количество рабочих, и с этим изменением мы сохраняем размер пакета на одного рабочего, как и прежде.

оценка

Если вы передадите validation_data в model.fit , он будет чередоваться между обучением и оценкой для каждой эпохи. Оценка с использованием validation_data распределяется по одному и тому же набору работников, а результаты оценки агрегируются и доступны для всех работников. Как и в случае с обучением, набор данных проверки автоматически обрабатывается на уровне файлов. Вам необходимо установить глобальный размер пакета в наборе данных проверки и установить validation_steps . Повторный набор данных также рекомендуется для оценки.

Кроме того, вы также можете создать другую задачу, которая периодически считывает контрольные точки и запускает оценку. Это то, что делает Оценщик. Но это не рекомендуемый способ проведения оценки, и поэтому его детали опущены.

прогнозирование

В настоящее время model.predict не работает с MultiWorkerMirroredStrategy.

Производительность

Теперь у вас есть модель Keras, которая настроена для работы на нескольких рабочих с MultiWorkerMirroredStrategy . Вы можете попробовать следующие методы, чтобы настроить производительность обучения нескольких рабочих с MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy предоставляет несколько реализаций коллективного общения . RING реализует кольцевые коллективы, используя gRPC в качестве уровня связи между хостами. NCCL использует Nvidia NCCL для реализации коллективов. AUTO откладывает выбор на время выполнения. Лучший выбор коллективной реализации зависит от количества и типа графических процессоров и сетевого взаимодействия в кластере. Чтобы переопределить автоматический выбор, укажите допустимое значение для параметра communication конструктора MultiWorkerMirroredStrategy , например, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .
  • tf.float переменные к tf.float если это возможно. Официальная модель ResNet включает пример того, как это можно сделать.

Отказоустойчивость

При синхронном обучении кластер не будет работать, если один из рабочих выйдет из строя и не существует механизма восстановления после сбоя. Использование tf.distribute.Strategy с tf.distribute.Strategy преимущество отказоустойчивости в случаях, когда работники умирают или иным образом нестабильны. Мы делаем это, сохраняя состояние обучения в распределенной файловой системе по вашему выбору, так что после перезапуска экземпляра, который ранее был неудачным или прерванным, состояние обучения восстанавливается.

Поскольку все работники синхронизированы с точки зрения периодов и этапов обучения, другим работникам потребуется дождаться перезапуска отказавшего или прерванного работника, чтобы продолжить.

Обратный вызов ModelCheckpoint

ModelCheckpoint вызов ModelCheckpoint больше не обеспечивает отказоустойчивость, используйте вместо этого обратный вызов BackupAndRestore .

ModelCheckpoint вызов ModelCheckpoint все еще можно использовать для сохранения контрольных точек. Но при этом, если обучение было прервано или успешно завершено, чтобы продолжить обучение с контрольной точки, пользователь несет ответственность за загрузку модели вручную. При желании пользователь может выбрать сохранение и восстановление модели / весов вне ModelCheckpoint обратного вызова ModelCheckpoint .

Сохранение и загрузка модели

Чтобы сохранить модель с помощью model.save или tf.saved_model.save , место сохранения должно быть разным для каждого работника. На старших сотрудниках вам нужно будет сохранить модель во временном каталоге, а на главном - в предоставленном каталоге модели. Временные каталоги на работнике должны быть уникальными во избежание ошибок, возникающих в результате того, что несколько работников пытаются записать в одно и то же место. Модель, сохраненная во всех каталогах, идентична, и обычно для восстановления или обслуживания следует ссылаться только на модель, сохраненную начальником. Мы рекомендуем иметь некоторую логику очистки, которая удаляет временные каталоги, созданные рабочими после завершения обучения.

Причина, по которой вам нужно одновременно экономить на начальнике и рабочих, заключается в том, что вы можете собирать переменные во время контрольных точек, что требует от руководителя и рабочих участия в протоколе связи allreduce. С другой стороны, если руководитель и работники сохранят данные в одном каталоге моделей, это приведет к ошибкам из-за конфликта.

С MultiWorkerMirroredStrategy программа запускается на каждом работнике, и чтобы узнать, является ли текущий работник руководителем, мы используем объект распознавателя кластера, который имеет атрибуты task_type и task_id . task_type сообщает вам, каково текущее задание (например, «работник»), а task_id сообщает вам идентификатор работника. Работник с идентификатором 0 назначается главным работником.

В приведенном ниже фрагменте кода write_filepath предоставляет путь к файлу для записи, который зависит от идентификатора работника. В случае Chief (рабочий с идентификатором 0), он записывает в исходный путь к файлу; для других он создает временный каталог (с идентификатором в пути к каталогу) для записи в:

 model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works 
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
 

Теперь вы готовы сохранить:

 multi_worker_model.save(write_model_path)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Как мы описали выше, позже модель должна загружаться только из сохраненного главного пути, поэтому давайте удалим временные, сохраненные не главными работниками:

 if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))
 

Теперь, когда пришло время загрузки, давайте используем удобный API tf.keras.models.load_model и продолжим дальнейшую работу. Здесь мы предполагаем использование только одного работника для загрузки и продолжения обучения, в этом случае вы не вызываете tf.keras.models.load_model в другой strategy.scope() .

 loaded_model = tf.keras.models.load_model(model_path)

# Now that we have the model restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9825 - accuracy: 0.1102
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9367 - accuracy: 0.1117

<tensorflow.python.keras.callbacks.History at 0x7fc5f4b0d8d0>

Сохранение и восстановление контрольной точки

С другой стороны, контрольные точки позволяют сохранять веса модели и восстанавливать их без необходимости сохранения всей модели. Здесь вы создадите одну tf.train.Checkpoint которая отслеживает модель, которой управляет tf.train.CheckpointManager так что tf.train.CheckpointManager только последняя контрольная точка.

 checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
  checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
 

Как только CheckpointManager настроен, вы готовы сохранить и удалить контрольные точки, сохраненные не главными работниками.

 checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)
 

Теперь, когда вам нужно восстановить, вы можете найти последнюю сохраненную контрольную точку, используя удобную функцию tf.train.latest_checkpoint . Восстановив контрольно-пропускной пункт, вы можете продолжить тренировку.

 latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9841 - accuracy: 0.6561
Epoch 2/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9445 - accuracy: 0.6805

<tensorflow.python.keras.callbacks.History at 0x7fc5f49d9d30>

BackupAndRestore обратный вызов

Обратный вызов BackupAndRestore обеспечивает отказоустойчивость благодаря резервному копированию модели и номера текущей эпохи во временном файле контрольных backup_dir аргументом BackupAndRestore в BackupAndRestore . Это делается в конце каждой эпохи.

Как только задания прерываются и перезапускаются, обратный вызов восстанавливает последнюю контрольную точку, и обучение продолжается с начала прерванной эпохи. Любое частичное обучение, уже выполненное в незаконченную эпоху до прерывания, будет отброшено, чтобы оно не влияло на конечное состояние модели.

Чтобы использовать его, предоставьте экземпляр tf.keras.callbacks.experimental.BackupAndRestore при tf.keras.Model.fit() .

С MultiWorkerMirroredStrategy, если рабочий прерывается, весь кластер приостанавливается, пока не будет перезапущен рабочий. Другие рабочие также перезапустятся, и прерванный рабочий снова присоединится к кластеру. Затем каждый работник читает файл контрольной точки, который был ранее сохранен, и получает свое прежнее состояние, что позволяет кластеру вернуться в синхронизацию. Затем обучение продолжается.

BackupAndRestore вызов BackupAndRestore использует CheckpointManager для сохранения и восстановления состояния обучения, которое генерирует файл с именем checkpoint, который отслеживает существующие контрольные точки вместе с самой последней. По этой причине не следует повторно использовать backup_dir для хранения других контрольных точек во избежание конфликта имен.

В настоящее время BackupAndRestore вызов BackupAndRestore поддерживает одного рабочего без стратегии MirroredStrategy и нескольких рабочих с MultiWorkerMirroredStrategy. Ниже приведены два примера как для обучения нескольких работников, так и для обучения одного работника.

 # Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
 
Epoch 1/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2837 - accuracy: 0.1836
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2131 - accuracy: 0.4091
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1310 - accuracy: 0.5485

<tensorflow.python.keras.callbacks.History at 0x7fc5f49a3080>

Если вы осмотрите каталог backup_dir , указанный в BackupAndRestore , вы можете заметить некоторые временно созданные файлы контрольных точек. Эти файлы необходимы для восстановления ранее потерянных экземпляров, и они будут удалены библиотекой в ​​конце tf.keras.Model.fit() после успешного завершения вашего обучения.

Смотрите также

  1. Руководство по распределенному обучению в TensorFlow содержит обзор доступных стратегий распространения.
  2. Официальные модели , многие из которых могут быть настроены для запуска нескольких стратегий распространения.
  3. Раздел « Производительность» в руководстве содержит информацию о других стратегиях и инструментах, которые можно использовать для оптимизации производительности моделей TensorFlow.