Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Szkolenie niestandardowe z tf.distribute.Strategy

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Ten samouczek pokazuje, jak używać tf.distribute.Strategy z niestandardowymi pętlami szkoleniowymi. Wyszkolimy prosty model CNN na zestawie danych MNIST dotyczących mody. Zbiór danych MNIST dotyczący mody zawiera 60000 obrazów pociągów o wymiarach 28 x 28 i 10000 obrazów testowych o rozmiarze 28 x 28.

Używamy niestandardowych pętli treningowych do trenowania naszego modelu, ponieważ zapewniają nam elastyczność i większą kontrolę nad treningiem. Ponadto łatwiej jest debugować model i pętlę szkoleniową.

 # Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.2.0

Pobierz zbiór danych dotyczących mody MNIST

 fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

Stwórz strategię dystrybucji zmiennych i wykresu

Jak działa strategia tf.distribute.MirroredStrategy ?

  • Wszystkie zmienne i wykres modelu są replikowane na replikach.
  • Dane wejściowe są równomiernie rozprowadzane w replikach.
  • Każda replika oblicza straty i gradienty dla otrzymanych danych wejściowych.
  • Gradienty są synchronizowane we wszystkich replikach, sumując je.
  • Po synchronizacji ta sama aktualizacja jest wykonywana w kopiach zmiennych w każdej replice.
 # If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

 print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
 
Number of devices: 1

Skonfiguruj potok wejściowy

Wyeksportuj wykres i zmienne do formatu SavedModel niezależnego od platformy. Po zapisaniu modelu możesz załadować go z lunetą lub bez.

 BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10
 

Utwórz zbiory danych i rozpowszechnij je:

 train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
 

Utwórz model

Utwórz model za pomocą tf.keras.Sequential . W tym celu można również użyć interfejsu API Model Subclassing.

 def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
 
 # Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
 

Zdefiniuj funkcję straty

Zwykle na pojedynczej maszynie z 1 GPU / CPU strata jest dzielona przez liczbę przykładów w partii danych wejściowych.

Jak więc obliczyć stratę, używając tf.distribute.Strategy ?

  • Na przykład, powiedzmy, że masz 4 procesory GPU i rozmiar partii 64. Jedna partia danych wejściowych jest rozdzielana na repliki (4 procesory GPU), a każda replika otrzymuje dane wejściowe o rozmiarze 16.

  • Model na każdej replice wykonuje ruch do przodu z odpowiednim wejściem i oblicza straty. Teraz zamiast dzielić stratę przez liczbę przykładów na odpowiednim wejściu (BATCH_SIZE_PER_REPLICA = 16), stratę należy podzielić przez GLOBAL_BATCH_SIZE (64).

Czemu to robić?

  • Należy to zrobić, ponieważ po obliczeniu gradientów w każdej replice są one synchronizowane między replikami przez zsumowanie .

Jak to zrobić w TensorFlow?

  • Jeśli piszesz niestandardową pętlę szkoleniową, tak jak w tym samouczku, powinieneś zsumować straty na przykład i podzielić sumę przez GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) lub możesz użyć tf.nn.compute_average_loss który przyjmuje jako argumenty stratę na przykład, opcjonalne wagi próbek i GLOBAL_BATCH_SIZE i zwraca skalowaną stratę.

  • Jeśli używasz w modelu strat regularyzacyjnych, musisz przeskalować wartość strat według liczby replik. Możesz to zrobić za pomocą funkcji tf.nn.scale_regularization_loss .

  • Używanie tf.reduce_mean nie jest zalecane. Spowoduje to podzielenie strat przez rzeczywisty rozmiar partii repliki, który może się zmieniać w zależności od kroku.

  • Ta redukcja i skalowanie jest wykonywane automatycznie w keras model.compile i model.fit

  • W przypadku używania klas tf.keras.losses (jak w poniższym przykładzie), redukcja strat musi być wyraźnie określona jako NONE lub SUM . AUTO i SUM_OVER_BATCH_SIZE są niedozwolone, gdy są używane z tf.distribute.Strategy . AUTO jest niedozwolone, ponieważ użytkownik powinien wyraźnie pomyśleć o tym, jaką redukcję chce, aby upewnić się, że jest poprawna w przypadku rozproszonym. SUM_OVER_BATCH_SIZE jest niedozwolone, ponieważ obecnie dzieli się tylko przez rozmiar partii replik i pozostawia dzielenie przez liczbę replik użytkownikowi, co może być łatwe do przeoczenia. Zamiast tego prosimy użytkownika o wyraźne dokonanie redukcji.

  • Jeśli labels są wielowymiarowe, per_example_loss liczby elementów w każdej próbce. Na przykład, jeśli kształt predictions to (batch_size, H, W, n_classes) a labels to (batch_size, H, W) , będziesz musiał zaktualizować per_example_loss przykład: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

 with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
 

