¡Google I / O regresa del 18 al 20 de mayo! Reserva espacio y crea tu horario Regístrate ahora

Entrenamiento personalizado con tf.distribute.Strategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Este tutorial demuestra cómo usar tf.distribute.Strategy con ciclos de entrenamiento personalizados. Entrenaremos un modelo CNN simple en el conjunto de datos de moda MNIST. El conjunto de datos de moda MNIST contiene 60000 imágenes de trenes de tamaño 28 x 28 y 10000 imágenes de prueba de tamaño 28 x 28.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos dan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.1

Descargue el conjunto de datos de moda MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

Crea una estrategia para distribuir las variables y el gráfico.

¿Cómo tf.distribute.MirroredStrategy estrategia tf.distribute.MirroredStrategy ?

  • Todas las variables y el gráfico del modelo se replican en las réplicas.
  • La entrada se distribuye uniformemente entre las réplicas.
  • Cada réplica calcula la pérdida y los gradientes de la entrada que recibió.
  • Los gradientes se sincronizan en todas las réplicas al sumarlos.
  • Después de la sincronización, se realiza la misma actualización a las copias de las variables en cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurar canalización de entrada

Exporte el gráfico y las variables al formato de modelo guardado independiente de la plataforma. Después de guardar su modelo, puede cargarlo con o sin el alcance.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Cree los conjuntos de datos y distribúyalos:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crea el modelo

Cree un modelo usando tf.keras.Sequential . También puede utilizar la API de subclases de modelos para hacer esto.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definir la función de pérdida

Normalmente, en una sola máquina con 1 GPU / CPU, la pérdida se divide por la cantidad de ejemplos en el lote de entrada.

Entonces, ¿cómo se debe calcular la pérdida cuando se usa un tf.distribute.Strategy ?

  • Por ejemplo, digamos que tiene 4 GPU y un tamaño de lote de 64. Un lote de entrada se distribuye entre las réplicas (4 GPU), y cada réplica recibe una entrada de tamaño 16.

  • El modelo de cada réplica hace un pase hacia adelante con su entrada respectiva y calcula la pérdida. Ahora, en lugar de dividir la pérdida por el número de ejemplos en su entrada respectiva (BATCH_SIZE_PER_REPLICA = 16), la pérdida debe dividirse por GLOBAL_BATCH_SIZE (64).

¿Por qué hacer esto?

  • Esto debe hacerse porque después de que se calculan los gradientes en cada réplica, se sincronizan entre las réplicas al sumarlas .

¿Cómo hacer esto en TensorFlow?

  • Si está escribiendo un ciclo de entrenamiento personalizado, como en este tutorial, debe sumar las pérdidas por ejemplo y dividir la suma por GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) o puede usar tf.nn.compute_average_loss que toma la pérdida por ejemplo, pesos de muestra opcionales y GLOBAL_BATCH_SIZE como argumentos y devuelve la pérdida escalada.

  • Si está utilizando pérdidas de regularización en su modelo, debe escalar el valor de pérdida por número de réplicas. Puede hacer esto usando la función tf.nn.scale_regularization_loss .

  • tf.reduce_mean se recomienda el uso de tf.reduce_mean . Al hacerlo, se divide la pérdida por el tamaño de lote real por réplica, que puede variar de un paso a otro.

  • Esta reducción y escalado se realiza automáticamente en keras model.compile y model.fit

  • Si usatf.keras.losses clasestf.keras.losses (como en el ejemplo a continuación), la reducción de pérdida debe especificarse explícitamente para que sea NONE o SUM . AUTO y SUM_OVER_BATCH_SIZE no están permitidos cuando se usan con tf.distribute.Strategy . AUTO no está permitido porque el usuario debe pensar explícitamente en qué reducción desea para asegurarse de que sea correcta en el caso distribuido. SUM_OVER_BATCH_SIZE no está permitido porque actualmente solo dividiría por el tamaño de lote de réplicas y dejaría la división por el número de réplicas al usuario, lo que podría ser fácil de pasar por alto. Entonces, en cambio, le pedimos al usuario que haga la reducción él mismo explícitamente.

  • Si las labels son multidimensionales, per_example_loss la per_example_loss entre el número de elementos de cada muestra. Por ejemplo, si la forma de las predictions es (batch_size, H, W, n_classes) y las labels son (batch_size, H, W) , deberá actualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Defina las métricas para realizar un seguimiento de la pérdida y la precisión

Estas métricas rastrean la pérdida de la prueba y la precisión del entrenamiento y la prueba. Puede utilizar .result() para obtener las estadísticas acumuladas en cualquier momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Bucle de entrenamiento

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5113666653633118, Accuracy: 81.77666473388672, Test Loss: 0.44048795104026794, Test Accuracy: 84.75999450683594
Epoch 2, Loss: 0.34178727865219116, Accuracy: 87.7300033569336, Test Loss: 0.33915770053863525, Test Accuracy: 88.06999969482422
Epoch 3, Loss: 0.2983285188674927, Accuracy: 89.19332885742188, Test Loss: 0.3055931031703949, Test Accuracy: 88.91000366210938
Epoch 4, Loss: 0.2684659957885742, Accuracy: 90.25333404541016, Test Loss: 0.29095324873924255, Test Accuracy: 89.55000305175781
Epoch 5, Loss: 0.24288544058799744, Accuracy: 91.15666961669922, Test Loss: 0.2943626642227173, Test Accuracy: 89.63999938964844
Epoch 6, Loss: 0.2228754460811615, Accuracy: 91.88333129882812, Test Loss: 0.2746399939060211, Test Accuracy: 90.02999877929688
Epoch 7, Loss: 0.2049170583486557, Accuracy: 92.5050048828125, Test Loss: 0.26037952303886414, Test Accuracy: 90.80000305175781
Epoch 8, Loss: 0.1883457750082016, Accuracy: 93.11333465576172, Test Loss: 0.2568276524543762, Test Accuracy: 90.9000015258789
Epoch 9, Loss: 0.17290380597114563, Accuracy: 93.61000061035156, Test Loss: 0.2655820846557617, Test Accuracy: 90.55000305175781
Epoch 10, Loss: 0.15979470312595367, Accuracy: 94.12000274658203, Test Loss: 0.2730286419391632, Test Accuracy: 90.52999877929688

