Library for running a computation across multiple devices.
The intent of this library is that you can write an algorithm in a stylized way
and it will be usable with a variety of different tf.distribute.Strategy
implementations. Each descendant will implement a different strategy for
distributing the algorithm across multiple devices/machines. Furthermore, these
changes can be hidden inside the specific layers and other library classes that
need special treatment to run in a distributed setting, so that most users'
model definition code can run unchanged. The tf.distribute.Strategy
API works
the same way with eager and graph execution.
Guides
Tutorials
Distributed Training Tutorials
The tutorials cover how to use
tf.distribute.Strategy
to do distributed training with native Keras APIs, custom training loops, and Estimator APIs. They also cover how to save/load model when usingtf.distribute.Strategy
.
Glossary
- Data parallelism is where we run multiple copies of the model on different slices of the input data. This is in contrast to model parallelism where we divide up a single copy of a model across multiple devices. Note: we only support data parallelism for now, but hope to add support for model parallelism in the future.
- A device is a CPU or accelerator (e.g. GPUs, TPUs) on some machine that
TensorFlow can run operations on (see e.g.
tf.device
). You may have multiple devices on a single machine, or be connected to devices on multiple machines. Devices used to run computations are called worker devices. Devices used to store variables are parameter devices. For some strategies, such astf.distribute.MirroredStrategy
, the worker and parameter devices will be the same (see mirrored variables below). For others they will be different. For example,tf.distribute.experimental.CentralStorageStrategy
puts the variables on a single device (which may be a worker device or may be the CPU), andtf.distribute.experimental.ParameterServerStrategy
puts the variables on separate machines called parameter servers (see below). - A replica is one copy of the model, running on one slice of the input data. Right now each replica is executed on its own worker device, but once we add support for model parallelism a replica may span multiple worker devices.
- A host is the CPU device on a machine with worker devices, typically used for running input pipelines.
- A worker is defined to be the physical machine(s) containing the physical devices (e.g. GPUs, TPUs) on which the replicated computation is executed. A worker may contain one or more replicas, but contains at least one replica. Typically one worker will correspond to one machine, but in the case of very large models with model parallelism, one worker may span multiple machines. We typically run one input pipeline per worker, feeding all the replicas on that worker.
- Synchronous, or more commonly sync, training is where the updates from each replica are aggregated together before updating the model variables. This is in contrast to asynchronous, or async training, where each replica updates the model variables independently. You may also have replicas partitioned into groups which are in sync within each group but async between groups.
Parameter servers: These are machines that hold a single copy of parameters/variables, used by some strategies (right now just
tf.distribute.experimental.ParameterServerStrategy
). All replicas that want to operate on a variable retrieve it at the beginning of a step and send an update to be applied at the end of the step. These can in principle support either sync or async training, but right now we only have support for async training with parameter servers. Compare totf.distribute.experimental.CentralStorageStrategy
, which puts all variables on a single device on the same machine (and does sync training), andtf.distribute.MirroredStrategy
, which mirrors variables to multiple devices (see below).Replica context vs. Cross-replica context vs Update context
A replica context applies when you execute the computation function that was called with
strategy.run
. Conceptually, you're in replica context when executing the computation function that is being replicated.An update context is entered in a
tf.distribute.StrategyExtended.update
call.An cross-replica context is entered when you enter a
strategy.scope
. This is useful for callingtf.distribute.Strategy
methods which operate across the replicas (likereduce_to()
). By default you start in a replica context (the "default single replica context") and then some methods can switch you back and forth.Distributed value: Distributed value is represented by the base class
tf.distribute.DistributedValues
.tf.distribute.DistributedValues
is useful to represent values on multiple devices, and it contains a map from replica id to values. Two representative types oftf.distribute.DistributedValues
aretf.types.experimental.PerReplica
andtf.types.experimental.Mirrored
values.PerReplica
values exist on the worker devices, with a different value for each replica. They are produced by iterating through a distributed dataset returned bytf.distribute.Strategy.experimental_distribute_dataset
andtf.distribute.Strategy.distribute_datasets_from_function
. They are also the typical result returned bytf.distribute.Strategy.run
.Mirrored
values are likePerReplica
values, except we know that the value on all replicas are the same.Mirrored
values are kept synchronized by the distribution strategy in use, whilePerReplica
values are left unsynchronized.Mirrored
values typically represent model weights. We can safely read aMirrored
value in a cross-replica context by using the value on any replica, while PerReplica values can only be read within a replica context.Unwrapping and merging: Consider calling a function
fn
on multiple replicas, likestrategy.run(fn, args=[w])
with an argumentw
that is atf.distribute.DistributedValues
. This meansw
will have a map taking replica id0
tow0
, replica id1
tow1
, etc.strategy.run()
unwrapsw
before callingfn
, so it callsfn(w0)
on deviced0
,fn(w1)
on deviced1
, etc. It then merges the return values fromfn()
, which leads to one common object if the returned values are the same object from every replica, or aDistributedValues
object otherwise.Reductions and all-reduce: A reduction is a method of aggregating multiple values into one value, like "sum" or "mean". If a strategy is doing sync training, we will perform a reduction on the gradients to a parameter from all replicas before applying the update. All-reduce is an algorithm for performing a reduction on values from multiple devices and making the result available on all of those devices.
Mirrored variables: These are variables that are created on multiple devices, where we keep the variables in sync by applying the same updates to every copy. Mirrored variables are created with
tf.Variable(...synchronization=tf.VariableSynchronization.ON_WRITE...)
. Normally they are only used in synchronous training.SyncOnRead variables
SyncOnRead variables are created by
tf.Variable(...synchronization=tf.VariableSynchronization.ON_READ...)
, and they are created on multiple devices. In replica context, each component variable on the local replica can perform reads and writes without synchronization with each other. When the SyncOnRead variable is read in cross-replica context, the values from component variables are aggregated and returned.SyncOnRead variables bring a lot of custom configuration difficulty to the underlying logic, so we do not encourage users to instantiate and use SyncOnRead variable on their own. We have mainly used SyncOnRead variables for use cases such as batch norm and metrics. For performance reasons, we often don't need to keep these statistics in sync every step and they can be accumulated on each replica independently. The only time we want to sync them is reporting or checkpointing, which typically happens in cross-replica context. SyncOnRead variables are also often used by advanced users who want to control when variable values are aggregated. For example, users sometimes want to maintain gradients independently on each replica for a couple of steps without aggregation.
Distribute-aware layers
Layers are generally called in a replica context, except when defining a Keras functional model.
tf.distribute.in_cross_replica_context
will let you determine which case you are in. If in a replica context, thetf.distribute.get_replica_context
function will return the default replica context outside a strategy scope,None
within a strategy scope, and atf.distribute.ReplicaContext
object inside a strategy scope and within atf.distribute.Strategy.run
function. TheReplicaContext
object has anall_reduce
method for aggregating across all replicas.
Note that we provide a default version of tf.distribute.Strategy
that is
used when no other strategy is in scope, that provides the same API with
reasonable default behavior.
Modules
cluster_resolver
module: Library imports for ClusterResolvers.
experimental
module: Experimental Distribution Strategy library.
Classes
class CrossDeviceOps
: Base class for cross-device reduction and broadcasting algorithms.
class HierarchicalCopyAllReduce
: Hierarchical copy all-reduce implementation of CrossDeviceOps.
class InputContext
: A class wrapping information needed by an input function.
class InputReplicationMode
: Replication mode for input function.
class MirroredStrategy
: Synchronous training across multiple replicas on one machine.
class NcclAllReduce
: NCCL all-reduce implementation of CrossDeviceOps.
class OneDeviceStrategy
: A distribution strategy for running on a single device.
class ReduceOp
: Indicates how a set of values should be reduced.
class ReductionToOneDevice
: A CrossDeviceOps implementation that copies values to one device to reduce.
class ReplicaContext
: A class with a collection of APIs that can be called in a replica context.
class RunOptions
: Run options for strategy.run
.
class Server
: An in-process TensorFlow server, for use in distributed training.
class Strategy
: A list of devices with a state & compute distribution policy.
class StrategyExtended
: Additional APIs for algorithms that need to be distribution-aware.
Functions
experimental_set_strategy(...)
: Set a tf.distribute.Strategy
as current without with strategy.scope()
.
get_loss_reduction(...)
: tf.distribute.ReduceOp
corresponding to the last loss reduction.
get_replica_context(...)
: Returns the current tf.distribute.ReplicaContext
or None
.
get_strategy(...)
: Returns the current tf.distribute.Strategy
object.
has_strategy(...)
: Return if there is a current non-default tf.distribute.Strategy
.
in_cross_replica_context(...)
: Returns True
if in a cross-replica context.