Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento personalizado con tf.distribute.Strategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver código fuente en GitHub Descargar cuaderno

Este tutorial muestra cómo usar tf.distribute.Strategy con bucles de entrenamiento personalizados. Entrenaremos un modelo CNN simple en el conjunto de datos MNIST de moda. El conjunto de datos de moda MNIST contiene 60000 imágenes de trenes de tamaño 28 x 28 y 10000 imágenes de prueba de tamaño 28 x 28.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos dan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento.

 # Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.2.0

Descargue el conjunto de datos de moda MNIST

 fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

Crear una estrategia para distribuir las variables y el gráfico.

¿Cómo tf.distribute.MirroredStrategy estrategia tf.distribute.MirroredStrategy ?

  • Todas las variables y el gráfico del modelo se replican en las réplicas.
  • La entrada se distribuye uniformemente entre las réplicas.
  • Cada réplica calcula la pérdida y los gradientes para la entrada que recibió.
  • Los gradientes se sincronizan en todas las réplicas al sumarlas.
  • Después de la sincronización, se realiza la misma actualización a las copias de las variables en cada réplica.
 # If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

 print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
 
Number of devices: 1

Configurar canalización de entrada

Exporte el gráfico y las variables al formato Guardado de modelo agnóstico de plataforma. Después de guardar su modelo, puede cargarlo con o sin el alcance.

 BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10
 

Cree los conjuntos de datos y distribúyalos:

 train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
 

Crea el modelo

Cree un modelo usando tf.keras.Sequential . También puede usar la API de subclases de modelos para hacer esto.

 def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
 
 # Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
 

Definir la función de pérdida.

Normalmente, en una sola máquina con 1 GPU / CPU, la pérdida se divide por el número de ejemplos en el lote de entrada.

Entonces, ¿cómo debe calcularse la pérdida cuando se usa una tf.distribute.Strategy ?

  • Por ejemplo, supongamos que tiene 4 GPU y un tamaño de lote de 64. Un lote de entrada se distribuye entre las réplicas (4 GPU), cada réplica recibe una entrada de tamaño 16.

  • El modelo en cada réplica realiza un pase directo con su entrada respectiva y calcula la pérdida. Ahora, en lugar de dividir la pérdida por el número de ejemplos en su entrada respectiva (BATCH_SIZE_PER_REPLICA = 16), la pérdida se debe dividir por GLOBAL_BATCH_SIZE (64).

¿Por qué hacer esto?

  • Esto debe hacerse porque después de calcular los gradientes en cada réplica, se sincronizan entre las réplicas al sumarlas .

¿Cómo hacer esto en TensorFlow?

  • Si está escribiendo un ciclo de entrenamiento personalizado, como en este tutorial, debe sumar las pérdidas por ejemplo y dividir la suma entre GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) o puede usar tf.nn.compute_average_loss que toma la pérdida por ejemplo, los pesos de muestra opcionales y GLOBAL_BATCH_SIZE como argumentos y devuelve la pérdida escalada.

  • Si está utilizando pérdidas de regularización en su modelo, entonces necesita escalar el valor de la pérdida por número de réplicas. Puede hacerlo utilizando la función tf.nn.scale_regularization_loss .

  • tf.reduce_mean se recomienda usar tf.reduce_mean . Al hacerlo, se divide la pérdida por el tamaño de lote real por réplica, que puede variar paso a paso.

  • Esta reducción y escala se realiza automáticamente en keras model.compile y model.fit

  • Si utiliza tf.keras.losses clases tf.keras.losses (como en el ejemplo a continuación), la reducción de pérdida debe especificarse explícitamente para que sea NONE o SUM . AUTO y SUM_OVER_BATCH_SIZE no están permitidos cuando se usan con tf.distribute.Strategy . AUTO no está permitido porque el usuario debe pensar explícitamente en qué reducción quiere asegurarse de que sea correcta en el caso distribuido. SUM_OVER_BATCH_SIZE no está permitido porque actualmente solo se dividiría por tamaño de lote por réplica, y dejaría la división entre el número de réplicas para el usuario, lo que podría ser fácil de omitir. Por lo tanto, le pedimos al usuario que haga la reducción de manera explícita.

  • Si las labels son multidimensionales, per_example_loss el per_example_loss través del número de elementos en cada muestra. Por ejemplo, si la forma de las predictions es (batch_size, H, W, n_classes) y las labels son (batch_size, H, W) , deberá actualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

 with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
 

Defina las métricas para rastrear la pérdida y la precisión.

Estas métricas rastrean la pérdida de la prueba y el entrenamiento y la precisión de la prueba. Puede usar .result() para obtener las estadísticas acumuladas en cualquier momento.

 with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
 

Circuito de entrenamiento

 # model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
 
 def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
 
 # `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
 
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.5121287107467651, Accuracy: 81.44499969482422, Test Loss: 0.4022502303123474, Test Accuracy: 85.36000061035156
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.33714210987091064, Accuracy: 87.89167022705078, Test Loss: 0.3380376994609833, Test Accuracy: 87.9000015258789
Epoch 3, Loss: 0.2912210524082184, Accuracy: 89.28166961669922, Test Loss: 0.29934415221214294, Test Accuracy: 88.97000122070312
Epoch 4, Loss: 0.25845447182655334, Accuracy: 90.49666595458984, Test Loss: 0.2924875319004059, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.23627422749996185, Accuracy: 91.37667083740234, Test Loss: 0.2735322117805481, Test Accuracy: 89.72000122070312
Epoch 6, Loss: 0.217362642288208, Accuracy: 91.90666961669922, Test Loss: 0.2779887914657593, Test Accuracy: 89.55000305175781
Epoch 7, Loss: 0.19758468866348267, Accuracy: 92.77666473388672, Test Loss: 0.2544868290424347, Test Accuracy: 90.58000183105469
Epoch 8, Loss: 0.18331800401210785, Accuracy: 93.22000122070312, Test Loss: 0.27537408471107483, Test Accuracy: 90.30000305175781
Epoch 9, Loss: 0.16616544127464294, Accuracy: 93.92832946777344, Test Loss: 0.27181997895240784, Test Accuracy: 89.74000549316406
Epoch 10, Loss: 0.15189047157764435, Accuracy: 94.3499984741211, Test Loss: 0.29621848464012146, Test Accuracy: 89.8800048828125

