Treinamento personalizado com tf.distribute.Strategy

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Este tutorial demonstra como usar tf.distribute.Strategy com loops de treinamento personalizado. Vamos treinar um modelo CNN simples no conjunto de dados MNIST de moda. O conjunto de dados de moda MNIST contém 60.000 imagens de trem de tamanho 28 x 28 e 10.000 imagens de teste de tamanho 28 x 28.

Estamos usando loops de treinamento personalizados para treinar nosso modelo porque eles nos dão flexibilidade e um maior controle sobre o treinamento. Além disso, é mais fácil depurar o modelo e o loop de treinamento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

Baixe o conjunto de dados de moda MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

Crie uma estratégia para distribuir as variáveis ​​e o gráfico

Como é que tf.distribute.MirroredStrategy trabalho de estratégia?

  • Todas as variáveis ​​e o gráfico do modelo são replicados nas réplicas.
  • A entrada é distribuída uniformemente pelas réplicas.
  • Cada réplica calcula a perda e gradientes para a entrada que recebeu.
  • Os gradientes são sincronizados em todas as réplicas somando-os.
  • Após a sincronização, a mesma atualização é feita nas cópias das variáveis ​​em cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurar pipeline de entrada

Exporte o gráfico e as variáveis ​​para o formato SavedModel independente de plataforma. Depois que seu modelo for salvo, você pode carregá-lo com ou sem o osciloscópio.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Crie os conjuntos de dados e distribua-os:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crie o modelo

Criar um modelo usando tf.keras.Sequential . Você também pode usar a API Model Subclassing para fazer isso.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Defina a função de perda

Normalmente, em uma única máquina com 1 GPU / CPU, a perda é dividida pelo número de exemplos no lote de entrada.

Então, como deve a perda ser calculado quando se utiliza um tf.distribute.Strategy ?

  • Por exemplo, digamos que você tenha 4 GPUs e um tamanho de lote de 64. Um lote de entrada é distribuído pelas réplicas (4 GPUs), cada réplica recebendo uma entrada de tamanho 16.

  • O modelo em cada réplica faz um passe para frente com sua respectiva entrada e calcula a perda. Agora, ao invés de dividir a perda pelo número de exemplos em sua respectiva entrada (BATCH_SIZE_PER_REPLICA = 16), a perda deve ser dividida pelo GLOBAL_BATCH_SIZE (64).

Por que fazer isso?

  • Isso precisa ser feito porque após os gradientes são calculados sobre cada réplica, eles são sincronizados através das réplicas pela soma deles.

Como fazer isso no TensorFlow?

  • Se você estiver escrevendo um loop de treinamento personalizado, como neste tutorial, você deve somar os por exemplo perdas e dividir a soma pelo GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) ou você pode usar tf.nn.compute_average_loss que leva a perda por exemplo, pesos de amostra opcionais, e GLOBAL_BATCH_SIZE como argumentos e retorna a perda dimensionado.

  • Se você estiver usando perdas de regularização em seu modelo, será necessário dimensionar o valor de perda por número de réplicas. Você pode fazer isso usando o tf.nn.scale_regularization_loss função.

  • Usando tf.reduce_mean não é recomendado. Isso divide a perda pelo tamanho real do lote por réplica, que pode variar passo a passo.

  • Esta redução e dimensionamento é feito automaticamente no keras model.compile e model.fit

  • Se usando tf.keras.losses as classes (como no exemplo a seguir), a redução de perda tem de ser explicitamente especificada para ser um dos NONE ou SUM . AUTO e SUM_OVER_BATCH_SIZE não são permitidas quando usado com tf.distribute.Strategy . AUTO não é permitida porque o usuário deve pensar explicitamente sobre o que redução eles querem ter certeza de que está correto no caso distribuído. SUM_OVER_BATCH_SIZE não é permitido porque atualmente ele só iria dividir por per tamanho de lote de réplicas, e deixar a divisão por número de réplicas para o usuário, que pode ser fácil perder. Em vez disso, pedimos ao usuário que faça a redução explicitamente.

  • Se labels é multi-dimensional, em seguida, a média do per_example_loss em todo o número de elementos em cada amostra. Por exemplo, se a forma de predictions é (batch_size, H, W, n_classes) e labels é (batch_size, H, W) , será necessário a actualização de per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Defina as métricas para rastrear perda e precisão

Essas métricas rastreiam a perda e o treinamento do teste e a precisão do teste. Você pode usar .result() para obter as estatísticas acumuladas a qualquer momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Loop de treinamento

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

Coisas a serem observadas no exemplo acima:

Restaure o último ponto de verificação e teste

Um modelo checkpointed com um tf.distribute.Strategy pode ser restaurada com ou sem uma estratégia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

Formas alternativas de iteração em um conjunto de dados

Usando iteradores

Se você quiser iterar sobre um determinado número de passos e não através de todo o conjunto de dados que você pode criar uma iteração usando o iter de chamadas e chamada explicitamente next no iterator. Você pode escolher iterar sobre o conjunto de dados dentro e fora de tf.function. Aqui está um pequeno fragmento que demonstra a iteração do conjunto de dados fora do tf.function usando um iterador.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

Iterando dentro de um tf.function

Você também interagir sobre toda a entrada pode train_dist_dataset dentro de um tf.function usando o for x in ... construção ou criando iterators como fizemos anteriormente. O exemplo abaixo demonstra envolver um período de formação de uma tf.function e iteração sobre train_dist_dataset dentro da função.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

Rastreamento de perda de treinamento em réplicas

Nós não recomendamos o uso tf.metrics.Mean para controlar a perda de treinamento em diferentes réplicas, por causa da computação de escala perda que é realizado.

Por exemplo, se você executar um job de treinamento com as seguintes características:

  • Duas réplicas
  • Duas amostras são processadas em cada réplica
  • Valores de perda resultantes: [2, 3] e [4, 5] em cada réplica
  • Tamanho do lote global = 4

Com a escala de perda, você calcula o valor de perda por amostra em cada réplica adicionando os valores de perda e, em seguida, dividindo pelo tamanho do lote global. Neste caso: (2 + 3) / 4 = 1.25 e (4 + 5) / 4 = 2.25 .

Se você usar tf.metrics.Mean para controlar a perda através das duas réplicas, o resultado é diferente. Neste exemplo, é acabar com um total de 3,50 e count de 2, o que resulta em total / count = 1,75 quando result() é chamado na métrica. Perda calculada com tf.keras.Metrics é escalado por um fator adicional que é igual ao número de réplicas em sincronia.

Guia e exemplos

Aqui estão alguns exemplos de uso de estratégia de distribuição com loops de treinamento personalizados:

  1. Guia de treinamento distribuído
  2. DenseNet exemplo usando MirroredStrategy .
  3. BERT exemplo treinado usando MirroredStrategy e TPUStrategy . Este exemplo é particularmente útil para entender como carregar de um ponto de verificação e gerar pontos de verificação periódicos durante o treinamento distribuído, etc.
  4. NCF exemplo treinado usando MirroredStrategy que pode ser ativado usando o keras_use_ctl bandeira.
  5. NMT exemplo treinado usando MirroredStrategy .

Mais exemplos listados na guia de estratégia de distribuição .

Próximos passos