Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Индивидуальное обучение с tf.distribute.Strategy

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом руководстве показано, как использовать tf.distribute.Strategy с настраиваемыми циклами обучения. Мы обучим простую модель CNN на наборе данных fashion MNIST. Набор данных Fashion MNIST содержит 60000 изображений поездов размером 28 x 28 и 10000 тестовых изображений размером 28 x 28.

Мы используем настраиваемые циклы обучения для обучения нашей модели, потому что они дают нам гибкость и больший контроль над обучением. Более того, легче отлаживать модель и цикл обучения.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

Загрузите набор данных Fashion MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 1s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 1s 0us/step

Создайте стратегию для распределения переменных и графика

Как работает стратегия tf.distribute.MirroredStrategy ?

  • Все переменные и граф модели воспроизводятся на репликах.
  • Входные данные равномерно распределяются по репликам.
  • Каждая реплика вычисляет потери и градиенты для полученных входных данных.
  • Градиенты синхронизируются на всех репликах путем их суммирования.
  • После синхронизации выполняется такое же обновление копий переменных на каждой реплике.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Настройка входного конвейера

Экспортируйте график и переменные в независимый от платформы формат SavedModel. После сохранения модели вы можете загрузить ее с прицелом или без него.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Создайте наборы данных и распространите их:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Создайте модель

Создайте модель с помощью tf.keras.Sequential . Для этого также можно использовать API-интерфейс подкласса модели.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Определите функцию потерь

Обычно на одной машине с 1 GPU / CPU потери делятся на количество примеров в пакете ввода.

Итак, как следует рассчитать потери при использовании tf.distribute.Strategy ?

  • Например, предположим, что у вас есть 4 GPU и размер пакета 64. Один пакет ввода распределяется по репликам (4 GPU), каждая реплика получает ввод размером 16.

  • Модель на каждой реплике выполняет прямой проход с соответствующими входными данными и вычисляет потери. Теперь, вместо деления потерь на количество примеров в соответствующем вводе (BATCH_SIZE_PER_REPLICA = 16), потери следует разделить на GLOBAL_BATCH_SIZE (64).

Зачем это делать?

  • Это необходимо сделать, потому что после расчета градиентов на каждой реплике они синхронизируются между репликами путем их суммирования .

Как это сделать в TensorFlow?

  • Если вы пишете собственный цикл обучения, как в этом руководстве, вам следует просуммировать потери для каждого примера и разделить сумму на GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) или вы можете использовать tf.nn.compute_average_loss который принимает потери для каждого примера, необязательные веса выборки и GLOBAL_BATCH_SIZE в качестве аргументов и возвращает масштабированные потери.

  • Если вы используете в своей модели потери регуляризации, вам необходимо масштабировать значение потерь по количеству реплик. Вы можете сделать это с помощью функции tf.nn.scale_regularization_loss .

  • Использование tf.reduce_mean не рекомендуется. При этом потери делятся на фактический размер пакета реплик, который может меняться шаг за шагом.

  • Это уменьшение и масштабирование выполняется автоматически в keras model.compile и model.fit

  • При использовании классов tf.keras.losses (как в приведенном ниже примере) необходимо явно указать уменьшение потерь, tf.keras.losses NONE или SUM . AUTO и SUM_OVER_BATCH_SIZE запрещены при использовании с tf.distribute.Strategy . AUTO запрещен, потому что пользователь должен явно подумать о том, какое сокращение он хочет, чтобы убедиться, что оно правильно в распределенном случае. SUM_OVER_BATCH_SIZE запрещено, потому что в настоящее время он будет делиться только на размер пакета реплик, а деление на количество реплик SUM_OVER_BATCH_SIZE пользователя, что может быть легко пропустить. Поэтому вместо этого мы просим пользователя сделать сокращение самостоятельно.

  • Если labels многомерные, то усредните per_example_loss по количеству элементов в каждой выборке. Например, если форма predictions имеет вид (batch_size, H, W, n_classes) а labels - (batch_size, H, W) , вам нужно будет обновить per_example_loss например: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Определите метрики для отслеживания потерь и точности

