Cette page a été traduite par l'API Cloud Translation.
Switch to English

Formation personnalisée avec tf.distribute.Strategy

Voir sur TensorFlow.org Exécuter dans Google Colab Afficher la source sur GitHub Télécharger le cahier

Ce didacticiel montre comment utiliser tf.distribute.Strategy avec des boucles d'entraînement personnalisées. Nous allons former un modèle CNN simple sur l'ensemble de données mode MNIST. Le jeu de données mode MNIST contient 60000 images de train de taille 28 x 28 et 10000 images de test de taille 28 x 28.

Nous utilisons des boucles de formation personnalisées pour former notre modèle, car elles nous donnent de la flexibilité et un plus grand contrôle sur la formation. De plus, il est plus facile de déboguer le modèle et la boucle d'apprentissage.

 # Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.2.0

Téléchargez le jeu de données mode MNIST

 fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

Créer une stratégie pour distribuer les variables et le graphique

Comment fonctionne la stratégie tf.distribute.MirroredStrategy ?

  • Toutes les variables et le graphe du modèle sont répliqués sur les répliques.
  • L'entrée est uniformément répartie entre les répliques.
  • Chaque réplique calcule la perte et les gradients pour l'entrée reçue.
  • Les dégradés sont synchronisés sur toutes les répliques en les additionnant.
  • Après la synchronisation, la même mise à jour est effectuée sur les copies des variables sur chaque réplica.
 # If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

 print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
 
Number of devices: 1

Configurer le pipeline d'entrée

Exportez le graphique et les variables au format SavedModel indépendant de la plate-forme. Une fois votre modèle enregistré, vous pouvez le charger avec ou sans la portée.

 BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10
 

Créez les ensembles de données et diffusez-les:

 train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
 

Créer le modèle

Créez un modèle à l'aide de tf.keras.Sequential . Vous pouvez également utiliser l'API de sous-classification de modèle pour ce faire.

 def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
 
 # Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
 

Définir la fonction de perte

Normalement, sur une seule machine avec 1 GPU / CPU, la perte est divisée par le nombre d'exemples dans le lot d'entrée.

Alors, comment calculer la perte lors de l'utilisation d'une tf.distribute.Strategy ?

  • Pour un exemple, disons que vous avez 4 GPU et une taille de lot de 64. Un lot d'entrées est réparti sur les répliques (4 GPU), chaque réplique recevant une entrée de taille 16.

  • Le modèle sur chaque réplique effectue une passe avant avec son entrée respective et calcule la perte. Maintenant, au lieu de diviser la perte par le nombre d'exemples dans son entrée respective (BATCH_SIZE_PER_REPLICA = 16), la perte doit être divisée par GLOBAL_BATCH_SIZE (64).

Pourquoi faire ceci?

  • Cela doit être fait car une fois les dégradés calculés sur chaque réplique, ils sont synchronisés entre les répliques en les additionnant .

