Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Распределенное обучение с TensorFlow

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть источник на GitHub Скачать блокнот

обзор

tf.distribute.Strategy - это API-интерфейс tf.distribute.Strategy для распределения обучения между несколькими графическими процессорами, несколькими компьютерами или TPU. Используя этот API, вы можете распространять свои существующие модели и обучающий код с минимальными изменениями кода.

tf.distribute.Strategy был разработан с учетом следующих ключевых целей:

  • Легко использовать и поддерживать несколько пользовательских сегментов, включая исследователей, инженеров ML и т. Д.
  • Обеспечить хорошую производительность из коробки.
  • Простое переключение между стратегиями.

tf.distribute.Strategy может использоваться с высокоуровневым API, таким как Keras , и также может использоваться для распространения пользовательских циклов обучения (и, в общем, любых вычислений с использованием TensorFlow).

В TensorFlow 2.x вы можете выполнять свои программы с нетерпением или в виде графика, используя tf.function . tf.distribute.Strategy намеревается поддерживать оба этих режима выполнения, но лучше всего работает с tf.function . Режим Eager рекомендуется только для целей отладки и не поддерживается для TPUStrategy . Хотя в этом руководстве мы обсуждаем большую часть времени обучение, этот API также можно использовать для распространения оценок и прогнозов на разных платформах.

Вы можете использовать tf.distribute.Strategy с очень небольшим количеством изменений в вашем коде, потому что мы изменили базовые компоненты TensorFlow, чтобы стать осведомленными о стратегии. Это включает в себя переменные, слои, модели, оптимизаторы, метрики, сводки и контрольные точки.

В этом руководстве мы расскажем о различных типах стратегий и о том, как их можно использовать в разных ситуациях.

 # Import TensorFlow
import tensorflow as tf
 

Типы стратегий

tf.distribute.Strategy намеревается охватить несколько вариантов использования по разным осям. Некоторые из этих комбинаций в настоящее время поддерживаются, а другие будут добавлены в будущем. Некоторые из этих осей:

  • Синхронное и асинхронное обучение. Это два распространенных способа распределения обучения с параллелизмом данных. При обучении синхронизации все работники обучаются на разных срезах входных данных синхронно и агрегируют градиенты на каждом этапе. При асинхронном обучении все работники независимо проходят обучение по входным данным и асинхронно обновляют переменные. Как правило, обучение синхронизации поддерживается через все-сокращение и асинхронное через архитектуру сервера параметров.
  • Аппаратная платформа. Возможно, вы захотите масштабировать обучение на несколько графических процессоров на одном компьютере или на нескольких компьютерах в сети (с 0 или более графическими процессорами каждый) или на облачных TPU.

Для поддержки этих вариантов использования доступно шесть стратегий. В следующем разделе мы объясним, какие из них поддерживаются в каких сценариях в TF 2.2 в настоящее время. Вот краткий обзор:

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API поддержанный поддержанный Экспериментальная поддержка Экспериментальная поддержка Поддерживается запланированный пост 2.3
Пользовательский цикл обучения поддержанный поддержанный Экспериментальная поддержка Экспериментальная поддержка Поддерживается запланированный пост 2.3
API оценщика Ограниченная поддержка Не поддерживается Ограниченная поддержка Ограниченная поддержка Ограниченная поддержка

MirroredStrategy

tf.distribute.MirroredStrategy поддерживает синхронное распределенное обучение на нескольких графических процессорах на одном компьютере. Он создает одну реплику на устройство GPU. Каждая переменная в модели зеркально отражена во всех репликах. Вместе эти переменные образуют единую концептуальную переменную, которая называется MirroredVariable . Эти переменные синхронизируются друг с другом, применяя идентичные обновления.

Эффективные алгоритмы общего сокращения используются для передачи обновлений переменных между устройствами. Все-уменьшить совокупные тензоры на всех устройствах, добавляя их, и делает их доступными на каждом устройстве. Это объединенный алгоритм, который очень эффективен и может значительно сократить накладные расходы на синхронизацию. Существует множество универсальных алгоритмов и реализаций, в зависимости от типа связи, доступной между устройствами. По умолчанию он использует NVIDIA NCCL в качестве универсальной реализации. Вы можете выбрать один из нескольких предложенных нами вариантов или написать свой.

