![]() |
![]() |
A state & compute distribution policy on a list of devices.
tf.distribute.Strategy(
extended
)
See the guide
for overview and examples. See tf.distribute.StrategyExtended
and
tf.distribute
for a glossory of concepts mentioned on this page such as "per-replica",
replica, and reduce.
In short:
- To use it with Keras
compile
/fit
, please read. - You may pass descendant of
tf.distribute.Strategy
totf.estimator.RunConfig
to specify how atf.estimator.Estimator
should distribute its computation. See guide. - Otherwise, use
tf.distribute.Strategy.scope
to specify that a strategy should be used when building an executing your model. (This puts you in the "cross-replica context" for this strategy, which means the strategy is put in control of things like variable placement.) If you are writing a custom training loop, you will need to call a few more methods, see the guide:
- Start by creating a
tf.data.Dataset
normally. Use
tf.distribute.Strategy.experimental_distribute_dataset
to convert atf.data.Dataset
to something that produces "per-replica" values. If you want to manually specify how the dataset should be partitioned across replicas, usetf.distribute.Strategy.distribute_datasets_from_function
instead.Use
tf.distribute.Strategy.run
to run a function once per replica, taking values that may be "per-replica" (e.g. from atf.distribute.DistributedDataset
object) and returning "per-replica" values. This function is executed in "replica context", which means each operation is performed separately on each replica.Finally use a method (such as
tf.distribute.Strategy.reduce
) to convert the resulting "per-replica" values into ordinaryTensor
s.
- Start by creating a
A custom training loop can be as simple as:
with my_strategy.scope():
@tf.function
def distribute_train_epoch(dataset):
def replica_fn(input):
# process input and return result
return result
total_result = 0
for x in dataset:
per_replica_result = my_strategy.run(replica_fn, args=(x,))
total_result += my_strategy.reduce(tf.distribute.ReduceOp.SUM,
per_replica_result, axis=None)
return total_result
dist_dataset = my_strategy.experimental_distribute_dataset(dataset)
for _ in range(EPOCHS):
train_result = distribute_train_epoch(dist_dataset)
This takes an ordinary dataset
and replica_fn
and runs it
distributed using a particular tf.distribute.Strategy
named
my_strategy
above. Any variables created in replica_fn
are created
using my_strategy
's policy, and library functions called by
replica_fn
can use the get_replica_context()
API to implement
distributed-specific behavior.
You can use the reduce
API to aggregate results across replicas and use
this as a return value from one iteration over a
tf.distribute.DistributedDataset
. Or
you can use tf.keras.metrics
(such as loss, accuracy, etc.) to
accumulate metrics across steps in a given epoch.
See the custom training loop tutorial for a more detailed example.
Attributes | |
---|---|
cluster_resolver
|
Returns the cluster resolver associated with this strategy.
In general, when using a multi-worker Strategies that intend to have an associated
Single-worker strategies usually do not have a
The
For more information, please see
|
extended
|
tf.distribute.StrategyExtended with additional methods.
|
num_replicas_in_sync
|
Returns number of replicas over which gradients are aggregated. |
Methods
distribute_datasets_from_function
distribute_datasets_from_function(
dataset_fn, options=None
)
Distributes tf.data.Dataset
instances created by calls to dataset_fn
.
The argument dataset_fn
that users pass in is an input function that has a
tf.distribute.InputContext
argument and returns a tf.data.Dataset
instance. It is expected that the returned dataset from dataset_fn
is
already batched by per-replica batch size (i.e. global batch size divided by
the number of replicas in sync) and sharded.
tf.distribute.Strategy.distribute_datasets_from_function
does
not batch or shard the tf.data.Dataset
instance
returned from the input function. dataset_fn
will be called on the CPU
device of each of the workers and each generates a dataset where every
replica on that worker will dequeue one batch of inputs (i.e. if a worker
has two replicas, two batches will be dequeued from the Dataset
every
step).
This method can be used for several purposes. First, it allows you to
specify your own batching and sharding logic. (In contrast,
tf.distribute.experimental_distribute_dataset
does batching and sharding
for you.) For example, where
experimental_distribute_dataset
is unable to shard the input files, this
method might be used to manually shard the dataset (avoiding the slow
fallback behavior in experimental_distribute_dataset
). In cases where the
dataset is infinite, this sharding can be done by creating dataset replicas
that differ only in their random seed.
The dataset_fn
should take an tf.distribute.InputContext
instance where
information about batching and input replication can be accessed.
You can use element_spec
property of the
tf.distribute.DistributedDataset
returned by this API to query the
tf.TypeSpec
of the elements returned by the iterator. This can be used to
set the input_signature
property of a tf.function
. Follow
tf.distribute.DistributedDataset.element_spec
to see an example.
For a tutorial on more usage and properties of this method, refer to the tutorial on distributed input). If you are interested in last partial batch handling, read this section.
Args | |
---|---|
dataset_fn
|
A function taking a tf.distribute.InputContext instance and
returning a tf.data.Dataset .
|
options
|
tf.distribute.InputOptions used to control options on how this
dataset is distributed.
|
Returns | |
---|---|
A tf.distribute.DistributedDataset .
|
experimental_distribute_dataset
experimental_distribute_dataset(
dataset, options=None
)
Creates tf.distribute.DistributedDataset
from tf.data.Dataset
.
The returned tf.distribute.DistributedDataset
can be iterated over
similar to regular datasets.
NOTE: The user cannot add any more transformations to a
tf.distribute.DistributedDataset
. You can only create an iterator or
examine the tf.TypeSpec
of the data generated by it. See API docs of
tf.distribute.DistributedDataset
to learn more.
The following is an example:
global_batch_size = 2
# Passing the devices is optional.
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "GPU:1"])
# Create a dataset
dataset = tf.data.Dataset.range(4).batch(global_batch_size)
# Distribute that dataset
dist_dataset = strategy.experimental_distribute_dataset(dataset)
@tf.function
def replica_fn(input):
return input*2
result = []
# Iterate over the `tf.distribute.DistributedDataset`
for x in dist_dataset:
# process dataset elements
result.append(strategy.run(replica_fn, args=(x,)))
print(result)
[PerReplica:{
0: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([0])>,
1: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([2])>
}, PerReplica:{
0: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([4])>,
1: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([6])>
}]
Three key actions happending under the hood of this method are batching, sharding, and prefetching.
In the code snippet above, dataset
is batched by global_batch_size
, and
calling experimental_distribute_dataset
on it rebatches dataset
to a
new batch size that is equal to the global batch size divided by the number
of replicas in sync. We iterate through it using a Pythonic for loop.
x
is a tf.distribute.DistributedValues
containing data for all replicas,
and each replica gets data of the new batch size.
tf.distribute.Strategy.run
will take care of feeding the right per-replica
data in x
to the right replica_fn
executed on each replica.
Sharding contains autosharding across multiple workers and within every
worker. First, in multi-worker distributed training (i.e. when you use
tf.distribute.experimental.MultiWorkerMirroredStrategy
or tf.distribute.TPUStrategy
), autosharding a dataset over a set of
workers means that each worker is assigned a subset of the entire dataset
(if the right tf.data.experimental.AutoShardPolicy
is set). This is to
ensure that at each step, a global batch size of non-overlapping dataset
elements will be processed by each worker. Autosharding has a couple of
different options that can be specified using
tf.data.experimental.DistributeOptions
. Then, sharding within each worker
means the method will split the data among all the worker devices (if more
than one a present). This will happen regardless of multi-worker
autosharding.
By default, this method adds a prefetch transformation at the end of the
user provided tf.data.Dataset
instance. The argument to the prefetch
transformation which is buffer_size
is equal to the number of replicas in
sync.
If the above batch splitting and dataset sharding logic is undesirable,
please use
tf.distribute.Strategy.distribute_datasets_from_function
instead, which does not do any automatic batching or sharding for you.
For a tutorial on more usage and properties of this method, refer to the tutorial on distributed input. If you are interested in last partial batch handling, read this section.
Args | |
---|---|
dataset
|
tf.data.Dataset that will be sharded across all replicas using
the rules stated above.
|
options
|
tf.distribute.InputOptions used to control options on how this
dataset is distributed.
|
Returns | |
---|---|
A tf.distribute.DistributedDataset .
|
experimental_distribute_values_from_function
experimental_distribute_values_from_function(
value_fn
)