Zdefiniuj wskaźniki, aby śledzić straty i dokładność

Te metryki śledzą utratę testów i szkolenie oraz dokładność testów. Możesz użyć .result() aby uzyskać .result() statystyki w dowolnym momencie.

 with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
 

Pętla treningowa

 # model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
 
 def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
 
 # `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
 
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.5121287107467651, Accuracy: 81.44499969482422, Test Loss: 0.4022502303123474, Test Accuracy: 85.36000061035156
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.33714210987091064, Accuracy: 87.89167022705078, Test Loss: 0.3380376994609833, Test Accuracy: 87.9000015258789
Epoch 3, Loss: 0.2912210524082184, Accuracy: 89.28166961669922, Test Loss: 0.29934415221214294, Test Accuracy: 88.97000122070312
Epoch 4, Loss: 0.25845447182655334, Accuracy: 90.49666595458984, Test Loss: 0.2924875319004059, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.23627422749996185, Accuracy: 91.37667083740234, Test Loss: 0.2735322117805481, Test Accuracy: 89.72000122070312
Epoch 6, Loss: 0.217362642288208, Accuracy: 91.90666961669922, Test Loss: 0.2779887914657593, Test Accuracy: 89.55000305175781
Epoch 7, Loss: 0.19758468866348267, Accuracy: 92.77666473388672, Test Loss: 0.2544868290424347, Test Accuracy: 90.58000183105469
Epoch 8, Loss: 0.18331800401210785, Accuracy: 93.22000122070312, Test Loss: 0.27537408471107483, Test Accuracy: 90.30000305175781
Epoch 9, Loss: 0.16616544127464294, Accuracy: 93.92832946777344, Test Loss: 0.27181997895240784, Test Accuracy: 89.74000549316406
Epoch 10, Loss: 0.15189047157764435, Accuracy: 94.3499984741211, Test Loss: 0.29621848464012146, Test Accuracy: 89.8800048828125

Rzeczy, o których należy pamiętać w powyższym przykładzie:

Przywróć najnowszy punkt kontrolny i wykonaj test

Model punktu kontrolnego z tf.distribute.Strategy można przywrócić ze strategią lub bez niej.

 eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
 
 @tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
 
 checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
 
Accuracy after restoring the saved model without strategy: 89.74000549316406

Alternatywne sposoby iteracji po zbiorze danych

Korzystanie z iteratorów

Jeśli chcesz wykonać iterację przez określoną liczbę kroków, a nie przez cały zestaw danych, możesz utworzyć iterator, używając wywołania iter i wywołania explicity next w iteratorze. Możesz wybrać iterację zestawu danych zarówno wewnątrz, jak i na zewnątrz funkcji tf. Oto mały fragment ilustrujący iterację zestawu danych poza funkcją tf.funkcja za pomocą iteratora.

 for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
 