Вот самый простой способ создания MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Это создаст экземпляр MirroredStrategy который будет использовать все графические процессоры, видимые для TensorFlow, и использовать NCCL в качестве связи между устройствами.

Если вы хотите использовать только некоторые графические процессоры на своем компьютере, вы можете сделать это следующим образом:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Если вы хотите переопределить связь между устройствами, вы можете сделать это с cross_device_ops аргумента cross_device_ops , предоставив экземпляр tf.distribute.CrossDeviceOps . В настоящее время tf.distribute.HierarchicalCopyAllReduce и tf.distribute.ReductionToOneDevice являются двумя параметрами, отличными от tf.distribute.NcclAllReduce который используется по умолчанию.

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy позволяет вам проводить тренинг tf.distribute.TPUStrategy на Tensor Processing Units (TPU). TPU - это специализированные ASIC от Google, предназначенные для значительного ускорения рабочих нагрузок машинного обучения. Они доступны в Google Colab, TensorFlow Research Cloud и Cloud TPU .

С точки зрения архитектуры распределенного обучения, TPUStrategy - это та же MirroredStrategy - она ​​реализует синхронное распределенное обучение. TPU обеспечивают собственную реализацию эффективных операций общего сокращения и других коллективных операций на нескольких ядрах TPU, которые используются в TPUStrategy .

Вот как вы могли бы TPUStrategy :

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

Экземпляр TPUClusterResolver помогает найти TPU. В Colab вам не нужно указывать аргументы.

Если вы хотите использовать это для облачных TPU:

  • Вы должны указать имя вашего ресурса TPU в аргументе tpu .
  • Вы должны явно инициализировать систему TPU в начале программы. Это необходимо до того, как TPU можно будет использовать для вычислений. Инициализация системы tpu также стирает память TPU, поэтому важно сначала выполнить этот шаг, чтобы избежать потери состояния.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy очень похож на MirroredStrategy . Он реализует синхронное распределенное обучение для нескольких работников, каждый из которых может иметь несколько графических процессоров. Подобно MirroredStrategy , он создает копии всех переменных в модели на каждом устройстве для всех рабочих.

Он использует CollectiveOps в качестве универсального метода связи для всех работников, используемого для синхронизации переменных. Коллективная операция - это одна операция на графике TensorFlow, которая может автоматически выбирать алгоритм полного сокращения во время выполнения TensorFlow в соответствии с аппаратным обеспечением, топологией сети и размерами тензора.

Также реализованы дополнительные оптимизации производительности. Например, он включает в себя статическую оптимизацию, которая преобразует множественные все редукции на малых тензорах в меньшее все редукции на больших тензорах. Кроме того, мы разрабатываем так, чтобы он имел архитектуру плагинов - так что в будущем вы сможете подключать алгоритмы, которые лучше настроены для вашего оборудования. Обратите внимание, что коллективные операции также реализуют другие коллективные операции, такие как широковещание и всеобщее собрание.

Вот самый простой способ создания MultiWorkerMirroredStrategy :

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy настоящее время позволяет выбирать между двумя различными реализациями коллективных операций. CollectiveCommunication.RING реализует кольцевые коллективы, используя gRPC в качестве уровня связи. CollectiveCommunication.NCCL использует NCCL от Nvidia для реализации коллективов. CollectiveCommunication.AUTO откладывает выбор на время выполнения. Лучший выбор коллективной реализации зависит от количества и типа графических процессоров и сетевого взаимодействия в кластере. Вы можете указать их следующим образом:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Одним из ключевых отличий для обучения нескольких рабочих по сравнению с обучением с несколькими GPU является настройка нескольких рабочих. TF_CONFIG среды TF_CONFIG является стандартным способом в TensorFlow для указания конфигурации кластера для каждого работника, который является частью кластера. Узнайте больше о настройке TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy выполняет синхронное обучение. Переменные не отражаются, вместо этого они помещаются в ЦП, а операции реплицируются во все локальные графические процессоры. Если имеется только один графический процессор, все переменные и операции будут помещены в этот графический процессор.

