Formazione personalizzata con tf.distribute.Strategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Questo tutorial mostra come usare tf.distribute.Strategy con cicli di formazione personalizzati. Addestreremo un semplice modello CNN sul set di dati fashion MNIST. Il set di dati fashion MNIST contiene 60000 immagini di treni di dimensioni 28 x 28 e 10000 immagini di test di dimensioni 28 x 28.

Utilizziamo cicli di allenamento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'allenamento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1

Scarica il set di dati MNIST della moda

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

Creare una strategia per distribuire le variabili e il grafico

Come funziona la strategia tf.distribute.MirroredStrategy ?

  • Tutte le variabili e il grafico del modello vengono replicati sulle repliche.
  • L'input è distribuito uniformemente tra le repliche.
  • Ogni replica calcola la perdita e i gradienti per l'input ricevuto.
  • I gradienti vengono sincronizzati su tutte le repliche sommandole.
  • Dopo la sincronizzazione, lo stesso aggiornamento viene effettuato alle copie delle variabili su ciascuna replica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Imposta la pipeline di input

Esporta il grafico e le variabili nel formato SavedModel indipendente dalla piattaforma. Dopo che il tuo modello è stato salvato, puoi caricarlo con o senza l'oscilloscopio.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Crea i set di dati e distribuiscili:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_UINT8
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
        dim {
          size: 1
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
}

2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_UINT8
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:3"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
        dim {
          size: 1
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_UINT8
        }
      }
    }
  }
}

Crea il modello

Crea un modello usando tf.keras.Sequential . Puoi anche utilizzare l'API Model Subclassing per farlo.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definire la funzione di perdita

Normalmente, su una singola macchina con 1 GPU/CPU, la perdita è divisa per il numero di esempi nel batch di input.

Quindi, come dovrebbe essere calcolata la perdita quando si utilizza un tf.distribute.Strategy ?

  • Ad esempio, supponiamo che tu abbia 4 GPU e una dimensione batch di 64. Un batch di input è distribuito tra le repliche (4 GPU), ciascuna replica riceve un input di dimensione 16.

  • Il modello su ciascuna replica esegue un passaggio in avanti con il rispettivo input e calcola la perdita. Ora, invece di dividere la perdita per il numero di esempi nel rispettivo input (BATCH_SIZE_PER_REPLICA = 16), la perdita dovrebbe essere divisa per GLOBAL_BATCH_SIZE (64).

Perché farlo?

  • Questo deve essere fatto perché dopo che i gradienti sono stati calcolati su ciascuna replica, vengono sincronizzati tra le repliche sommandoli .