Эти показатели позволяют отслеживать потери в тестах, а также точность обучения и тестирования. Вы можете использовать .result() для получения накопленной статистики в любое время.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

Цикл обучения

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789
Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828
Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422
Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406
Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75
Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938
Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375
Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594
Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594

На что следует обратить внимание в приведенном выше примере:

  • Мы перебираем train_dist_dataset и test_dist_dataset используя конструкцию for x in ...
  • Масштабируемая потеря - это возвращаемое значение distributed_train_step . Это значение агрегируется по репликам с tf.distribute.Strategy.reduce вызова tf.distribute.Strategy.reduce а затем по tf.distribute.Strategy.reduce путем суммирования возвращаемого значения вызовов tf.distribute.Strategy.reduce .
  • tf.keras.Metrics должны быть обновлены внутри train_step и test_step которые выполняются tf.distribute.Strategy.run . * tf.distribute.Strategy.run возвращает результаты от каждой локальной реплики в стратегии, и есть несколько способов получить этот результат. Вы можете выполнить tf.distribute.Strategy.reduce чтобы получить агрегированное значение. Вы также можете выполнить tf.distribute.Strategy.experimental_local_results чтобы получить список значений, содержащихся в результате, по одному на локальную реплику.

Восстановите последнюю контрольную точку и протестируйте

Контрольная точка модели с tf.distribute.Strategy может быть восстановлена ​​со стратегией или без tf.distribute.Strategy .

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594

Альтернативные способы перебора набора данных

Использование итераторов

Если вы хотите перебрать заданное количество шагов, а не весь набор данных, вы можете создать итератор, используя вызов iter и явный вызов next на итераторе. Вы можете перебирать набор данных как внутри, так и вне функции tf. Вот небольшой фрагмент, демонстрирующий итерацию набора данных за пределами функции tf. с использованием итератора.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0
Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875
Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625
Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625
Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875
Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125
Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125
Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625
Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625
Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875

Итерация внутри tf.функции

Вы также можете перебирать весь входной train_dist_dataset внутри tf.function, используя конструкцию for x in ... или создавая итераторы, как мы делали выше. В приведенном ниже примере демонстрируется обертывание одной эпохи обучения в tf.function и повторение train_dist_dataset внутри функции.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688
Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266
Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281
Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047
Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438
Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219
Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281
Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156
Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484

Отслеживание потери тренировок по репликам

Мы не рекомендуем использовать tf.metrics.Mean для отслеживания потерь при обучении по разным репликам из-за выполняемых вычислений масштабирования потерь.

Например, если вы выполняете учебное задание со следующими характеристиками:

  • Две реплики
  • На каждой реплике обрабатываются два образца.
  • Итоговые значения потерь: [2, 3] и [4, 5] на каждой реплике.
  • Глобальный размер партии = 4

При масштабировании потерь вы вычисляете значение потерь для каждой выборки для каждой реплики, складывая значения потерь и затем деля на глобальный размер пакета. В этом случае: (2 + 3) / 4 = 1.25 и (4 + 5) / 4 = 2.25 .

Если вы используете tf.metrics.Mean для отслеживания потерь в двух репликах, результат будет другим. В этом примере вы получаете в total 3,50 и count 2, что дает total / count = 1,75 при вызове result() для метрики. Потери, рассчитанные с помощью tf.keras.Metrics , масштабируются с помощью дополнительного коэффициента, который равен количеству синхронизированных реплик.

Руководство и примеры

Вот несколько примеров использования стратегии распространения с пользовательскими циклами обучения:

  1. Распределенное учебное руководство
  2. Пример DenseNet с использованием MirroredStrategy .
  3. Пример BERT, обученный с использованием MirroredStrategy и TPUStrategy . Этот пример особенно полезен для понимания того, как выполнять загрузку из контрольной точки и создавать периодические контрольные точки во время распределенного обучения и т. Д.
  4. Пример NCF, обученный с использованием MirroredStrategy который можно включить с keras_use_ctl флага keras_use_ctl .
  5. Пример NMT, обученный с использованием MirroredStrategy .

Дополнительные примеры приведены в руководстве по стратегии распространения .

Следующие шаги