Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Formazione personalizzata con tf.distribute.Strategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza sorgente su GitHub Scarica notebook

Questo tutorial mostra come utilizzare tf.distribute.Strategy con cicli di addestramento personalizzati. Addestreremo un semplice modello CNN sul dataset MNIST della moda. Il dataset MNIST della moda contiene 60000 immagini di treni di dimensioni 28 x 28 e 10000 immagini di prova di dimensioni 28 x 28.

Utilizziamo cicli di addestramento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'addestramento. Inoltre, è più facile eseguire il debug del modello e del ciclo di addestramento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

Scarica il dataset MNIST della moda

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 1s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 1s 0us/step

Crea una strategia per distribuire le variabili e il grafico

Come funziona la strategia tf.distribute.MirroredStrategy ?

  • Tutte le variabili e il grafico del modello vengono replicati sulle repliche.
  • L'input è distribuito uniformemente tra le repliche.
  • Ogni replica calcola la perdita e le sfumature per l'input ricevuto.
  • I gradienti vengono sincronizzati tra tutte le repliche sommandoli.
  • Dopo la sincronizzazione, viene effettuato lo stesso aggiornamento alle copie delle variabili su ciascuna replica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Configurazione della pipeline di input

Esporta il grafico e le variabili nel formato SavedModel indipendente dalla piattaforma. Dopo aver salvato il modello, è possibile caricarlo con o senza l'ambito.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Crea i set di dati e distribuiscili:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crea il modello

Crea un modello usando tf.keras.Sequential . Puoi anche usare l'API Model Subclassing per farlo.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definisci la funzione di perdita

Normalmente, su una singola macchina con 1 GPU / CPU, la perdita è divisa per il numero di esempi nel batch di input.

Quindi, come dovrebbe essere calcolata la perdita quando si usa tf.distribute.Strategy ?

  • Per un esempio, supponiamo di avere 4 GPU e una dimensione batch di 64. Un batch di input viene distribuito tra le repliche (4 GPU), ogni replica riceve un input di dimensione 16.

  • Il modello su ciascuna replica esegue un passaggio in avanti con il rispettivo input e calcola la perdita. Ora, invece di dividere la perdita per il numero di esempi nel rispettivo input (BATCH_SIZE_PER_REPLICA = 16), la perdita dovrebbe essere divisa per GLOBAL_BATCH_SIZE (64).

Perché fare questo?

  • Questo deve essere fatto perché dopo che i gradienti sono stati calcolati su ciascuna replica, vengono sincronizzati tra le repliche sommandoli .