Создайте экземпляр CentralStorageStrategy помощью:

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Это создаст экземпляр CentralStorageStrategy который будет использовать все видимые графические процессоры и ЦП. Обновление переменных в репликах будет агрегировано перед применением к переменным.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy поддерживает обучение серверов параметров на нескольких компьютерах. В этой настройке некоторые машины обозначаются как рабочие, а некоторые - как серверы параметров. Каждая переменная модели размещается на одном сервере параметров. Вычисления реплицируются на все графические процессоры всех рабочих.

С точки зрения кода, это похоже на другие стратегии:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

Для обучения нескольких рабочих TF_CONFIG должен указать конфигурацию серверов параметров и рабочих в вашем кластере, о которых вы можете прочитать в TF_CONFIG ниже .

Другие стратегии

В дополнение к вышеупомянутым стратегиям есть две другие стратегии, которые могут быть полезны для tf.distribute прототипов и отладки при использовании API tf.distribute .

Стратегия по умолчанию

Стратегия по умолчанию - это стратегия распространения, которая присутствует, когда нет явной стратегии распространения. Он реализует интерфейс tf.distribute.Strategy но является сквозным и не обеспечивает фактического распространения. Например, strategy.run(fn) просто вызовет fn . Код, написанный с использованием этой стратегии, должен вести себя точно так же, как код, написанный без какой-либо стратегии. Вы можете думать об этом как о стратегии "без операции".

Стратегия по умолчанию - одноэлементная, и нельзя создавать больше ее экземпляров. Его можно получить с помощью tf.distribute.get_strategy() вне области действия любой явной стратегии (тот же API, который можно использовать для получения текущей стратегии внутри области действия явной стратегии).

 default_strategy = tf.distribute.get_strategy()
 

Эта стратегия служит двум основным целям:

  • Это позволяет безоговорочно писать код библиотеки с учетом распространения. Например, в оптимизаторе мы можем сделать tf.distribute.get_strategy() и использовать эту стратегию для уменьшения градиентов - он всегда будет возвращать объект стратегии, для которого мы можем вызвать API сокращения.
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • Подобно библиотечному коду, его можно использовать для написания программ конечных пользователей для работы с и без стратегии распространения, не требуя условной логики. Пример кода, иллюстрирующий это:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy - это стратегия размещения всех переменных и вычислений на одном указанном устройстве.

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

Эта стратегия отличается от стратегии по умолчанию несколькими способами. В стратегии по умолчанию логика размещения переменных остается неизменной по сравнению с запуском TensorFlow без какой-либо стратегии распространения. Но при использовании OneDeviceStrategy все переменные, созданные в его области действия, явно размещаются на указанном устройстве. Более того, любые функции, вызываемые через OneDeviceStrategy.run , также будут размещены на указанном устройстве.

Входные данные, распространяемые с помощью этой стратегии, будут предварительно загружены на указанное устройство. В стратегии по умолчанию нет входного распределения.

Подобно стратегии по умолчанию, эта стратегия может также использоваться для тестирования вашего кода перед переключением на другие стратегии, которые фактически распространяются на несколько устройств / машин. Это будет использовать механизм стратегии распределения несколько больше, чем стратегия по умолчанию, но не в полной мере, как при использовании MirroredStrategy TPUStrategy и т. Д. Если вам нужен код, который ведет себя так, как будто стратегии нет, то используйте стратегию по умолчанию.

До сих пор мы говорили о том, какие существуют различные стратегии и как вы можете их реализовать. В следующих нескольких разделах мы поговорим о различных способах их использования для распространения вашего обучения. В этом руководстве мы покажем фрагменты короткого кода и дадим ссылки на полные учебники, которые вы можете запускать из конца в конец.

Использование tf.distribute.Strategy с tf.keras.Model.fit

Мы интегрировали tf.distribute.Strategy в tf.keras который является реализацией спецификации tf.distribute.Strategy в tf.keras . tf.keras - это высокоуровневый API для построения и обучения моделей. Благодаря интеграции в бэкэнд tf.keras мы tf.keras для вас распространение вашего обучения, написанного в рамках обучения model.fit с использованием model.fit .

Вот что вам нужно изменить в своем коде:

  1. Создайте экземпляр соответствующего tf.distribute.Strategy .
  2. Переместите создание модели Keras, оптимизатора и метрик в strategy.scope .

Мы поддерживаем все типы моделей Keras - последовательные, функциональные и подклассы.

Вот фрагмент кода, чтобы сделать это для очень простой модели Keras с одним плотным слоем:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

