Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento personalizado con tf.distribute.Strategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Este tutorial demuestra cómo usar tf.distribute.Strategy con ciclos de entrenamiento personalizados. Entrenaremos un modelo CNN simple en el conjunto de datos de moda MNIST. El conjunto de datos de moda MNIST contiene 60000 imágenes de trenes de tamaño 28 x 28 y 10000 imágenes de prueba de tamaño 28 x 28.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos dan flexibilidad y un mayor control del entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

Descargue el conjunto de datos de moda MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 1s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 1s 0us/step

Crea una estrategia para distribuir las variables y el gráfico.

¿Cómo tf.distribute.MirroredStrategy estrategia tf.distribute.MirroredStrategy ?

  • Todas las variables y el gráfico del modelo se replican en las réplicas.
  • La entrada se distribuye uniformemente entre las réplicas.
  • Cada réplica calcula la pérdida y los gradientes de la entrada que recibió.
  • Los gradientes se sincronizan en todas las réplicas al sumarlos.
  • Después de la sincronización, se realiza la misma actualización a las copias de las variables en cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurar canalización de entrada

Exporte el gráfico y las variables al formato de modelo guardado independiente de la plataforma. Después de guardar su modelo, puede cargarlo con o sin el alcance.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Cree los conjuntos de datos y distribúyalos:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crea el modelo

Cree un modelo usando tf.keras.Sequential . También puede utilizar la API de subclases de modelos para hacer esto.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definir la función de pérdida

Normalmente, en una sola máquina con 1 GPU / CPU, la pérdida se divide por el número de ejemplos en el lote de entrada.

Entonces, ¿cómo se debe calcular la pérdida cuando se usa un tf.distribute.Strategy ?

  • Por ejemplo, digamos que tiene 4 GPU y un tamaño de lote de 64. Un lote de entrada se distribuye entre las réplicas (4 GPU), y cada réplica recibe una entrada de tamaño 16.

  • El modelo de cada réplica hace un pase hacia adelante con su entrada respectiva y calcula la pérdida. Ahora, en lugar de dividir la pérdida por el número de ejemplos en su entrada respectiva (BATCH_SIZE_PER_REPLICA = 16), la pérdida debe dividirse por GLOBAL_BATCH_SIZE (64).

¿Por qué hacer esto?

  • Esto debe hacerse porque después de calcular los gradientes en cada réplica, se sincronizan entre las réplicas sumándolas .

¿Cómo hacer esto en TensorFlow?

  • Si está escribiendo un ciclo de entrenamiento personalizado, como en este tutorial, debe sumar las pérdidas por ejemplo y dividir la suma por GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) o puede usar tf.nn.compute_average_loss que toma la pérdida por ejemplo, los pesos de muestra opcionales y GLOBAL_BATCH_SIZE como argumentos y devuelve la pérdida escalada.

  • Si está usando pérdidas de regularización en su modelo, entonces necesita escalar el valor de pérdida por número de réplicas. Puede hacer esto usando la función tf.nn.scale_regularization_loss .

  • tf.reduce_mean se recomienda el uso de tf.reduce_mean . Al hacerlo, se divide la pérdida entre el tamaño real del lote por réplica, que puede variar de un paso a otro.

  • Esta reducción y escalado se realiza automáticamente en keras model.compile y model.fit

  • Si usa tf.keras.losses clases tf.keras.losses (como en el ejemplo a continuación), la reducción de pérdida debe especificarse explícitamente para que sea NONE o SUM . AUTO y SUM_OVER_BATCH_SIZE no están permitidos cuando se usan con tf.distribute.Strategy . AUTO no está permitido porque el usuario debe pensar explícitamente en la reducción que desea para asegurarse de que sea correcta en el caso distribuido. SUM_OVER_BATCH_SIZE no está permitido porque actualmente solo dividiría por tamaño de lote de réplica y dejaría la división por el número de réplicas al usuario, lo que podría ser fácil de pasar por alto. Entonces, en cambio, le pedimos al usuario que haga la reducción él mismo explícitamente.

  • Si las labels son multidimensionales, per_example_loss la per_example_loss entre el número de elementos de cada muestra. Por ejemplo, si la forma de las predictions es (batch_size, H, W, n_classes) y las labels son (batch_size, H, W) , necesitará actualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Defina las métricas para rastrear la pérdida y la precisión

Estas métricas rastrean la pérdida de la prueba y el entrenamiento y la precisión de la prueba. Puede utilizar .result() para obtener las estadísticas acumuladas en cualquier momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

Bucle de entrenamiento

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789
Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828
Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422
Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406
Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75
Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938
Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375
Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594
Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594

Cosas a tener en cuenta en el ejemplo anterior:

Restaurar el último punto de control y prueba

Un modelo tf.distribute.Strategy con un tf.distribute.Strategy se puede restaurar con o sin una estrategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594

Formas alternativas de iterar sobre un conjunto de datos

Usando iteradores

Si desea iterar sobre un número determinado de pasos y no a través de todo el conjunto de datos, puede crear un iterador usando la llamada iter y la next llamada explícita en el iterador. Puede optar por iterar sobre el conjunto de datos tanto dentro como fuera de la función tf. Aquí hay un pequeño fragmento que demuestra la iteración del conjunto de datos fuera de la función tf.function usando un iterador.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0
Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875
Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625
Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625
Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875
Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125
Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125
Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625
Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625
Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875

Iterando dentro de una función tf.

También puede iterar sobre todo el train_dist_dataset entrada dentro de una función tf. Usando la construcción for x in ... o creando iteradores como hicimos anteriormente. El siguiente ejemplo demuestra cómo envolver una época de entrenamiento en una función tf.function e iterar sobre train_dist_dataset dentro de la función.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688
Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266
Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281
Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047
Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438
Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219
Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281
Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156
Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484

Seguimiento de la pérdida de entrenamiento en las réplicas

No recomendamos el uso de tf.metrics.Mean para rastrear la pérdida de la formación a través de diferentes réplicas, debido a la pérdida de cálculos de escalamiento que se lleva a cabo.

Por ejemplo, si ejecuta un trabajo de entrenamiento con las siguientes características:

  • Dos réplicas
  • Se procesan dos muestras en cada réplica
  • Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica
  • Tamaño de lote global = 4

Con el escalado de pérdidas, se calcula el valor de pérdida por muestra en cada réplica sumando los valores de pérdida y luego dividiendo por el tamaño global del lote. En este caso: (2 + 3) / 4 = 1.25 y (4 + 5) / 4 = 2.25 .

Si usa tf.metrics.Mean para rastrear la pérdida en las dos réplicas, el resultado es diferente. En este ejemplo, termina con un total de 3.50 y count de 2, lo que da como resultado total / count = 1.75 cuando se llama a result() en la métrica. La pérdida calculada con tf.keras.Metrics se escala mediante un factor adicional que es igual al número de réplicas sincronizadas.

Guía y ejemplos

A continuación, se muestran algunos ejemplos del uso de la estrategia de distribución con ciclos de entrenamiento personalizados:

  1. Guía de formación distribuida
  2. Ejemplo de DenseNet usando MirroredStrategy .
  3. Ejemplo de BERT entrenado con MirroredStrategy y TPUStrategy . Este ejemplo es particularmente útil para comprender cómo cargar desde un punto de control y generar puntos de control periódicos durante la capacitación distribuida, etc.
  4. Ejemplo de NCF entrenado usando MirroredStrategy que se puede habilitar usando el indicador keras_use_ctl .
  5. Ejemplo de NMT entrenado con MirroredStrategy .

Más ejemplos enumerados en la guía de estrategia de distribución .

Próximos pasos