Cosas a tener en cuenta en el ejemplo anterior:

Restaurar el último punto de control y prueba

Un modelo tf.distribute.Strategy con un tf.distribute.Strategy se puede restaurar con o sin una estrategia.

 eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
 
 @tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
 
 checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
 
Accuracy after restoring the saved model without strategy: 89.74000549316406

Formas alternativas de iterar sobre un conjunto de datos

Usando iteradores

Si desea iterar sobre un número dado de pasos y no a través de todo el conjunto de datos, puede crear un iterador usando la llamada iter y la llamada explícita a next en el iterador. Puede elegir iterar sobre el conjunto de datos tanto dentro como fuera de la función tf. Aquí hay un pequeño fragmento que muestra la iteración del conjunto de datos fuera de la función tf.funcionando un iterador.

 for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
 
Epoch 10, Loss: 0.15917226672172546, Accuracy: 95.0
Epoch 10, Loss: 0.12831072509288788, Accuracy: 95.3125
Epoch 10, Loss: 0.1469763219356537, Accuracy: 94.21875
Epoch 10, Loss: 0.1432376652956009, Accuracy: 94.53125
Epoch 10, Loss: 0.10891322791576385, Accuracy: 96.5625
Epoch 10, Loss: 0.11562182009220123, Accuracy: 95.46875
Epoch 10, Loss: 0.12230901420116425, Accuracy: 95.625
Epoch 10, Loss: 0.149966761469841, Accuracy: 94.21875
Epoch 10, Loss: 0.13360166549682617, Accuracy: 94.6875
Epoch 10, Loss: 0.13077859580516815, Accuracy: 95.0

Iterando dentro de una función tf.

También puede iterar sobre toda la entrada train_dist_dataset dentro de una función tf usando la construcción for x in ... o creando iteradores como lo hicimos anteriormente. El ejemplo a continuación muestra cómo completar una época de entrenamiento en una función tf e iterar sobre train_dist_dataset dentro de la función.

 @tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
 
Epoch 1, Loss: 0.1388315111398697, Accuracy: 94.83000183105469
Epoch 2, Loss: 0.1286177784204483, Accuracy: 95.1433334350586
Epoch 3, Loss: 0.11653201282024384, Accuracy: 95.70000457763672
Epoch 4, Loss: 0.1051444411277771, Accuracy: 96.06666564941406
Epoch 5, Loss: 0.09490712732076645, Accuracy: 96.46833801269531
Epoch 6, Loss: 0.08746836334466934, Accuracy: 96.77166748046875
Epoch 7, Loss: 0.07981529831886292, Accuracy: 97.04833984375
Epoch 8, Loss: 0.07256246358156204, Accuracy: 97.28666687011719
Epoch 9, Loss: 0.06466992199420929, Accuracy: 97.63333129882812
Epoch 10, Loss: 0.06099675968289375, Accuracy: 97.73833465576172

Seguimiento de la pérdida de entrenamiento en réplicas

No recomendamos el uso de tf.metrics.Mean para rastrear la pérdida de la formación a través de diferentes réplicas, debido a la pérdida de cálculos de escalamiento que se lleva a cabo.

Por ejemplo, si ejecuta un trabajo de capacitación con las siguientes características:

  • Dos réplicas
  • Se procesan dos muestras en cada réplica
  • Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica
  • Tamaño de lote global = 4

Con la escala de pérdida, calcula el valor de pérdida por muestra en cada réplica al agregar los valores de pérdida y luego dividirlos por el tamaño de lote global. En este caso: (2 + 3) / 4 = 1.25 y (4 + 5) / 4 = 2.25 .

Si usa tf.metrics.Mean para rastrear la pérdida en las dos réplicas, el resultado es diferente. En este ejemplo, terminas con un total de 3.50 y un count de 2, lo que resulta en total / count = 1.75 cuando se llama a result() en la métrica. La pérdida calculada con tf.keras.Metrics se escala por un factor adicional que es igual al número de réplicas sincronizadas.

Guía y ejemplos

Aquí hay algunos ejemplos para usar la estrategia de distribución con bucles de entrenamiento personalizados:

  1. Guía de entrenamiento distribuido
  2. Ejemplo de DenseNet usando MirroredStrategy .
  3. Ejemplo de BERT entrenado usando MirroredStrategy y TPUStrategy . Este ejemplo es particularmente útil para comprender cómo cargar desde un punto de control y generar puntos de control periódicos durante el entrenamiento distribuido, etc.
  4. Ejemplo de NCF entrenado usando MirroredStrategy que se puede habilitar usando el indicador keras_use_ctl .
  5. Ejemplo de NMT entrenado usando MirroredStrategy .

Más ejemplos listados en la Guía de estrategia de distribución .

Próximos pasos