Come farlo in TensorFlow?

  • Se stai scrivendo un ciclo di allenamento personalizzato, come in questo tutorial, dovresti sommare le perdite per esempio e dividere la somma per GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) oppure puoi usare tf.nn.compute_average_loss che prende la perdita per esempio, i pesi campione facoltativi e GLOBAL_BATCH_SIZE come argomenti e restituisce la perdita in scala.

  • Se stai utilizzando le perdite di regolarizzazione nel tuo modello, devi ridimensionare il valore della perdita in base al numero di repliche. Puoi farlo usando la funzione tf.nn.scale_regularization_loss .

  • L'utilizzo tf.reduce_mean non è consigliato. In questo modo si divide la perdita per la dimensione effettiva del batch di replica che può variare passo dopo passo.

  • Questa riduzione e ridimensionamento viene eseguita automaticamente in keras model.compile e model.fit

  • Se si utilizzano classi tf.keras.losses (come nell'esempio seguente), la riduzione della perdita deve essere specificata esplicitamente come una tra NONE o SUM . AUTO e SUM_OVER_BATCH_SIZE non sono consentiti se utilizzati con tf.distribute.Strategy . AUTO non è consentito perché l'utente dovrebbe pensare esplicitamente a quale riduzione desidera per assicurarsi che sia corretta nel caso distribuito. SUM_OVER_BATCH_SIZE non è consentito perché attualmente dividerebbe solo per dimensione batch di replica e lascerebbe la divisione per numero di repliche all'utente, cosa che potrebbe essere facile da perdere. Quindi, invece, chiediamo all'utente di eseguire la riduzione in modo esplicito.

  • Se labels sono multidimensionali, calcola la media di per_example_loss sul numero di elementi in ciascun campione. Ad esempio, se la forma delle predictions è (batch_size, H, W, n_classes) e labels è (batch_size, H, W) , dovrai aggiornare per_example_loss come: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Definisci le metriche per monitorare la perdita e l'accuratezza

Queste metriche tengono traccia della perdita del test, della formazione e dell'accuratezza del test. Puoi utilizzare .result() per ottenere le statistiche accumulate in qualsiasi momento.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Ciclo di formazione

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734
Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664
Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734
Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312
Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938
Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336
Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789
Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266
Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664
Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578

Cose da notare nell'esempio sopra:

Ripristina l'ultimo checkpoint e prova

Un modello sottoposto a checkpoint con tf.distribute.Strategy può essere ripristinato con o senza una strategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664

Modi alternativi di iterazione su un set di dati

Utilizzo di iteratori

Se si desidera eseguire l'iterazione su un determinato numero di passaggi e non sull'intero set di dati, è possibile creare un iteratore utilizzando la chiamata iter e la chiamata esplicita next sull'iteratore. Puoi scegliere di scorrere il set di dati sia all'interno che all'esterno della funzione tf. Ecco un piccolo frammento che mostra l'iterazione del set di dati al di fuori di tf.function usando un iteratore.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375
Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125
Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625
Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625
Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625
Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625
Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625
Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625
Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125
Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875

Iterazione all'interno di una funzione tf

Puoi anche scorrere l'intero input train_dist_dataset all'interno di una funzione tf. usando il costrutto for x in ... o creando iteratori come abbiamo fatto sopra. L'esempio seguente mostra il wrapping di un'epoca di training in un tf.function e l'iterazione su train_dist_dataset all'interno della funzione.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844
Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328
Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062
Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969
Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625
Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047
Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562
Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078
Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328
Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953

Monitoraggio della perdita di allenamento tra le repliche

Non è consigliabile utilizzare tf.metrics.Mean per tenere traccia della perdita di addestramento su diverse repliche, a causa del calcolo del ridimensionamento della perdita eseguito.

Ad esempio, se si esegue un processo di formazione con le seguenti caratteristiche:

  • Due repliche
  • Due campioni vengono elaborati su ciascuna replica
  • Valori di perdita risultanti: [2, 3] e [4, 5] su ciascuna replica
  • Dimensione globale del lotto = 4

Con il ridimensionamento delle perdite, si calcola il valore della perdita per campione su ciascuna replica aggiungendo i valori di perdita e quindi dividendo per la dimensione del batch globale. In questo caso: (2 + 3) / 4 = 1.25 e (4 + 5) / 4 = 2.25 .

Se utilizzi tf.metrics.Mean per tenere traccia della perdita tra le due repliche, il risultato è diverso. In questo esempio, si ottiene un total di 3,50 e un count di 2, che risulta in total / count = 1,75 quando result() viene chiamato sulla metrica. La perdita calcolata con tf.keras.Metrics viene ridimensionata di un fattore aggiuntivo uguale al numero di repliche sincronizzate.

Guida ed esempi

Di seguito sono riportati alcuni esempi di utilizzo della strategia di distribuzione con cicli di formazione personalizzati:

  1. Guida alla formazione distribuita
  2. Esempio DenseNet usando MirroredStrategy .
  3. Esempio BERT addestrato utilizzando MirroredStrategy e TPUStrategy . Questo esempio è particolarmente utile per capire come caricare da un checkpoint e generare checkpoint periodici durante l'addestramento distribuito, ecc.
  4. Esempio NCF addestrato utilizzando MirroredStrategy che può essere abilitato utilizzando il flag keras_use_ctl .
  5. Esempio NMT addestrato utilizzando MirroredStrategy .

Altri esempi elencati nella guida alla strategia di distribuzione .

Prossimi passi