Distributed training in TensorFlow

Overview

The tf.distribute.Strategy API provides an abstraction for distributing your training across multiple processing units. The goal is to allow users to enable distributed training using existing models and training code, with minimal changes.

This tutorial uses the tf.distribute.MirroredStrategy, which does in-graph replication with synchronous training on many GPUs on one machine. Essentially, it copies all of the model's variables to each processor. Then, it uses all-reduce to combine the gradients from all processors and applies the combined value to all copies of the model.
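
For illustration, here is a minimal sketch of constructing the strategy over an explicit set of devices (it assumes two local GPUs, which this tutorial does not require):

# Sketch: mirror variables across two specific GPUs (hypothetical devices).
# Gradients from both replicas are combined with all-reduce, and the
# combined update is applied to every copy of the variables.
two_gpu_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])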

MirroredStrategy is one of several distribution strategies available in TensorFlow core. You can read about other strategies in the distribution strategy guide.

Keras API

This example uses the tf.keras API to build the model and training loop. For custom training loops, see the tf.distribute.Strategy with training loops tutorial.

Import dependencies

# Import TensorFlow and TensorFlow Datasets
!pip install -q tensorflow-gpu==2.0.0-alpha0
!pip install -q tensorflow_datasets
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import tensorflow_datasets as tfds

import os

Download the dataset

Download the MNIST dataset and load it from TensorFlow Datasets. This returns a dataset in tf.data format.

Setting with_info to True includes the metadata for the entire dataset, which is saved here to info. Among other things, this metadata object includes the number of train and test examples.

datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']
Downloading and preparing dataset mnist (11.06 MiB) to /home/kbuilder/tensorflow_datasets/mnist/1.0.0...

Dataset mnist downloaded and prepared to /home/kbuilder/tensorflow_datasets/mnist/1.0.0. Subsequent calls will reuse this data.
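
For example, you can inspect this metadata object directly; the values in the comments are what MNIST reports:

print(info.splits['train'].num_examples)   # 60000
print(info.splits['test'].num_examples)    # 10000
print(info.features['image'].shape)        # (28, 28, 1)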

Define the distribution strategy

Create a MirroredStrategy object. This handles distribution and provides a context manager (tf.distribute.MirroredStrategy.scope) to build your model inside.

strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
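
The replica count reflects the GPUs TensorFlow can see. To cross-check which physical GPUs are visible (assuming your TF build exposes tf.config.experimental.list_physical_devices):

# List the physical GPUs visible to this process; an empty list means
# MirroredStrategy falls back to a single replica on the CPU.
print(tf.config.experimental.list_physical_devices('GPU'))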

Set up the input pipeline

When a model is trained on multiple GPUs, the batch size should be increased accordingly to make effective use of the extra computing power, and the learning rate should be tuned to match; one common scaling heuristic is sketched after the next code block.

# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
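
A common heuristic for the learning rate (not applied in this tutorial; shown only as a sketch) is to scale a base rate linearly with the number of replicas:

# Hypothetical linear scaling rule: more replicas -> proportionally
# larger effective batch -> proportionally larger learning rate.
# Treat this as a starting point to tune, not a guarantee.
BASE_LEARNING_RATE = 1e-3
SCALED_LEARNING_RATE = BASE_LEARNING_RATE * strategy.num_replicas_in_sync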

Pixel values, which are 0-255, have to be normalized to the 0-1 range. Define this scaling in a function.

def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label
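
To sanity-check scale, you can run it over a single example (illustrative only):

# The dtype should be float32 and all values should land in [0, 1].
for image, label in mnist_test.map(scale).take(1):
  print(image.dtype, tf.reduce_min(image).numpy(), tf.reduce_max(image).numpy())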

Apply this function to the training and test data, shuffle the training data, and batch it for training.

train_dataset = mnist_train.map(scale).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

Create the model

Create and compile the Keras model in the context of strategy.scope.

with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])
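
The model built under the scope behaves like any other Keras model, so the usual introspection still works:

# Print a layer-by-layer summary of the mirrored model.
model.summary()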

Define the callbacks.

The callbacks used here are:

  • TensorBoard: This callback writes a log for TensorBoard, which allows you to visualize the graphs.
  • Model Checkpoint: This callback saves the model after every epoch.
  • Learning Rate Scheduler: Using this callback, you can schedule the learning rate to change after every epoch/batch.

For illustrative purposes, add a print callback to display the learning rate in the notebook.

# Define the checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
# Define the names of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Define a function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch < 7:
    return 1e-4
  else:
    return 1e-5

# Define a callback for printing the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                      model.optimizer.lr.numpy()))

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

Train and evaluate

Now, train the model in the usual way, calling fit on the model and passing in the dataset created at the beginning of the tutorial. This step is the same whether you are distributing the training or not.

model.fit(train_dataset, epochs=10, callbacks=callbacks)
Epoch 1/10
    938/Unknown - 22s 23ms/step - loss: 0.1993 - accuracy: 0.9438
Learning rate for epoch 1 is 0.0010000000474974513
938/938 [==============================] - 22s 23ms/step - loss: 0.1993 - accuracy: 0.9438
Epoch 2/10
933/938 [============================>.] - ETA: 0s - loss: 0.0669 - accuracy: 0.9796
Learning rate for epoch 2 is 0.0010000000474974513
938/938 [==============================] - 15s 16ms/step - loss: 0.0669 - accuracy: 0.9796
Epoch 3/10
935/938 [============================>.] - ETA: 0s - loss: 0.0457 - accuracy: 0.9864
Learning rate for epoch 3 is 0.0010000000474974513
938/938 [==============================] - 15s 16ms/step - loss: 0.0457 - accuracy: 0.9864
Epoch 4/10
937/938 [============================>.] - ETA: 0s - loss: 0.0245 - accuracy: 0.9930
Learning rate for epoch 4 is 9.999999747378752e-05
938/938 [==============================] - 15s 16ms/step - loss: 0.0245 - accuracy: 0.9930
Epoch 5/10
935/938 [============================>.] - ETA: 0s - loss: 0.0213 - accuracy: 0.9943
Learning rate for epoch 5 is 9.999999747378752e-05
938/938 [==============================] - 15s 17ms/step - loss: 0.0213 - accuracy: 0.9943
Epoch 6/10
936/938 [============================>.] - ETA: 0s - loss: 0.0194 - accuracy: 0.9949
Learning rate for epoch 6 is 9.999999747378752e-05
938/938 [==============================] - 15s 16ms/step - loss: 0.0194 - accuracy: 0.9949
Epoch 7/10
935/938 [============================>.] - ETA: 0s - loss: 0.0178 - accuracy: 0.9957
Learning rate for epoch 7 is 9.999999747378752e-05
938/938 [==============================] - 15s 16ms/step - loss: 0.0178 - accuracy: 0.9956
Epoch 8/10
933/938 [============================>.] - ETA: 0s - loss: 0.0153 - accuracy: 0.9964
Learning rate for epoch 8 is 9.999999747378752e-06
938/938 [==============================] - 14s 15ms/step - loss: 0.0152 - accuracy: 0.9965
Epoch 9/10
937/938 [============================>.] - ETA: 0s - loss: 0.0150 - accuracy: 0.9965
Learning rate for epoch 9 is 9.999999747378752e-06
938/938 [==============================] - 14s 15ms/step - loss: 0.0150 - accuracy: 0.9965
Epoch 10/10
937/938 [============================>.] - ETA: 0s - loss: 0.0148 - accuracy: 0.9966
Learning rate for epoch 10 is 9.999999747378752e-06
938/938 [==============================] - 15s 16ms/step - loss: 0.0148 - accuracy: 0.9966

<tensorflow.python.keras.callbacks.History at 0x7f207857ccf8>

As you can see below, the checkpoints are being saved.

# check the checkpoint directory
!ls {checkpoint_dir}
checkpoint           ckpt_5.data-00000-of-00002
ckpt_10.data-00000-of-00002  ckpt_5.data-00001-of-00002
ckpt_10.data-00001-of-00002  ckpt_5.index
ckpt_10.index            ckpt_6.data-00000-of-00002
ckpt_1.data-00000-of-00002   ckpt_6.data-00001-of-00002
ckpt_1.data-00001-of-00002   ckpt_6.index
ckpt_1.index             ckpt_7.data-00000-of-00002
ckpt_2.data-00000-of-00002   ckpt_7.data-00001-of-00002
ckpt_2.data-00001-of-00002   ckpt_7.index
ckpt_2.index             ckpt_8.data-00000-of-00002
ckpt_3.data-00000-of-00002   ckpt_8.data-00001-of-00002
ckpt_3.data-00001-of-00002   ckpt_8.index
ckpt_3.index             ckpt_9.data-00000-of-00002
ckpt_4.data-00000-of-00002   ckpt_9.data-00001-of-00002
ckpt_4.data-00001-of-00002   ckpt_9.index
ckpt_4.index

To see how the model performs, load the latest checkpoint and call evaluate on the test data.

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
    157/Unknown - 5s 30ms/step - loss: 0.0380 - accuracy: 0.9872
Eval loss: 0.038007019133274415, Eval Accuracy: 0.9872000217437744

To see the training output, you can download the TensorBoard logs and view them in a terminal:

$ tensorboard --logdir=path/to/log-directory
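
If you are running in a notebook, you can instead load TensorBoard inline (assuming the TensorBoard notebook extension is available in your environment):

%load_ext tensorboard
%tensorboard --logdir ./logs
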
!ls -sh ./logs
total 4.0K
4.0K train

Export to SavedModel

If you want to export the graph and the variables, SavedModel is the best way to do it. The model can be loaded back with or without the scope, and the format is platform agnostic.

path = 'saved_model/'
tf.keras.experimental.export_saved_model(model, path)

Load the model without strategy.scope.

unreplicated_model = tf.keras.experimental.load_from_saved_model(path)

unreplicated_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

    157/Unknown - 2s 10ms/step - loss: 0.0380 - accuracy: 0.9872
Eval loss: 0.03800744543302134, Eval Accuracy: 0.9872000217437744

Load the model with strategy.scope.

with strategy.scope():
  replicated_model = tf.keras.experimental.load_from_saved_model(path)
  replicated_model.compile(loss='sparse_categorical_crossentropy',
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])

  eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
  print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
    157/Unknown - 5s 29ms/step - loss: 0.0380 - accuracy: 0.9872
Eval loss: 0.038007019133274415, Eval Accuracy: 0.9872000217437744

Next steps