Cosas a tener en cuenta en el ejemplo anterior:

Restaurar el último punto de control y prueba

Un modelo tf.distribute.Strategy con un tf.distribute.Strategy se puede restaurar con o sin una estrategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 90.55000305175781

Formas alternativas de iterar sobre un conjunto de datos

Usando iteradores

Si desea iterar sobre un número determinado de pasos y no a través de todo el conjunto de datos, puede crear un iterador utilizando la llamada iter y la next llamada explícita en el iterador. Puede elegir iterar sobre el conjunto de datos tanto dentro como fuera de la función tf. Aquí hay un pequeño fragmento que demuestra la iteración del conjunto de datos fuera de la función tf.function usando un iterador.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.13083496689796448, Accuracy: 95.0
Epoch 10, Loss: 0.13865676522254944, Accuracy: 94.375
Epoch 10, Loss: 0.12255267798900604, Accuracy: 95.78125
Epoch 10, Loss: 0.16597780585289001, Accuracy: 94.375
Epoch 10, Loss: 0.1374451071023941, Accuracy: 94.6875
Epoch 10, Loss: 0.11878655105829239, Accuracy: 95.3125
Epoch 10, Loss: 0.1515883207321167, Accuracy: 93.75
Epoch 10, Loss: 0.14494936168193817, Accuracy: 95.46875
Epoch 10, Loss: 0.16899390518665314, Accuracy: 94.375
Epoch 10, Loss: 0.12619170546531677, Accuracy: 95.3125

Iterando dentro de una función tf.

También puede iterar sobre todo el train_dist_dataset entrada dentro de un tf.function usando la construcción for x in ... o creando iteradores como hicimos anteriormente. El siguiente ejemplo demuestra cómo envolver una época de entrenamiento en una función tf.function e iterar sobre train_dist_dataset dentro de la función.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.1478981077671051, Accuracy: 94.64833068847656
Epoch 2, Loss: 0.13465435802936554, Accuracy: 95.04833221435547
Epoch 3, Loss: 0.12460654228925705, Accuracy: 95.43000030517578
Epoch 4, Loss: 0.11515416204929352, Accuracy: 95.80000305175781
Epoch 5, Loss: 0.10600292682647705, Accuracy: 96.06999969482422
Epoch 6, Loss: 0.10013402998447418, Accuracy: 96.30500030517578
Epoch 7, Loss: 0.09145957976579666, Accuracy: 96.60833740234375
Epoch 8, Loss: 0.08386243134737015, Accuracy: 96.88999938964844
Epoch 9, Loss: 0.07785602658987045, Accuracy: 97.06500244140625
Epoch 10, Loss: 0.0744374692440033, Accuracy: 97.19166564941406

Seguimiento de la pérdida de entrenamiento en las réplicas

No recomendamos el uso de tf.metrics.Mean para rastrear la pérdida de la formación a través de diferentes réplicas, debido a la pérdida de cálculos de escalamiento que se lleva a cabo.

Por ejemplo, si ejecuta un trabajo de entrenamiento con las siguientes características:

  • Dos réplicas
  • Se procesan dos muestras en cada réplica
  • Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica
  • Tamaño de lote global = 4

Con el escalado de pérdidas, se calcula el valor de pérdida por muestra en cada réplica sumando los valores de pérdida y luego dividiendo por el tamaño del lote global. En este caso: (2 + 3) / 4 = 1.25 y (4 + 5) / 4 = 2.25 .

Si usa tf.metrics.Mean para rastrear la pérdida en las dos réplicas, el resultado es diferente. En este ejemplo, termina con un total de 3.50 y count de 2, lo que da como resultado total / count = 1.75 cuando se llama a result() en la métrica. La pérdida calculada con tf.keras.Metrics se escala mediante un factor adicional que es igual al número de réplicas sincronizadas.

Guía y ejemplos

A continuación, se muestran algunos ejemplos del uso de la estrategia de distribución con ciclos de entrenamiento personalizados:

  1. Guía de formación distribuida
  2. Ejemplo de DenseNet usando MirroredStrategy .
  3. Ejemplo de BERT entrenado con MirroredStrategy y TPUStrategy . Este ejemplo es particularmente útil para comprender cómo cargar desde un punto de control y generar puntos de control periódicos durante la capacitación distribuida, etc.
  4. Ejemplo de NCF entrenado usando MirroredStrategy que se puede habilitar usando el indicador keras_use_ctl .
  5. Ejemplo de NMT entrenado con MirroredStrategy .

Más ejemplos enumerados en la guía de estrategia de distribución .

Próximos pasos