Distributed Training in TensorFlow


Overview

The tf.distribute.Strategy API provides an easy way to distribute your training across multiple devices or machines. Our goal is to allow users to enable distributed training using existing models and training code, with minimal changes.

Currently, core TensorFlow supports tf.distribute.MirroredStrategy. This strategy does in-graph replication with synchronous training on many GPUs on one machine. Essentially, it creates copies of all variables in the model's layers on each device. It then uses all-reduce to combine gradients across the devices before applying them to the variables, which keeps them in sync.
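
To get a quick sense of what the strategy keeps in sync, you can inspect the number of replicas it creates. Below is a minimal sketch; the device list is illustrative (it assumes at least one GPU is visible) and it assumes your TensorFlow version exposes num_replicas_in_sync.

import tensorflow as tf

# The device list is illustrative; MirroredStrategy keeps one model replica
# per listed device and all-reduces gradients between them.
strategy = tf.distribute.MirroredStrategy(["/gpu:0", "/cpu:0"])
print("Number of replicas in sync:", strategy.num_replicas_in_sync)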

Many other strategies are available in TensorFlow 1.12+ contrib and will soon be available in core TensorFlow. You can find more information about them in the contrib README. You can also read the public design review for updating the tf.distribute.Strategy API as part of the move from contrib to core TF.

Example with Keras API

Let's see how to scale to multiple GPUs on one machine using MirroredStrategy with tf.keras.

We will take a very simple model consisting of a single layer. First, we will import TensorFlow.

!pip install -q tf-nightly
import tensorflow as tf

To distribute a Keras model on multiple GPUs using MirroredStrategy, we first instantiate a MirroredStrategy object.

strategy = tf.distribute.MirroredStrategy(["/gpu:0", "/cpu:0"])
INFO:tensorflow:Device is available but not used by distribute strategy: /device:XLA_CPU:0
WARNING:tensorflow:Not all devices in `tf.distribute.Strategy` are visible to TensorFlow.

We then create and compile the Keras model inside strategy.scope().

with strategy.scope():
  inputs = tf.keras.layers.Input(shape=(1,))
  predictions = tf.keras.layers.Dense(1)(inputs)
  model = tf.keras.models.Model(inputs=inputs, outputs=predictions)
  model.compile(loss='mse',
                optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2))
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/init_ops.py:1253: calling VarianceScaling.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/resource_variable_ops.py:439: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.

Let's also define a simple input dataset for training this model.

train_dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(10000).batch(10)

To train the model, we call the Keras fit API with the input dataset we created earlier, just as we would in the non-distributed case.

model.fit(train_dataset, epochs=5, steps_per_epoch=10)
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/data/ops/dataset_ops.py:1763: DatasetV1.make_initializable_iterator (from tensorflow.python.data.ops.dataset_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using tf.estimator, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_initializable_iterator(dataset)`.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/ops/math_ops.py:3067: to_int32 (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.cast instead.
Epoch 1/5
10/10 [==============================] - 0s 11ms/step - loss: 0.2001
Epoch 2/5
10/10 [==============================] - 0s 1ms/step - loss: 1.4211e-15
Epoch 3/5
10/10 [==============================] - 0s 1ms/step - loss: 0.0000e+00
Epoch 4/5
10/10 [==============================] - 0s 1ms/step - loss: 0.0000e+00
Epoch 5/5
10/10 [==============================] - 0s 1ms/step - loss: 0.0000e+00

Similarly, we can also call evaluate and predict as before using appropriate datasets.

eval_dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.evaluate(eval_dataset, steps=10)
 1/10 [==>...........................] - ETA: 0s - loss: 0.0000e+00
0.0
predict_dataset = tf.data.Dataset.from_tensors(([1.])).repeat(10).batch(2)
model.predict(predict_dataset, steps=10)
WARNING:tensorflow:Your dataset iterator ran out of data; interrupting training. Make sure that your dataset can generate at least `steps_per_epoch * epochs` batches (in this case, 10 batches). You may need touse the repeat() function when building your dataset.

array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)

That's all you need to train your model with Keras on multiple GPUs using MirroredStrategy. The strategy takes care of splitting up the input dataset, replicating layers and variables on each device, and combining and applying the gradients.

The model and input code do not have to change because we have changed the underlying components of TensorFlow (such as the optimizer, batch norm, and summaries) to be strategy-aware. That means those components know how to combine their state across devices. Further, saving and checkpointing work seamlessly, so you can save with one or no distribute strategy and resume with another.
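
For example, here is a minimal sketch of saving the weights of the model trained above and restoring them into a copy of the model built without any distribution strategy (the checkpoint path is illustrative):

# Save weights from the model trained under MirroredStrategy
# (the path is illustrative).
model.save_weights('/tmp/mirrored_model_weights')

# Rebuild the same architecture without any strategy scope and restore.
inputs = tf.keras.layers.Input(shape=(1,))
predictions = tf.keras.layers.Dense(1)(inputs)
restored_model = tf.keras.models.Model(inputs=inputs, outputs=predictions)
restored_model.load_weights('/tmp/mirrored_model_weights')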

Example with Estimator API

You can also use the tf.distribute.Strategy API with Estimator. Let's see a simple example of its usage with MirroredStrategy.

We will use the LinearRegressor premade estimator as an example. To use MirroredStrategy with Estimator, all we need to do is:

  • Create an instance of the MirroredStrategy class.
  • Pass it to the train_distribute (and optionally eval_distribute) argument of tf.estimator.RunConfig, and pass that config to the custom or premade Estimator.

strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=strategy, eval_distribute=strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Device is available but not used by distribute strategy: /device:XLA_CPU:0
WARNING:tensorflow:Not all devices in `tf.distribute.Strategy` are visible to TensorFlow.
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpkz1301og
INFO:tensorflow:Using config: {'_tf_random_seed': None, '_log_step_count_steps': 100, '_keep_checkpoint_every_n_hours': 10000, '_num_ps_replicas': 0, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fd28c44b4a8>, '_master': '', '_num_worker_replicas': 1, '_experimental_distribute': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fd28c44b668>, '_task_id': 0, '_is_chief': True, '_distribute_coordinator_mode': None, '_keep_checkpoint_max': 5, '_global_id_in_cluster': 0, '_device_fn': None, '_service': None, '_protocol': None, '_save_checkpoints_steps': None, '_model_dir': '/tmp/tmpkz1301og', '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fd28c44b668>, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_save_summary_steps': 100, '_task_type': 'worker', '_save_checkpoints_secs': 600, '_evaluation_master': ''}

We will define a simple input function to feed data for training this model.

def input_fn():
  return tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.])).repeat(10000).batch(10)

Then we can call train on the regressor instance to train the model.

regressor.train(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:Partitioned variables are disabled when using current tf.distribute.Strategy.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/feature_column/feature_column_v2.py:2705: to_float (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.cast instead.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpkz1301og/model.ckpt.
INFO:tensorflow:Initialize strategy
INFO:tensorflow:loss = 10.0, step = 0
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpkz1301og/model.ckpt.
INFO:tensorflow:Finalize strategy.
INFO:tensorflow:Loss for final step: 1.6284133e+16.

And we can call evaluate to evaluate the trained model.

regressor.evaluate(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-01-08T00:06:06Z
INFO:tensorflow:Graph was finalized.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow/python/training/saver.py:1266: checkpoint_exists (from tensorflow.python.training.checkpoint_management) is deprecated and will be removed in a future version.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
INFO:tensorflow:Restoring parameters from /tmp/tmpkz1301og/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Initialize strategy
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Finalize strategy.
INFO:tensorflow:Finished evaluation at 2019-01-08-00:06:07
INFO:tensorflow:Saving dict for global step 10: average_loss = 7.979226e+16, global_step = 10, label/mean = 1.0, loss = 7.979226e+17, prediction/mean = -282475260.0
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpkz1301og/model.ckpt-10

{'average_loss': 7.979226e+16,
 'global_step': 10,
 'label/mean': 1.0,
 'loss': 7.979226e+17,
 'prediction/mean': -282475260.0}

That's it! This change configures the Estimator to run on all GPUs on your machine.

Customization and Performance Tips

Above, we showed the easiest way to use MirroredStrategy. There are a few things you can customize in practice:

  • You can specify a list of specific devices (using the devices parameter) if you don't want automatic detection.
  • You can customize how all-reduce is performed with the cross_device_ops parameter, such as which all-reduce algorithm to use and how gradients are repacked (see the sketch after this list).

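For example, here is a minimal sketch of both customizations. It assumes two GPUs are available and that tf.distribute.HierarchicalCopyAllReduce exists in your TensorFlow version; if it does not, substitute another cross-device ops implementation.

# Mirror across two explicitly named GPUs instead of auto-detecting devices,
# and use hierarchical-copy all-reduce for combining gradients.
strategy = tf.distribute.MirroredStrategy(
    devices=["/gpu:0", "/gpu:1"],
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce(num_packs=1))
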
We've tried to make it such that you get the best performance for your existing model without having to specify any additional options. We also recommend you follow the tips in the Input Pipeline Performance Guide. Specifically, we found that using map_and_batch and dataset.prefetch in the input function gives a solid boost in performance. When using dataset.prefetch, use buffer_size=tf.data.experimental.AUTOTUNE to let it detect the optimal buffer size.
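
For instance, here is a minimal sketch of an input function following those tips; the dataset, parse function, and batch size are illustrative placeholders.

BATCH_SIZE = 64  # illustrative global batch size

def parse_fn(x):
  # Placeholder per-element preprocessing; replace with your own parsing logic.
  return tf.cast(x, tf.float32), tf.cast(x, tf.float32)

def input_fn():
  dataset = tf.data.Dataset.range(10000)
  # Fuse the map and batch steps for better throughput.
  dataset = dataset.apply(
      tf.data.experimental.map_and_batch(parse_fn, batch_size=BATCH_SIZE))
  # Let tf.data pick an appropriate prefetch buffer size automatically.
  return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)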

Caveats

This API is still a work in progress, and there are many improvements forthcoming:

  • Summaries are only computed in the first replica in MirroredStrategy.
  • PartitionedVariables are not supported yet.
  • Performance improvements, especially with respect to input handling, eager execution, etc.

What's next?

tf.distribute.Strategy is actively under development, and we will be adding more examples and tutorials in the near future. Please give it a try; we welcome your feedback via issues on GitHub.