RSVP für Ihr lokales TensorFlow Everywhere-Event noch heute!
Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Benutzerdefiniertes Training mit tf.distribute.Strategy

Ansicht auf TensorFlow.org In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Dieses Tutorial zeigt, wie Sie tf.distribute.Strategy mit benutzerdefinierten Trainingsschleifen verwenden. Wir werden ein einfaches CNN-Modell auf dem Mode-MNIST-Datensatz trainieren. Der MNIST-Modedatensatz enthält 60000 Zugbilder der Größe 28 x 28 und 10000 Testbilder der Größe 28 x 28.

Wir verwenden benutzerdefinierte Trainingsschleifen, um unser Modell zu trainieren, da sie uns Flexibilität und eine bessere Kontrolle über das Training bieten. Darüber hinaus ist es einfacher, das Modell und die Trainingsschleife zu debuggen.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

Laden Sie den Fashion MNIST-Datensatz herunter

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 1s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 1s 0us/step

Erstellen Sie eine Strategie zum Verteilen der Variablen und des Diagramms

Wie funktioniert tf.distribute.MirroredStrategy Strategie tf.distribute.MirroredStrategy ?

  • Alle Variablen und das Modelldiagramm werden auf den Replikaten repliziert.
  • Die Eingabe wird gleichmäßig auf die Replikate verteilt.
  • Jedes Replikat berechnet den Verlust und die Verläufe für die empfangene Eingabe.
  • Die Verläufe werden über alle Replikate hinweg synchronisiert, indem sie summiert werden.
  • Nach der Synchronisierung werden die Kopien der Variablen auf jedem Replikat auf dieselbe Weise aktualisiert.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

Setup-Pipeline einrichten

Exportieren Sie das Diagramm und die Variablen in das plattformunabhängige SavedModel-Format. Nachdem Ihr Modell gespeichert wurde, können Sie es mit oder ohne Bereich laden.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

Erstellen Sie die Datensätze und verteilen Sie sie:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Erstellen Sie das Modell

Erstellen Sie ein Modell mit tf.keras.Sequential . Sie können dazu auch die Model Subclassing API verwenden.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definieren Sie die Verlustfunktion

Normalerweise wird auf einem einzelnen Computer mit 1 GPU / CPU der Verlust durch die Anzahl der Beispiele im Eingabestapel geteilt.

Wie sollte der Verlust berechnet werden, wenn eine tf.distribute.Strategy ?

  • Angenommen, Sie haben 4 GPUs und eine Stapelgröße von 64. Ein Eingabestapel wird auf die Replikate (4 GPUs) verteilt, wobei jedes Replikat eine Eingabe der Größe 16 erhält.

  • Das Modell auf jedem Replikat führt einen Vorwärtsdurchlauf mit seiner jeweiligen Eingabe durch und berechnet den Verlust. Anstatt den Verlust durch die Anzahl der Beispiele in der jeweiligen Eingabe (BATCH_SIZE_PER_REPLICA = 16) zu dividieren, sollte der Verlust durch GLOBAL_BATCH_SIZE (64) geteilt werden.

Warum das?

  • Dies muss getan werden , da nach dem die Gradienten auf jeder Replik berechnet werden, sie über die Replikate synchronisiert werden , indem sie summiert werden .

Wie geht das in TensorFlow?

  • Wenn Sie wie in diesem Lernprogramm eine benutzerdefinierte Trainingsschleife schreiben, sollten Sie die Verluste pro Beispiel summieren und die Summe durch GLOBAL_BATCH_SIZE dividieren: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) oder Sie können tf.nn.compute_average_loss dem der Verlust pro Beispiel, die optionalen Stichprobengewichte und GLOBAL_BATCH_SIZE als Argumente verwendet werden und der skalierte Verlust zurückgegeben wird.

  • Wenn Sie in Ihrem Modell Regularisierungsverluste verwenden, müssen Sie den Verlustwert nach Anzahl der Replikate skalieren. Sie können dies mit der Funktion tf.nn.scale_regularization_loss tun.

  • Die Verwendung von tf.reduce_mean wird nicht empfohlen. Dadurch wird der Verlust durch die tatsächliche Chargengröße pro Replikat dividiert, die von Schritt zu Schritt variieren kann.

  • Diese Reduzierung und Skalierung erfolgt automatisch in keras model.compile und model.fit

  • Wenntf.keras.losses Klassen vontf.keras.losses verwenden (wie im folgenden Beispiel), muss die Verlustreduzierung explizit als NONE oder SUM . AUTO und SUM_OVER_BATCH_SIZE sind bei Verwendung mit tf.distribute.Strategy nicht tf.distribute.Strategy . AUTO ist nicht AUTO , da der Benutzer explizit darüber nachdenken sollte, welche Reduzierung er vornehmen möchte, um sicherzustellen, dass sie im verteilten Fall korrekt ist. SUM_OVER_BATCH_SIZE ist nicht SUM_OVER_BATCH_SIZE , da es derzeit nur durch die SUM_OVER_BATCH_SIZE pro Replikat dividiert und die Division durch die Anzahl der Replikate dem Benutzer überlässt, was möglicherweise leicht zu übersehen ist. Stattdessen bitten wir den Benutzer, die Reduzierung selbst explizit vorzunehmen.

  • Wenn labels mehrdimensional sind, per_example_loss den per_example_loss pro Beispiel über die Anzahl der Elemente in jeder Stichprobe. Wenn beispielsweise die Form der predictions (batch_size, H, W, n_classes) und die labels (batch_size, H, W) müssen Sie per_example_loss wie per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32) aktualisieren: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

