Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Entrenamiento personalizado con tf.distribute.Strategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Este tutorial muestra cómo utilizar tf.distribute.Strategy con bucles de formación personalizado. Entrenaremos un modelo CNN simple en el conjunto de datos de moda MNIST. El conjunto de datos de moda MNIST contiene 60000 imágenes de trenes de tamaño 28 x 28 y 10000 imágenes de prueba de tamaño 28 x 28.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos dan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

Descargue el conjunto de datos de moda MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

Crea una estrategia para distribuir las variables y el gráfico.

¿Cómo tf.distribute.MirroredStrategy trabajo de la estrategia?

  • Todas las variables y el gráfico del modelo se replican en las réplicas.
  • La entrada se distribuye uniformemente entre las réplicas.
  • Cada réplica calcula la pérdida y los gradientes de la entrada que recibió.
  • Los gradientes se sincronizan en todas las réplicas al sumarlos.
  • Después de la sincronización, se realiza la misma actualización a las copias de las variables en cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurar canalización de entrada

Exporte el gráfico y las variables al formato de modelo guardado independiente de la plataforma. Después de guardar su modelo, puede cargarlo con o sin el alcance.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Cree los conjuntos de datos y distribúyalos:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crea el modelo

Crear un modelo utilizando tf.keras.Sequential . También puede utilizar la API de subclases de modelos para hacer esto.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definir la función de pérdida

Normalmente, en una sola máquina con 1 GPU / CPU, la pérdida se divide por la cantidad de ejemplos en el lote de entrada.

Entonces, ¿cómo debe calcularse la pérdida cuando se utiliza un tf.distribute.Strategy ?

  • Por ejemplo, digamos que tiene 4 GPU y un tamaño de lote de 64. Un lote de entrada se distribuye entre las réplicas (4 GPU), y cada réplica recibe una entrada de tamaño 16.

  • El modelo de cada réplica hace un pase hacia adelante con su entrada respectiva y calcula la pérdida. Ahora, en lugar de dividir la pérdida por el número de ejemplos en su entrada respectiva (BATCH_SIZE_PER_REPLICA = 16), la pérdida debe dividirse por GLOBAL_BATCH_SIZE (64).

¿Por qué hacer esto?

  • Esto tiene que ser hecho porque después de los gradientes se calculan en cada réplica, que se sincronizan a través de las réplicas sumando ellos.

¿Cómo hacer esto en TensorFlow?

  • Si estás escribiendo un bucle de formación a medida, como en este tutorial, debe sumar las pérdidas por ejemplo, y dividir la suma por el GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) o puede utilizar tf.nn.compute_average_loss que toma la pérdida por ejemplo, pesos de la muestra opcionales, y GLOBAL_BATCH_SIZE como argumentos y devuelve la pérdida de escala.

  • Si está utilizando pérdidas de regularización en su modelo, debe escalar el valor de pérdida por número de réplicas. Usted puede hacer esto mediante el uso de la tf.nn.scale_regularization_loss función.

  • Usando tf.reduce_mean no se recomienda. Al hacerlo, se divide la pérdida por el tamaño de lote real por réplica, que puede variar de un paso a otro.

  • Esta reducción y la ampliación se realiza automáticamente en Keras model.compile y model.fit

  • Si se utiliza tf.keras.losses clases (como en el ejemplo a continuación), la reducción de la pérdida necesita ser especificada explícitamente a ser uno de NONE o SUM . AUTO y SUM_OVER_BATCH_SIZE no están permitidos cuando se utiliza con tf.distribute.Strategy . AUTO no está permitido porque el usuario debe pensar de manera explícita acerca de lo que la reducción quieren asegurarse de que es correcta en el caso distribuido. SUM_OVER_BATCH_SIZE no está permitido ya que de momento sólo se dividiría por Per réplica de tamaño del lote, y dejar la división por el número de réplicas para el usuario, que puede ser fácil pasar por alto. Entonces, en cambio, le pedimos al usuario que haga la reducción él mismo explícitamente.

  • Si labels es multidimensional, a continuación, promediar la per_example_loss a través del número de elementos en cada muestra. Por ejemplo, si la forma de predictions es (batch_size, H, W, n_classes) y labels es (batch_size, H, W) , que tendrá que actualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Defina las métricas para realizar un seguimiento de la pérdida y la precisión

Estas métricas rastrean la pérdida de la prueba y la precisión del entrenamiento y la prueba. Puede utilizar .result() para obtener las estadísticas acumuladas en cualquier momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Bucle de entrenamiento

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

Cosas a tener en cuenta en el ejemplo anterior:

Restaurar el último punto de control y prueba

Un modelo el punto de control con un tf.distribute.Strategy puede ser restaurado con o sin una estrategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

Formas alternativas de iterar sobre un conjunto de datos

Usando iteradores

Si desea repetir un número determinado de pasos y no a través de todo el conjunto de datos se puede crear un iterador utilizando el iter llamada y llamada explícita next en el iterador. Puede elegir iterar sobre el conjunto de datos tanto dentro como fuera de la función tf. Aquí hay un pequeño fragmento que demuestra la iteración del conjunto de datos fuera de la función tf.function usando un iterador.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

Iterando dentro de una función tf.

También iterar sobre la entrada completa puede train_dist_dataset dentro de un tf.function utilizando la for x in ... constructo o mediante la creación de iteradores como lo hicimos anteriormente. El siguiente ejemplo demuestra envolver una época de formación en un tf.function y la iteración en train_dist_dataset dentro de la función.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

Seguimiento de la pérdida de entrenamiento en las réplicas

No recomendamos el uso de tf.metrics.Mean para rastrear la pérdida de la formación a través de diferentes réplicas, debido a la pérdida de cálculos de escalamiento que se lleva a cabo.

Por ejemplo, si ejecuta un trabajo de entrenamiento con las siguientes características:

  • Dos réplicas
  • Se procesan dos muestras en cada réplica
  • Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica
  • Tamaño de lote global = 4

Con la escala de pérdida, calcula el valor de pérdida por muestra en cada réplica sumando los valores de pérdida y luego dividiendo por el tamaño del lote global. En este caso: (2 + 3) / 4 = 1.25 y (4 + 5) / 4 = 2.25 .

Si utiliza tf.metrics.Mean para rastrear la pérdida a través de las dos réplicas, el resultado es diferente. En este ejemplo, se termina con un total de 3,50 y count de 2, lo que resulta en total / count = 1,75 cuando result() se llama en la métrica. La pérdida calculada con tf.keras.Metrics es escalado por un factor adicional que es igual al número de réplicas en sincronía.

Guía y ejemplos

A continuación, se muestran algunos ejemplos del uso de la estrategia de distribución con ciclos de entrenamiento personalizados:

  1. Guía de formación distribuida
  2. DenseNet ejemplo usando MirroredStrategy .
  3. BERT ejemplo entrenado usando MirroredStrategy y TPUStrategy . Este ejemplo es particularmente útil para comprender cómo cargar desde un punto de control y generar puntos de control periódicos durante la capacitación distribuida, etc.
  4. NCF ejemplo entrenado usando MirroredStrategy que se pueden activar mediante el keras_use_ctl bandera.
  5. NMT ejemplo entrenado usando MirroredStrategy .

Más ejemplos enumerados en la guía de la estrategia de distribución .

Próximos pasos