Epoch 10, Loss: 0.15917226672172546, Accuracy: 95.0
Epoch 10, Loss: 0.12831072509288788, Accuracy: 95.3125
Epoch 10, Loss: 0.1469763219356537, Accuracy: 94.21875
Epoch 10, Loss: 0.1432376652956009, Accuracy: 94.53125
Epoch 10, Loss: 0.10891322791576385, Accuracy: 96.5625
Epoch 10, Loss: 0.11562182009220123, Accuracy: 95.46875
Epoch 10, Loss: 0.12230901420116425, Accuracy: 95.625
Epoch 10, Loss: 0.149966761469841, Accuracy: 94.21875
Epoch 10, Loss: 0.13360166549682617, Accuracy: 94.6875
Epoch 10, Loss: 0.13077859580516815, Accuracy: 95.0

Iterowanie wewnątrz funkcji tf

Możesz również iterować po całym wejściowym train_dist_dataset danych train_dist_dataset wewnątrz funkcji tf., używając konstrukcji for x in ... lub tworząc iteratory, tak jak to zrobiliśmy powyżej. Poniższy przykład demonstruje pakowanie jednej epoki uczenia w tf.function i iterowanie po train_dist_dataset wewnątrz funkcji.

 @tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
 
Epoch 1, Loss: 0.1388315111398697, Accuracy: 94.83000183105469
Epoch 2, Loss: 0.1286177784204483, Accuracy: 95.1433334350586
Epoch 3, Loss: 0.11653201282024384, Accuracy: 95.70000457763672
Epoch 4, Loss: 0.1051444411277771, Accuracy: 96.06666564941406
Epoch 5, Loss: 0.09490712732076645, Accuracy: 96.46833801269531
Epoch 6, Loss: 0.08746836334466934, Accuracy: 96.77166748046875
Epoch 7, Loss: 0.07981529831886292, Accuracy: 97.04833984375
Epoch 8, Loss: 0.07256246358156204, Accuracy: 97.28666687011719
Epoch 9, Loss: 0.06466992199420929, Accuracy: 97.63333129882812
Epoch 10, Loss: 0.06099675968289375, Accuracy: 97.73833465576172

Śledzenie utraty treningu w replikach

Nie zalecamy korzystania tf.metrics.Mean śledzić utratę szkolenia na różnych replikach, ze względu na obliczenia strat skalowania, które jest prowadzone.

Na przykład, jeśli prowadzisz szkolenie o następujących cechach:

  • Dwie repliki
  • Na każdej replice przetwarzane są dwie próbki
  • Wynikowe wartości strat: [2, 3] i [4, 5] na każdej replice
  • Globalna wielkość partii = 4

W przypadku skalowania strat oblicza się wartość strat na próbkę w każdej replice, dodając wartości strat, a następnie dzieląc przez globalny rozmiar partii. W tym przypadku: (2 + 3) / 4 = 1.25 i (4 + 5) / 4 = 2.25 .

Jeśli używasz tf.metrics.Mean do śledzenia strat w obu replikach, wynik będzie inny. W tym przykładzie otrzymujesz total 3,50 i count 2, co daje total / count = 1,75, gdy result() jest metryka result() . Strata obliczana za pomocą tf.keras.Metrics jest skalowana o dodatkowy współczynnik równy liczbie synchronizowanych replik.

Przewodnik i przykłady

Oto kilka przykładów użycia strategii dystrybucji z niestandardowymi pętlami szkoleniowymi:

  1. Rozproszony przewodnik szkoleniowy
  2. Przykład DenseNet przy użyciu MirroredStrategy .
  3. Przykład BERT został przeszkolony przy użyciu MirroredStrategy i TPUStrategy . Ten przykład jest szczególnie pomocny w zrozumieniu, jak ładować z punktu kontrolnego i generować okresowe punkty kontrolne podczas szkolenia rozproszonego itp.
  4. Przykład NCF wytrenowany przy użyciu MirroredStrategy który można włączyć za pomocą flagi keras_use_ctl .
  5. Przykład NMT został przeszkolony przy użyciu MirroredStrategy .

Więcej przykładów znajduje się w przewodniku po strategii dystrybucji .

Następne kroki