Definieren Sie die Metriken, um Verlust und Genauigkeit zu verfolgen

Diese Metriken verfolgen den Testverlust sowie die Trainings- und Testgenauigkeit. Sie können .result() jederzeit verwenden, um die gesammelten Statistiken .result() .

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

Trainingsschleife

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789
Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828
Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422
Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406
Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75
Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938
Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375
Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594
Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594

Dinge, die im obigen Beispiel zu beachten sind:

Stellen Sie den neuesten Prüfpunkt wieder her und testen Sie ihn

Ein Modell mit einem tf.distribute.Strategy Checkpoint kann mit oder ohne Strategie wiederhergestellt werden.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594

Alternative Möglichkeiten zum Durchlaufen eines Datasets

Iteratoren verwenden

Wenn Sie über eine bestimmte Anzahl von Schritten und nicht über das gesamte Dataset iterieren möchten, können Sie einen Iterator erstellen, indem Sie den iter und den Explizitätsaufruf als next auf dem Iterator verwenden. Sie können den Datensatz sowohl innerhalb als auch außerhalb der Funktion tf. iterieren. Hier ist ein kleiner Ausschnitt, der die Iteration des Datensatzes außerhalb der Funktion tf. mit einem Iterator demonstriert.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0
Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875
Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625
Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625
Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875
Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125
Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125
Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625
Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625
Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875

Iterieren innerhalb einer tf.Funktion

Sie können auch über die gesamte Eingabe train_dist_dataset innerhalb einer tf.- train_dist_dataset iterieren, indem Sie das Konstrukt for x in ... verwenden oder Iteratoren wie oben beschrieben erstellen. Das folgende Beispiel zeigt, wie eine Trainingsepoche in eine tf.-Funktion eingeschlossen und über train_dist_dataset innerhalb der Funktion train_dist_dataset wird.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688
Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266
Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281
Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047
Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438
Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219
Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281
Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156
Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484

Verfolgung des Trainingsverlusts über Replikate hinweg

Wir empfehlen nicht , mit tf.metrics.Mean über verschiedene Repliken der Trainingsverlust zu verfolgen, weil der Verlust Skalierung Berechnung , die durchgeführt wird.

Zum Beispiel, wenn Sie einen Schulungsjob mit den folgenden Merkmalen ausführen:

  • Zwei Repliken
  • Auf jeder Replik werden zwei Proben verarbeitet
  • Resultierende Verlustwerte: [2, 3] und [4, 5] auf jeder Replik
  • Globale Chargengröße = 4

Bei der Verlustskalierung berechnen Sie den Verlustwert pro Stichprobe für jedes Replikat, indem Sie die Verlustwerte addieren und dann durch die globale Stapelgröße dividieren. In diesem Fall: (2 + 3) / 4 = 1.25 und (4 + 5) / 4 = 2.25 .

Wenn Sie tf.metrics.Mean , um den Verlust zwischen den beiden Replikaten zu verfolgen, ist das Ergebnis unterschiedlich. In diesem Beispiel erhalten Sie total 3,50 und eine count von 2, was zu total / count = 1,75 führt, wenn result() für die Metrik aufgerufen wird. Der mit tf.keras.Metrics berechnete tf.keras.Metrics wird um einen zusätzlichen Faktor skaliert, der der Anzahl der synchronen Replikate entspricht.

Leitfaden und Beispiele

Hier einige Beispiele für die Verwendung der Verteilungsstrategie mit benutzerdefinierten Trainingsschleifen:

  1. Verteilter Trainingsleitfaden
  2. DenseNet- Beispiel mit MirroredStrategy .
  3. BERT- Beispiel, das mit MirroredStrategy und TPUStrategy trainiert wurde. Dieses Beispiel ist besonders hilfreich, um zu verstehen, wie während eines verteilten Trainings usw. von einem Kontrollpunkt aus geladen und regelmäßige Kontrollpunkte generiert werden.
  4. NCF- Beispiel, das mit MirroredStrategy trainiert wurde und mit dem Flag keras_use_ctl aktiviert werden keras_use_ctl .
  5. NMT- Beispiel mit MirroredStrategy trainiert.

Weitere Beispiele finden Sie im Handbuch zur Vertriebsstrategie .

Nächste Schritte

  • Probieren Sie die neue API tf.distribute.Strategy für Ihre Modelle aus.
  • Besuchen Sie den Abschnitt Leistung im Handbuch, um mehr über andere Strategien und Tools zu erfahren, mit denen Sie die Leistung Ihrer TensorFlow-Modelle optimieren können.