Come eseguire questa operazione in TensorFlow?

  • Se stai scrivendo un ciclo di allenamento personalizzato, come in questo tutorial, dovresti sommare le perdite per esempio e dividere la somma per GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) oppure puoi usare tf.nn.compute_average_loss che prende la perdita per esempio, i pesi campione opzionali e GLOBAL_BATCH_SIZE come argomenti e restituisce la perdita scalata.

  • Se si utilizzano perdite di regolarizzazione nel modello, è necessario ridimensionare il valore della perdita in base al numero di repliche. Puoi farlo utilizzando la funzione tf.nn.scale_regularization_loss .

  • L'uso di tf.reduce_mean non è consigliato. In questo modo si divide la perdita per le dimensioni effettive del batch di replica, che possono variare passo dopo passo.

  • Questa riduzione e il ridimensionamento è fatto automaticamente in keras model.compile e model.fit

  • Se si utilizzano tf.keras.losses classi tf.keras.losses (come nell'esempio seguente), la riduzione della perdita deve essere specificata esplicitamente in modo che sia NONE o SUM . AUTO e SUM_OVER_BATCH_SIZE non sono consentiti se utilizzati con tf.distribute.Strategy . AUTO non è consentito perché l'utente dovrebbe pensare esplicitamente a quale riduzione desidera assicurarsi che sia corretta nel caso distribuito. SUM_OVER_BATCH_SIZE non è consentito perché attualmente divide solo per la dimensione del batch di replica e lascia all'utente la divisione per numero di repliche, che potrebbe essere facile da perdere. Quindi chiediamo invece all'utente di fare la riduzione in modo esplicito.

  • Se le labels sono multidimensionali, per_example_loss la media di per_example_loss sul numero di elementi in ciascun campione. Ad esempio, se la forma delle predictions è (batch_size, H, W, n_classes) e le labels è (batch_size, H, W) , sarà necessario aggiornare per_example_loss come: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Definisci le metriche per monitorare la perdita e l'accuratezza

Queste metriche tengono traccia della perdita del test e della formazione e dell'accuratezza del test. Puoi usare .result() per ottenere le statistiche accumulate in qualsiasi momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

Ciclo di formazione

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789
Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828
Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422
Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406
Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75
Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938
Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375
Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594
Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594

Cose da notare nell'esempio sopra:

Ripristina l'ultimo checkpoint e prova

Un modello con checkpoint con tf.distribute.Strategy può essere ripristinato con o senza strategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594

Modi alternativi di iterazione su un set di dati

Usare gli iteratori

Se si desidera iterare su un determinato numero di passaggi e non sull'intero set di dati, è possibile creare un iteratore utilizzando la chiamata iter e la chiamata esplicita next sull'iteratore. È possibile scegliere di iterare sul set di dati sia all'interno che all'esterno della funzione tf.. Ecco un piccolo frammento che mostra l'iterazione del set di dati al di fuori della funzione tf. utilizzando un iteratore.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0
Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875
Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625
Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625
Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875
Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125
Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125
Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625
Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625
Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875

Iterazione all'interno di una funzione tf

Puoi anche iterare sull'intero train_dist_dataset input train_dist_dataset all'interno di una funzione tf. usando il costrutto for x in ... o creando iteratori come abbiamo fatto sopra. L'esempio seguente mostra il wrapping di un'epoca di addestramento in una funzione tf. e l'iterazione su train_dist_dataset all'interno della funzione.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688
Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266
Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281
Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047
Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438
Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219
Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281
Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156
Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484

Monitoraggio della perdita di formazione tra le repliche

Si consiglia di non utilizzare tf.metrics.Mean per monitorare la perdita di formazione tra le varie repliche, a causa del calcolo ridimensionamento perdita che viene effettuata.

Ad esempio, se esegui un lavoro di formazione con le seguenti caratteristiche:

  • Due repliche
  • Due campioni vengono elaborati su ciascuna replica
  • Valori di perdita risultanti: [2, 3] e [4, 5] su ciascuna replica
  • Dimensione batch globale = 4

Con il ridimensionamento delle perdite, si calcola il valore di perdita per campione su ciascuna replica aggiungendo i valori di perdita e quindi dividendo per la dimensione del batch globale. In questo caso: (2 + 3) / 4 = 1.25 e (4 + 5) / 4 = 2.25 .

Se utilizzi tf.metrics.Mean per tenere traccia delle perdite tra le due repliche, il risultato è diverso. In questo esempio, si ottiene un total di 3,50 e un count di 2, che risulta in total / count = 1,75 quando result() viene chiamato sulla metrica. La perdita calcolata con tf.keras.Metrics viene ridimensionata di un fattore aggiuntivo uguale al numero di repliche sincronizzate.

Guida ed esempi

Di seguito sono riportati alcuni esempi di utilizzo della strategia di distribuzione con cicli di addestramento personalizzati:

  1. Guida alla formazione distribuita
  2. Esempio di DenseNet che utilizza MirroredStrategy .
  3. Esempio BERT addestrato utilizzando MirroredStrategy e TPUStrategy . Questo esempio è particolarmente utile per capire come caricare da un checkpoint e generare checkpoint periodici durante l'addestramento distribuito, ecc.
  4. Esempio NCF addestrato utilizzando MirroredStrategy che può essere abilitato utilizzando il flag keras_use_ctl .
  5. Esempio NMT addestrato utilizzando MirroredStrategy .

Altri esempi elencati nella guida alla strategia di distribuzione .

Prossimi passi