Индивидуальное обучение с tf.distribute.Strategy

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом руководстве показано, как использовать tf.distribute.Strategy с пользовательскими циклами обучения. Мы будем обучать простую модель CNN на наборе данных моды MNIST. Набор данных fashion MNIST содержит 60 000 изображений поездов размером 28 x 28 и 10 000 тестовых изображений размером 28 x 28.

Мы используем настраиваемые циклы обучения для обучения нашей модели, потому что они дают нам гибкость и больший контроль над обучением. Кроме того, легче отладить модель и цикл обучения.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1

Загрузите набор данных моды MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

Создайте стратегию для распределения переменных и графика

Как работает стратегия tf.distribute.MirroredStrategy ?

  • Все переменные и график модели тиражируются на репликах.
  • Ввод равномерно распределяется по репликам.
  • Каждая реплика вычисляет потери и градиенты для полученных входных данных.
  • Градиенты синхронизируются по всем репликам путем их суммирования.
  • После синхронизации такое же обновление выполняется для копий переменных на каждой реплике.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Настройка входного конвейера

Экспортируйте график и переменные в независимый от платформы формат SavedModel. После того, как ваша модель сохранена, вы можете загрузить ее с прицелом или без него.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Создайте наборы данных и распространите их:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_UINT8
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
        dim {
          size: 1
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
}

2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_UINT8
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:3"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
        dim {
          size: 1
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
}

Создайте модель

Создайте модель с помощью tf.keras.Sequential . Для этого вы также можете использовать API подклассов моделей.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Определите функцию потерь

Обычно на одной машине с 1 GPU/CPU потери делятся на количество примеров в пакете ввода.

Итак, как следует рассчитывать потери при использовании tf.distribute.Strategy ?

  • Например, предположим, что у вас есть 4 графических процессора и размер пакета 64. Один пакет входных данных распределяется по репликам (4 графических процессора), каждая реплика получает входные данные размером 16.

  • Модель на каждой реплике выполняет прямой проход с соответствующим входом и вычисляет потери. Теперь вместо деления потерь на количество примеров в соответствующих входных данных (BATCH_SIZE_PER_REPLICA = 16) потери следует разделить на GLOBAL_BATCH_SIZE (64).

Зачем это делать?

  • Это необходимо сделать, потому что после того, как градиенты рассчитаны для каждой реплики, они синхронизируются между репликами путем их суммирования .

Как это сделать в TensorFlow?

  • Если вы пишете собственный цикл обучения, как в этом руководстве, вы должны tf.nn.compute_average_loss потери для каждого примера и разделить сумму на scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) : tf.nn.compute_average_loss , который принимает потери для каждого примера, необязательные веса выборки и GLOBAL_BATCH_SIZE в качестве аргументов и возвращает масштабированные потери.

  • Если вы используете потери регуляризации в своей модели, вам необходимо масштабировать значение потерь по количеству реплик. Вы можете сделать это с помощью функции tf.nn.scale_regularization_loss .

  • Использование tf.reduce_mean не рекомендуется. При этом потери делятся на фактический размер пакета реплики, который может варьироваться от шага к шагу.

  • Это уменьшение и масштабирование выполняется автоматически в keras model.compile и model.fit

  • При использовании классов tf.keras.losses (как в приведенном ниже примере) необходимо явно указать уменьшение потерь, чтобы оно было одним из NONE или SUM . AUTO и SUM_OVER_BATCH_SIZE не допускаются при использовании с tf.distribute.Strategy . AUTO запрещен, потому что пользователь должен явно подумать о том, какое сокращение он хочет, чтобы убедиться, что оно правильное в распределенном случае. SUM_OVER_BATCH_SIZE запрещен, поскольку в настоящее время он будет делить только на размер пакета каждой реплики, а деление на количество реплик остается за пользователем, что может быть легко пропущено. Поэтому вместо этого мы просим пользователя выполнить сокращение самостоятельно.

  • Если labels многомерны, усредните per_example_loss по количеству элементов в каждой выборке. Например, если форма predictions (batch_size, H, W, n_classes) и labels (batch_size, H, W) , вам нужно будет обновить per_example_loss например: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Определите показатели для отслеживания потерь и точности