В этом примере мы использовали MirroredStrategy поэтому мы можем запустить его на машине с несколькими графическими процессорами. strategy.scope() указывает Keras, какую стратегию использовать для распространения обучения. Создание моделей / оптимизаторов / метрик внутри этой области позволяет нам создавать распределенные переменные вместо обычных переменных. После того, как это настроено, вы можете подогнать свою модель, как обычно. MirroredStrategy заботится о копировании обучения модели на доступных графических процессорах, агрегировании градиентов и многом другом.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

Здесь мы использовали tf.data.Dataset для обеспечения обучения и оценки. Вы также можете использовать numy массивы:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

В обоих случаях (набор данных или numpy) каждая партия данного ввода делится поровну между несколькими репликами. Например, если вы используете MirroredStrategy с 2 графическими процессорами, каждая партия размером 10 будет разделена между двумя графическими процессорами, и каждый получит 5 входных примеров на каждом шаге. Каждая эпоха будет развиваться быстрее, если вы добавите больше графических процессоров. Как правило, вы хотите увеличить размер пакета при добавлении дополнительных ускорителей, чтобы эффективно использовать дополнительную вычислительную мощность. Вам также нужно будет перенастроить скорость обучения в зависимости от модели. Вы можете использовать strategy.num_replicas_in_sync чтобы получить количество реплик.

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

Что сейчас поддерживается?

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API Keras поддержанный поддержанный Экспериментальная поддержка Экспериментальная поддержка Поддержка запланированного поста 2.3

Примеры и учебники

Вот список учебников и примеров, которые иллюстрируют вышеупомянутую интеграцию от начала до конца с Keras:

  1. Учебник для обучения MNIST с MirroredStrategy .
  2. Учебник для обучения MNIST с использованием MultiWorkerMirroredStrategy .
  3. Руководство по обучению MNIST с использованием TPUStrategy .
  4. Репозиторий TensorFlow Model Garden, содержащий коллекции самых современных моделей, реализованных с использованием различных стратегий.

Использование tf.distribute.Strategy с пользовательскими циклами обучения

Как вы видели, использование tf.distribute.Strategy с model.fit требует изменения только нескольких строк вашего кода. tf.distribute.Strategy немного больше усилий, вы также можете использовать tf.distribute.Strategy с пользовательскими циклами обучения.

Если вам требуется больше гибкости и контроля над тренировочными циклами, чем это возможно в Estimator или Keras, вы можете написать собственные обучающие циклы. Например, при использовании GAN вам может потребоваться выполнять разное количество шагов генератора или дискриминатора в каждом раунде. Аналогичным образом, структуры высокого уровня не очень подходят для обучения в области обучения подкреплению.

Для поддержки пользовательских циклов обучения мы предоставляем основной набор методов через классы tf.distribute.Strategy . Их использование может потребовать незначительной реструктуризации кода на начальном этапе, но как только это будет сделано, вы сможете переключаться между графическими процессорами, TPU и несколькими компьютерами, просто изменив экземпляр стратегии.

Здесь мы покажем краткий фрагмент, иллюстрирующий этот вариант использования для простого примера обучения, использующего ту же модель Keras, что и раньше.

Сначала мы создаем модель и оптимизатор в рамках стратегии. Это гарантирует, что любые переменные, созданные с помощью модели и оптимизатора, являются зеркальными переменными.

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

Затем мы создаем входной набор данных и вызываем tf.distribute.Strategy.experimental_distribute_dataset чтобы распределить набор данных на основе стратегии.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

Затем мы определяем один шаг обучения. Мы будем использовать tf.GradientTape для вычисления градиентов и оптимизатор для применения этих градиентов для обновления переменных нашей модели. Чтобы распространить этот обучающий шаг, мы вставляем функцию train_step и передаем ее в tf.distrbute.Strategy.run вместе с входными данными набора данных, которые мы получаем из dist_dataset созданного ранее:

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

