Formazione personalizzata con tf.distribute.Strategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza la fonte su GitHub Scarica il taccuino

Questo tutorial mostra come utilizzare tf.distribute.Strategy con i cicli di formazione personalizzati. Addestreremo un semplice modello CNN sul set di dati MNIST della moda. Il set di dati fashion MNIST contiene 60000 immagini di treni di dimensioni 28 x 28 e 10000 immagini di prova di dimensioni 28 x 28.

Utilizziamo cicli di addestramento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'addestramento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

Scarica il set di dati MNIST della moda

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

Crea una strategia per distribuire le variabili e il grafico

Come funziona tf.distribute.MirroredStrategy lavoro strategia?

  • Tutte le variabili e il grafico del modello vengono replicati sulle repliche.
  • L'input è distribuito uniformemente tra le repliche.
  • Ogni replica calcola la perdita e i gradienti per l'input ricevuto.
  • I gradienti vengono sincronizzati su tutte le repliche sommandoli.
  • Dopo la sincronizzazione, viene effettuato lo stesso aggiornamento alle copie delle variabili su ciascuna replica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurazione della pipeline di input

Esporta il grafico e le variabili nel formato SavedModel indipendente dalla piattaforma. Dopo che il modello è stato salvato, puoi caricarlo con o senza l'oscilloscopio.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Crea i set di dati e distribuiscili:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crea il modello

Creare un modello usando tf.keras.Sequential . Puoi anche usare l'API Model Subclassing per farlo.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definire la funzione di perdita

Normalmente, su una singola macchina con 1 GPU/CPU, la perdita viene divisa per il numero di esempi nel batch di input.

Quindi, come se la perdita di essere calcolato quando si utilizza un tf.distribute.Strategy ?

  • Ad esempio, supponiamo che tu abbia 4 GPU e una dimensione batch di 64. Un batch di input è distribuito tra le repliche (4 GPU), ogni replica riceve un input di dimensione 16.

  • Il modello su ciascuna replica esegue un passaggio in avanti con il rispettivo input e calcola la perdita. Ora, invece di dividere la perdita per il numero di esempi nel rispettivo input (BATCH_SIZE_PER_REPLICA = 16), la perdita dovrebbe essere divisa per GLOBAL_BATCH_SIZE (64).

Perché farlo?

  • Questo deve essere fatto perché dopo le pendenze sono calcolate su ogni replica, vengono sincronizzati tra le repliche da loro somma.

Come farlo in TensorFlow?

  • Se si scrive un ciclo di formazione personalizzato, come in questo tutorial, si dovrebbe sommare le perdite per esempio e dividere la somma per il GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) oppure è possibile utilizzare tf.nn.compute_average_loss che prende il per esempio perdita, pesi campione opzionali, e GLOBAL_BATCH_SIZE come argomenti e restituisce la perdita in scala.

  • Se si utilizzano perdite di regolarizzazione nel modello, è necessario ridimensionare il valore della perdita in base al numero di repliche. È possibile farlo utilizzando la tf.nn.scale_regularization_loss funzioni.

  • Utilizzando tf.reduce_mean non è raccomandato. In questo modo si divide la perdita per le dimensioni effettive del batch di replica che possono variare di passaggio in passaggio.

  • Questa riduzione e il ridimensionamento è fatto automaticamente in keras model.compile e model.fit

  • Se si utilizza tf.keras.losses classi (come nell'esempio di seguito), la riduzione di perdita deve essere esplicitamente specificato per essere uno NONE o SUM . AUTO e SUM_OVER_BATCH_SIZE non sono consentite quando viene utilizzato con tf.distribute.Strategy . AUTO viene annullato perché l'utente dovrebbe pensare in modo esplicito su ciò che la riduzione che vogliono assicurarsi che sia corretto nel caso distribuito. SUM_OVER_BATCH_SIZE è consentita perché attualmente sarebbe dividere solo per replica dimensione del lotto, e lasciare il dividendo per numero di repliche per l'utente, che potrebbe essere facile perdere. Quindi, invece, chiediamo all'utente di eseguire la riduzione da solo in modo esplicito.

  • Se labels è multidimensionale, quindi la media per_example_loss attraverso il numero di elementi in ciascun campione. Ad esempio, se la forma di predictions è (batch_size, H, W, n_classes) e labels è (batch_size, H, W) , sarà necessario aggiornare per_example_loss come: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Definisci le metriche per monitorare la perdita e l'accuratezza

Queste metriche tengono traccia della perdita del test, dell'addestramento e dell'accuratezza del test. È possibile utilizzare .result() per ottenere le statistiche accumulate in qualsiasi momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Ciclo di allenamento

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

Cose da notare nell'esempio sopra:

Ripristina l'ultimo checkpoint e prova

Un modello checkpoint con una tf.distribute.Strategy può essere ripristinato con o senza una strategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

Modi alternativi di iterare su un set di dati

Utilizzo degli iteratori

Se si desidera iterare in un determinato numero di passi e non attraverso l'intero set di dati è possibile creare un iteratore utilizzando l' iter chiamata e chiamata esplicitamente next sul iteratore. Puoi scegliere di scorrere il set di dati sia all'interno che all'esterno di tf.function. Ecco un piccolo frammento che dimostra l'iterazione del set di dati al di fuori di tf.function usando un iteratore.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

Iterazione all'interno di una funzione tf

È inoltre iterate su tutta d'ingresso può train_dist_dataset all'interno di un tf.function utilizzando il for x in ... costrutto o creando iteratori come abbiamo fatto in precedenza. L'esempio seguente mostra il confezionamento un'epoca di formazione in un tf.function e l'iterazione di train_dist_dataset all'interno della funzione.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

Monitoraggio della perdita di addestramento tra le repliche

Si consiglia di non utilizzare tf.metrics.Mean per monitorare la perdita di formazione tra le varie repliche, a causa del calcolo ridimensionamento perdita che viene effettuata.

Ad esempio, se esegui un lavoro di formazione con le seguenti caratteristiche:

  • Due repliche
  • Vengono processati due campioni su ogni replica
  • Valori di perdita risultanti: [2, 3] e [4, 5] su ciascuna replica
  • Dimensione globale del lotto = 4

Con il ridimensionamento delle perdite, si calcola il valore della perdita per campione su ciascuna replica aggiungendo i valori della perdita e quindi dividendo per la dimensione del batch globale. In questo caso: (2 + 3) / 4 = 1.25 e (4 + 5) / 4 = 2.25 .

Se si utilizza tf.metrics.Mean per monitorare la perdita attraverso le due repliche, il risultato è diverso. In questo esempio, si finisce con un total di 3,50 e count di 2, che si traduce in total / count = 1.75 quando result() è chiamato alla metrica. Perdita calcolati con tf.keras.Metrics è scalato di un fattore aggiuntivo che è uguale al numero di repliche in sincronia.

Guida ed esempi

Di seguito sono riportati alcuni esempi di utilizzo della strategia di distribuzione con cicli di formazione personalizzati:

  1. Guida alla formazione distribuita
  2. DenseNet esempio con MirroredStrategy .
  3. BERT esempio addestrato utilizzando MirroredStrategy e TPUStrategy . Questo esempio è particolarmente utile per capire come caricare da un checkpoint e generare checkpoint periodici durante l'addestramento distribuito, ecc.
  4. NCF esempio allenato utilizzando MirroredStrategy che può essere attivato utilizzando il keras_use_ctl bandiera.
  5. NMT esempio allenato utilizzando MirroredStrategy .

Altri esempi elencati nella guida strategica di distribuzione .

Prossimi passi