Эти показатели отслеживают потери при тестировании, а также точность обучения и тестирования. Вы можете использовать .result() для получения накопленной статистики в любое время.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Тренировочный цикл

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734
Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664
Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734
Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312
Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938
Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336
Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789
Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266
Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664
Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578

На что следует обратить внимание в приведенном выше примере:

  • Мы перебираем наборы данных train_dist_dataset и test_dist_dataset , используя конструкцию for x in ...
  • Масштабированный убыток — это возвращаемое значение distributed_train_step . Это значение агрегируется по репликам с помощью вызова tf.distribute.Strategy.reduce а затем по пакетам путем суммирования возвращаемого значения вызовов tf.distribute.Strategy.reduce .
  • tf.keras.Metrics следует обновлять внутри train_step и test_step , которые tf.distribute.Strategy.run . * tf.distribute.Strategy.run возвращает результаты каждой локальной реплики в стратегии, и есть несколько способов использовать этот результат. Вы можете выполнить tf.distribute.Strategy.reduce , чтобы получить агрегированное значение. Вы также можете выполнить tf.distribute.Strategy.experimental_local_results , чтобы получить список значений, содержащихся в результате, по одному на локальную реплику.

Восстановите последнюю контрольную точку и проверьте

Модель, отмеченная контрольной точкой с помощью tf.distribute.Strategy , может быть восстановлена ​​со стратегией или без нее.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664

Альтернативные способы перебора набора данных

Использование итераторов

Если вы хотите выполнить итерацию по заданному количеству шагов, а не по всему набору данных, вы можете создать итератор, используя вызов iter и явный вызов next на итераторе. Вы можете перебирать набор данных как внутри, так и вне tf.function. Вот небольшой фрагмент, демонстрирующий итерацию набора данных вне tf.function с использованием итератора.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375
Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125
Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625
Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625
Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625
Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625
Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625
Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625
Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125
Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875

Итерация внутри tf.function

Вы также можете перебрать весь входной train_dist_dataset внутри tf.function, используя конструкцию for x in ... или создав итераторы, как мы сделали выше. В приведенном ниже примере демонстрируется перенос одной эпохи обучения в tf.function и итерация по train_dist_dataset внутри функции.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844
Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328
Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062
Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969
Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625
Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047
Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562
Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078
Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328
Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953

Отслеживание потерь при обучении в репликах

Мы не рекомендуем использовать tf.metrics.Mean для отслеживания потерь при обучении в разных репликах из-за выполняемого вычисления масштабирования потерь.

Например, если вы запускаете задание обучения со следующими характеристиками:

  • Две реплики
  • На каждой реплике обрабатываются два образца
  • Результирующие значения потерь: [2, 3] и [4, 5] на каждой реплике.
  • Глобальный размер партии = 4

При масштабировании потерь вы вычисляете значение потерь для каждого образца на каждой реплике, складывая значения потерь, а затем разделяя их на глобальный размер пакета. В этом случае: (2 + 3) / 4 = 1.25 и (4 + 5) / 4 = 2.25 .

Если вы используете tf.metrics.Mean для отслеживания потерь в двух репликах, результат будет другим. В этом примере вы получаете в total 3,50 и count 2, что приводит к total количеству / count = 1,75, когда result() вызывается для метрики. Потери, рассчитанные с помощью tf.keras.Metrics , масштабируются дополнительным коэффициентом, равным количеству синхронизированных реплик.

Руководство и примеры

Вот несколько примеров использования стратегии распределения с пользовательскими циклами обучения:

  1. Распределенное руководство по обучению
  2. Пример DenseNet с использованием MirroredStrategy .
  3. Пример BERT , обученный с использованием MirroredStrategy и TPUStrategy . Этот пример особенно полезен для понимания того, как выполнять загрузку с контрольной точки и создавать периодические контрольные точки во время распределенного обучения и т. д.
  4. Пример NCF , обученный с использованием MirroredStrategy , который можно включить с помощью флага keras_use_ctl .
  5. Пример NMT, обученный с помощью MirroredStrategy .

Дополнительные примеры перечислены в руководстве по стратегии распределения .

Следующие шаги

  • Попробуйте новый API tf.distribute.Strategy на своих моделях.
  • Посетите раздел « Производительность» в руководстве, чтобы узнать больше о других стратегиях и инструментах , которые вы можете использовать для оптимизации производительности ваших моделей TensorFlow.