Несколько других вещей, которые нужно отметить в приведенном выше коде:

  1. Мы использовали tf.nn.compute_average_loss для расчета потерь. tf.nn.compute_average_loss суммирует потери в каждом примере и делит сумму на global_batch_size. Это важно, потому что позже, после того как градиенты рассчитаны для каждой реплики, они агрегируются по репликам путем их суммирования .
  2. Мы использовали API tf.distribute.Strategy.reduce для объединения результатов, возвращаемых tf.distribute.Strategy.run . tf.distribute.Strategy.run возвращает результаты каждой локальной реплики в стратегии, и есть несколько способов использовать этот результат. Вы можете reduce их, чтобы получить совокупную стоимость. Вы также можете выполнить tf.distribute.Strategy.experimental_local_results чтобы получить список значений, содержащихся в результате, по одному на локальную реплику.
  3. Когда apply_gradients вызывается в рамках стратегии распространения, его поведение изменяется. В частности, перед применением градиентов к каждому параллельному экземпляру во время синхронного обучения он выполняет суммирование по всем репликам градиентов.

Наконец, как только мы определили шаг обучения, мы можем dist_dataset и запустить обучение в цикле:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

В приведенном выше примере мы dist_dataset чтобы предоставить информацию для вашего обучения. Мы также предоставляем tf.distribute.Strategy.make_experimental_numpy_dataset для поддержки вводных данных. Вы можете использовать этот API для создания набора данных перед вызовом tf.distribute.Strategy.experimental_distribute_dataset .

Другой способ перебора ваших данных - явное использование итераторов. Возможно, вы захотите сделать это, если вы хотите выполнить для заданного числа шагов, а не для итерации по всему набору данных. Вышеупомянутая итерация теперь будет модифицирована, чтобы сначала создать итератор, а затем явно вызвать next для него, чтобы получить входные данные.

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

Это охватывает простейший случай использования API tf.distribute.Strategy для распространения пользовательских циклов обучения. Мы находимся в процессе улучшения этих API. Поскольку этот вариант использования требует больше усилий для адаптации вашего кода, в будущем мы опубликуем отдельное подробное руководство.

Что сейчас поддерживается?

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Пользовательский цикл обучения поддержанный поддержанный Экспериментальная поддержка Экспериментальная поддержка Поддержка запланированного поста 2.3

Примеры и учебники

Вот несколько примеров использования стратегии распространения с пользовательскими циклами обучения:

  1. Учебник для обучения MNIST с использованием MirroredStrategy .
  2. Руководство по обучению MNIST с использованием TPUStrategy .
  3. Репозиторий TensorFlow Model Garden, содержащий коллекции самых современных моделей, реализованных с использованием различных стратегий.

Использование tf.distribute.Strategy с Estimator (ограниченная поддержка)

tf.estimator - это распределенный обучающий API-интерфейс tf.estimator который изначально поддерживал подход сервера асинхронных параметров. Как и в случае с Keras, мы интегрировали tf.distribute.Strategy в tf.Estimator . Если вы используете Estimator для обучения, вы можете легко перейти на распределенное обучение с минимальными изменениями в коде. Благодаря этому пользователи Estimator теперь могут проводить синхронное распределенное обучение на нескольких графических процессорах и нескольких рабочих, а также использовать TPU. Эта поддержка в Estimator, однако, ограничена. Посмотрите, что поддерживается сейчас в разделе ниже для более подробной информации.

Использование tf.distribute.Strategy с Estimator немного отличается от tf.distribute.Strategy с tf.distribute.Strategy . Вместо того, чтобы использовать strategy.scope , теперь мы передаем объект стратегии в RunConfig для Оценщика.

Вот фрагмент кода, который показывает это с помощью предварительно LinearRegressor Estimator LinearRegressor и MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Мы используем готовый Оценщик, но тот же код работает и с пользовательским Оценщиком. train_distribute определяет, как будет распространяться обучение, а eval_distribute определяет, как будет распространяться оценка. Это еще одно отличие от Keras, где мы используем одну и ту же стратегию как для обучения, так и для оценки.

Теперь мы можем обучить и оценить этот Оценщик с помощью функции ввода:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Еще одно отличие, которое стоит выделить здесь между Estimator и Keras, - это обработка ввода. В Keras мы упоминали, что каждая партия набора данных автоматически разделяется на несколько реплик. В Estimator, однако, мы не выполняем автоматическое разделение пакетов и не разделяем данные между различными работниками автоматически. У вас есть полный контроль над тем, как вы хотите, чтобы ваши данные распределялись между рабочими и устройствами, и вы должны предоставить input_fn чтобы указать, как распределять ваши данные.