Comment faire cela dans TensorFlow?

  • Si vous écrivez une boucle d'entraînement personnalisée, comme dans ce didacticiel, vous devez additionner les pertes par exemple et diviser la somme par GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) ou vous pouvez utiliser tf.nn.compute_average_loss qui prend la perte par exemple, les poids d'échantillon facultatifs et GLOBAL_BATCH_SIZE comme arguments et renvoie la perte mise à l'échelle.

  • Si vous utilisez des pertes de régularisation dans votre modèle, vous devez mettre à l'échelle la valeur de la perte en fonction du nombre de répliques. Vous pouvez le faire en utilisant la fonction tf.nn.scale_regularization_loss .

  • L'utilisation de tf.reduce_mean n'est pas recommandée. Cela divise la perte par la taille réelle du lot par réplique, qui peut varier d'une étape à l'autre.

  • Cette réduction et mise à l'échelle se fait automatiquement dans keras model.compile et model.fit

  • Si vous tf.keras.losses classes tf.keras.losses (comme dans l'exemple ci-dessous), la réduction de perte doit être explicitement spécifiée comme étant NONE ou SUM . AUTO et SUM_OVER_BATCH_SIZE ne sont pas autorisés lorsqu'ils sont utilisés avec tf.distribute.Strategy . AUTO est interdit car l'utilisateur doit explicitement réfléchir à la réduction qu'il souhaite pour s'assurer qu'elle est correcte dans le cas distribué. SUM_OVER_BATCH_SIZE n'est pas autorisé car actuellement, il ne diviserait que par la taille de lot de réplica et laisserait la division par le nombre de répliques à l'utilisateur, ce qui pourrait être facile à manquer. Nous demandons donc à l'utilisateur de faire la réduction lui-même explicitement.

  • Si les labels sont multidimensionnelles, per_example_loss la moyenne de per_example_loss sur le nombre d'éléments de chaque échantillon. Par exemple, si la forme des predictions est (batch_size, H, W, n_classes) et les labels est (batch_size, H, W) , vous devrez mettre à jour per_example_loss comme: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

 with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
 

Définir les métriques pour suivre la perte et la précision

Ces mesures permettent de suivre la perte de test, la formation et la précision des tests. Vous pouvez utiliser .result() pour obtenir les statistiques accumulées à tout moment.

 with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
 

Boucle d'entraînement

 # model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
 
 def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
 
 # `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
 
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.5121287107467651, Accuracy: 81.44499969482422, Test Loss: 0.4022502303123474, Test Accuracy: 85.36000061035156
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.33714210987091064, Accuracy: 87.89167022705078, Test Loss: 0.3380376994609833, Test Accuracy: 87.9000015258789
Epoch 3, Loss: 0.2912210524082184, Accuracy: 89.28166961669922, Test Loss: 0.29934415221214294, Test Accuracy: 88.97000122070312
Epoch 4, Loss: 0.25845447182655334, Accuracy: 90.49666595458984, Test Loss: 0.2924875319004059, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.23627422749996185, Accuracy: 91.37667083740234, Test Loss: 0.2735322117805481, Test Accuracy: 89.72000122070312
Epoch 6, Loss: 0.217362642288208, Accuracy: 91.90666961669922, Test Loss: 0.2779887914657593, Test Accuracy: 89.55000305175781
Epoch 7, Loss: 0.19758468866348267, Accuracy: 92.77666473388672, Test Loss: 0.2544868290424347, Test Accuracy: 90.58000183105469
Epoch 8, Loss: 0.18331800401210785, Accuracy: 93.22000122070312, Test Loss: 0.27537408471107483, Test Accuracy: 90.30000305175781
Epoch 9, Loss: 0.16616544127464294, Accuracy: 93.92832946777344, Test Loss: 0.27181997895240784, Test Accuracy: 89.74000549316406
Epoch 10, Loss: 0.15189047157764435, Accuracy: 94.3499984741211, Test Loss: 0.29621848464012146, Test Accuracy: 89.8800048828125

Choses à noter dans l'exemple ci-dessus:

Restaurez le dernier point de contrôle et testez

Un modèle avec un point de contrôle tf.distribute.Strategy peut être restauré avec ou sans stratégie.

 eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
 
 @tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
 
 checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
 
Accuracy after restoring the saved model without strategy: 89.74000549316406

Autres méthodes d'itération sur un ensemble de données

Utiliser des itérateurs

Si vous souhaitez itérer sur un nombre donné d'étapes et non sur l'ensemble de données, vous pouvez créer un itérateur à l'aide de l'appel iter et de l'appel explicite next sur l'itérateur. Vous pouvez choisir de parcourir l'ensemble de données à la fois à l'intérieur et à l'extérieur de la fonction tf.function. Voici un petit extrait de code montrant l'itération de l'ensemble de données en dehors de la fonction tf.function à l'aide d'un itérateur.

 for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
 
Epoch 10, Loss: 0.15917226672172546, Accuracy: 95.0
Epoch 10, Loss: 0.12831072509288788, Accuracy: 95.3125
Epoch 10, Loss: 0.1469763219356537, Accuracy: 94.21875
Epoch 10, Loss: 0.1432376652956009, Accuracy: 94.53125
Epoch 10, Loss: 0.10891322791576385, Accuracy: 96.5625
Epoch 10, Loss: 0.11562182009220123, Accuracy: 95.46875
Epoch 10, Loss: 0.12230901420116425, Accuracy: 95.625
Epoch 10, Loss: 0.149966761469841, Accuracy: 94.21875
Epoch 10, Loss: 0.13360166549682617, Accuracy: 94.6875
Epoch 10, Loss: 0.13077859580516815, Accuracy: 95.0

Itérer dans une fonction tf

Vous pouvez également parcourir l'ensemble de l'entrée train_dist_dataset dans une fonction tf.function en utilisant la construction for x in ... ou en créant des itérateurs comme nous l'avons fait ci-dessus. L'exemple ci-dessous montre l'encapsulation d'une époque d'entraînement dans une fonction tf.function et l'itération sur train_dist_dataset à l'intérieur de la fonction.

 @tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
 
Epoch 1, Loss: 0.1388315111398697, Accuracy: 94.83000183105469
Epoch 2, Loss: 0.1286177784204483, Accuracy: 95.1433334350586
Epoch 3, Loss: 0.11653201282024384, Accuracy: 95.70000457763672
Epoch 4, Loss: 0.1051444411277771, Accuracy: 96.06666564941406
Epoch 5, Loss: 0.09490712732076645, Accuracy: 96.46833801269531
Epoch 6, Loss: 0.08746836334466934, Accuracy: 96.77166748046875
Epoch 7, Loss: 0.07981529831886292, Accuracy: 97.04833984375
Epoch 8, Loss: 0.07256246358156204, Accuracy: 97.28666687011719
Epoch 9, Loss: 0.06466992199420929, Accuracy: 97.63333129882812
Epoch 10, Loss: 0.06099675968289375, Accuracy: 97.73833465576172

Suivi des pertes d'entraînement sur les répliques

Nous ne recommandons pas d' utiliser tf.metrics.Mean pour suivre la perte de formation dans les différentes répliques, en raison du calcul de la mise à l' échelle de la perte qui est effectuée.

Par exemple, si vous exécutez une tâche d'entraînement avec les caractéristiques suivantes:

  • Deux répliques
  • Deux échantillons sont traités sur chaque réplique
  • Valeurs de perte résultantes: [2, 3] et [4, 5] sur chaque réplique
  • Taille globale du lot = 4

Avec la mise à l'échelle des pertes, vous calculez la valeur de perte par échantillon sur chaque réplica en ajoutant les valeurs de perte, puis en divisant par la taille globale du lot. Dans ce cas: (2 + 3) / 4 = 1.25 et (4 + 5) / 4 = 2.25 .

Si vous utilisez tf.metrics.Mean pour suivre les pertes entre les deux réplicas, le résultat est différent. Dans cet exemple, vous vous retrouvez avec un total de 3,50 et un count de 2, ce qui donne un total / count = 1,75 lorsque result() est appelé sur la métrique. La perte calculée avec tf.keras.Metrics est mise à l'échelle par un facteur supplémentaire égal au nombre de répliques synchronisées.

Guide et exemples

Voici quelques exemples d'utilisation de la stratégie de distribution avec des boucles d'entraînement personnalisées:

  1. Guide de formation distribué
  2. Exemple DenseNet utilisant MirroredStrategy .
  3. BERT exemple formé en utilisant MirroredStrategy et TPUStrategy . Cet exemple est particulièrement utile pour comprendre comment charger à partir d'un point de contrôle et générer des points de contrôle périodiques pendant la formation distribuée, etc.
  4. Exemple NCF formé à l'aide de MirroredStrategy qui peut être activé à l'aide de l'indicateur keras_use_ctl .
  5. Exemple NMT formé à l'aide de MirroredStrategy .

Plus d'exemples répertoriés dans le guide de stratégie de distribution .

Prochaines étapes