Ваш input_fn вызывается один раз для каждого работника, что дает один набор данных для каждого работника. Затем одна партия из этого набора данных подается в одну реплику на этом работнике, тем самым потребляя N партий для N реплик на 1 работника. Другими словами, набор данных, возвращаемый input_fn должен предоставлять пакеты размером PER_REPLICA_BATCH_SIZE . Глобальный размер пакета для шага может быть получен как PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

При проведении обучения с несколькими работниками вы должны либо разделить свои данные по рабочим, либо перемешать со случайным начальным числом на каждом. Вы можете увидеть пример того, как это сделать, в Тренинге для нескольких рабочих с оценщиком .

Точно так же вы можете использовать стратегии нескольких рабочих и серверов параметров. Код остается тем же, но вам нужно использовать tf.estimator.train_and_evaluate и установить переменные среды TF_CONFIG для каждого двоичного tf.estimator.train_and_evaluate , работающего в вашем кластере.

Что сейчас поддерживается?

Поддержка Estimator для обучения с использованием всех стратегий, за исключением TPUStrategy . Базовое обучение и оценка должны работать, но ряд дополнительных функций, таких как строительные леса, еще не работают. В этой интеграции также может быть несколько ошибок. В настоящее время мы не планируем активно улучшать эту поддержку, и вместо этого сосредоточены на поддержке Keras и пользовательских циклов обучения. Если это вообще возможно, вы должны вместо этого использовать tf.distribute с этими API.

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API оценщика Ограниченная поддержка Не поддерживается Ограниченная поддержка Ограниченная поддержка Ограниченная поддержка

Примеры и учебники

Вот несколько примеров, демонстрирующих использование различных стратегий в Estimator:

  1. Обучение нескольких работников с помощью Estimator для обучения MNIST с несколькими работниками с использованием MultiWorkerMirroredStrategy .
  2. Сквозной пример для обучения нескольких работников в тензорном потоке / экосистеме с использованием шаблонов Kubernetes. Этот пример начинается с модели tf.keras.estimator.model_to_estimator и преобразует ее в tf.keras.estimator.model_to_estimator API tf.keras.estimator.model_to_estimator .
  3. Официальная модель ResNet50 , которую можно обучить, используя либо MirroredStrategy либо MultiWorkerMirroredStrategy .

Другие темы

В этом разделе мы рассмотрим некоторые темы, которые относятся к нескольким случаям использования.

Настройка переменной среды TF_CONFIG

Для обучения нескольких работников, как уже упоминалось ранее, вам нужно установить переменную среды TF_CONFIG для каждого двоичного TF_CONFIG работающего в вашем кластере. TF_CONFIG среды TF_CONFIG - это строка JSON, которая указывает, какие задачи составляют кластер, их адреса и роль каждой задачи в кластере. Мы предоставляем шаблон Kubernetes в репозитории тензорного потока / экосистемы, который устанавливает TF_CONFIG для ваших учебных задач.

Есть два компонента TF_CONFIG: кластер и задача. Кластер предоставляет информацию об обучающем кластере, который является диктатом, состоящим из различных типов заданий, таких как рабочий. В обучении с несколькими работниками обычно есть один работник, который берет на себя немного больше ответственности, например сохранение контрольной точки и запись сводного файла для TensorBoard в дополнение к тому, что делает обычный работник. Такой работник упоминается как «главный» работник, и обычно рабочий с индексом 0 назначается главным работником (фактически, именно так реализуется tf.distribute.Strategy). задача, с другой стороны, предоставляет информацию о текущей задаче. Первый кластер компонентов одинаков для всех работников, а задача второго компонента различна для каждого работника и определяет тип и индекс этого работника.

Одним из TF_CONFIG является:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

Этот TF_CONFIG указывает, что в TF_CONFIG есть три рабочих задачи и две задачи ps, а также их хосты и порты. Часть «задача» указывает, что роль текущей задачи в кластере, работник 1 (второй работник). Допустимые роли в кластере: "главный", "рабочий", "пс" и "оценщик". Не должно быть задания "ps", кроме случаев использования tf.distribute.experimental.ParameterServerStrategy .

Что дальше?

tf.distribute.Strategy активно разрабатывается. Мы приглашаем вас опробовать его и предоставить ваши отзывы по вопросам, связанным с GitHub .