A state & compute distribution policy on a list of devices.
tf.distribute.Strategy( extended )
- To use it with Keras `fit`, please read the corresponding guide.
- You may pass a descendant of `tf.estimator.RunConfig` to specify how a `tf.estimator.Estimator` should distribute its computation. See guide.
- Otherwise, use `tf.distribute.Strategy.scope` to specify that a strategy should be used when building and executing your model. (This puts you in the "cross-replica context" for this strategy, which means the strategy is put in control of things like variable placement.)
If you are writing a custom training loop, you will need to call a few more methods (see the guide):
- Start by either creating a `tf.data.Dataset` normally or using `tf.distribute.experimental_make_numpy_dataset` to make a dataset out of a NumPy array.
- Use `tf.distribute.Strategy.experimental_distribute_dataset` to convert a `tf.data.Dataset` to something that produces "per-replica" values. If you want to manually specify how the dataset should be partitioned across replicas, use `tf.distribute.Strategy.experimental_distribute_datasets_from_function` instead.
- Use `tf.distribute.Strategy.run` to run a function once per replica, taking values that may be "per-replica" (e.g. from a `tf.distribute.DistributedDataset` object) and returning "per-replica" values. This function is executed in "replica context", which means each operation is performed separately on each replica.
- Finally, use a method (such as `tf.distribute.Strategy.reduce`) to convert the resulting "per-replica" values into ordinary `Tensor`s.
A custom training loop can be as simple as:
    with my_strategy.scope():
      @tf.function
      def distribute_train_epoch(dataset):
        def replica_fn(input):
          # process input and return result
          return result

        total_result = 0
        for x in dataset:
          per_replica_result = my_strategy.run(replica_fn, args=(x,))
          total_result += my_strategy.reduce(tf.distribute.ReduceOp.SUM,
                                             per_replica_result, axis=None)
        return total_result

      dist_dataset = my_strategy.experimental_distribute_dataset(dataset)
      for _ in range(EPOCHS):
        train_result = distribute_train_epoch(dist_dataset)
This takes an ordinary `dataset` and `replica_fn` and runs it distributed using a particular `tf.distribute.Strategy` named `my_strategy` above. Any variables created in `replica_fn` are created using `my_strategy`'s policy, and library functions called by `replica_fn` can use the `get_replica_context()` API to implement distributed-specific behavior.
You can use the `reduce` API to aggregate results across replicas and use this as a return value from one iteration over a `tf.distribute.DistributedDataset`. Alternatively, you can use `tf.keras.metrics` (such as loss, accuracy, etc.) to accumulate metrics across steps in a given epoch.
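To make the aggregation step concrete, here is a plain-Python model of what a SUM or MEAN reduction over per-replica values computes, accumulated across steps. It does not call TensorFlow: `reduce_per_replica` and the sample numbers are hypothetical names and data chosen for illustration, whereas the real `tf.distribute.Strategy.reduce` operates on tensors placed on devices.

```python
from statistics import fmean


def reduce_per_replica(op, per_replica_values):
    """Combine one scalar per replica into a single value (illustrative only)."""
    if op == "SUM":
        return sum(per_replica_values)
    if op == "MEAN":
        return fmean(per_replica_values)
    raise ValueError(f"unsupported reduce op: {op!r}")


# Hypothetical per-replica results from 4 replicas over 2 steps.
steps = [
    [1.0, 2.0, 3.0, 4.0],
    [0.5, 0.5, 1.0, 2.0],
]

# Mirror the loop shape used in the training example: reduce each step,
# then accumulate the reduced value across steps.
total_result = 0.0
for per_replica_result in steps:
    total_result += reduce_per_replica("SUM", per_replica_result)
```

The point of the sketch is only the shape of the computation: one reduction per step collapses the replica dimension, and an ordinary running total handles the step dimension.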
See the custom training loop tutorial for a more detailed example.
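`cluster_resolver`: Returns the cluster resolver associated with this strategy.
In general, when using a multi-worker strategy, a `tf.distribute.cluster_resolver.ClusterResolver` is associated with the strategy, and this property returns that instance.
Strategies that intend to have an associated `tf.distribute.cluster_resolver.ClusterResolver` must set the relevant attribute or override this property; otherwise, `None` is returned by default.
Single-worker strategies usually do not have a `tf.distribute.cluster_resolver.ClusterResolver`, and in those cases this property returns `None`.
For more information, please see the `tf.distribute.cluster_resolver.ClusterResolver` API documentation.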
`num_replicas_in_sync`: Returns the number of replicas over which gradients are aggregated.
experimental_assign_to_logical_device( tensor, logical_device_id )
Adds an annotation that `tensor` will be assigned to a logical device.
    # Initializing TPU system with 2 logical devices and 4 replicas.
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    tf.config.experimental_connect_to_cluster(resolver)
    topology = tf.tpu.experimental.initialize_tpu_system(resolver)
    device_assignment = tf.tpu.experimental.DeviceAssignment.build(
        topology,
        computation_shape=[1, 1, 1, 2],
        num_replicas=4)
    strategy = tf.distribute.TPUStrategy(
        resolver, experimental_device_assignment=device_assignment)
    iterator = iter(inputs)

    @tf.function()
    def step_fn(inputs):
      output = tf.add(inputs, inputs)

      # Add operation will be executed on logical device 0.
      output = strategy.experimental_assign_to_logical_device(output, 0)
      return output

    strategy.run(step_fn, args=(next(iterator),))
`tensor`: Input tensor to annotate.
`logical_device_id`: Id of the logical core to which the tensor will be assigned.
Raises `ValueError` if the logical device id presented is not consistent with the total number of partitions specified by the device assignment.
Returns the annotated tensor with identical value as `tensor`.
experimental_distribute_dataset( dataset, options=None )
The following is an example:
    import tensorflow as tf

    strategy = tf.distribute.MirroredStrategy()

    # Create a dataset
    dataset = tf.data.TFRecordDataset([
        "/a/1.tfr", "/a/2.tfr", "/a/3.tfr", "/a/4.tfr"])

    # Distribute that dataset
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    # Iterate over the `tf.distribute.DistributedDataset`
    # (`replica_fn` is assumed to be defined elsewhere)
    for x in dist_dataset:
      # process dataset elements
      strategy.run(replica_fn, args=(x,))
In the code snippet above, the `dist_dataset` is batched by `GLOBAL_BATCH_SIZE`, and we iterate through it using `for x in dist_dataset`. Each `x` is a "per-replica" value containing data for all replicas, which aggregates to a batch of `GLOBAL_BATCH_SIZE`. `tf.distribute.Strategy.run` will take care of feeding the right per-replica data in `x` to the right `replica_fn` executed on each replica.
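As a rough illustration of how one global batch relates to the per-replica data, the sketch below splits a batch of `GLOBAL_BATCH_SIZE` elements into equal slices, one per replica. It is a plain-Python model, not TensorFlow's implementation: `split_global_batch` is a hypothetical helper, and the sketch assumes the batch divides evenly across replicas.

```python
def split_global_batch(global_batch, num_replicas):
    """Split one global batch into equal per-replica slices (illustrative only)."""
    if len(global_batch) % num_replicas != 0:
        # Simplifying assumption of this sketch, not a documented TF requirement.
        raise ValueError("this sketch assumes the batch divides evenly")
    per_replica_size = len(global_batch) // num_replicas
    return [
        global_batch[i * per_replica_size:(i + 1) * per_replica_size]
        for i in range(num_replicas)
    ]


GLOBAL_BATCH_SIZE = 8
global_batch = list(range(GLOBAL_BATCH_SIZE))

# `x` plays the role of the per-replica value: one slice per replica.
x = split_global_batch(global_batch, num_replicas=4)

# Each replica would receive only its own slice.
for replica_id, replica_input in enumerate(x):
    print(f"replica {replica_id} gets {replica_input}")
```

The slices together cover the whole global batch, which is why the batch size passed to the dataset is usually thought of as the global size rather than the per-replica size.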
What happens under the hood when we say the `tf.data.Dataset` instance `dataset` gets distributed? It depends on how you set the auto-shard policy in `tf.data.experimental.DistributeOptions`. By default, it is set to `tf.data.experimental.AutoShardPolicy.AUTO`. In a multi-worker setting, we will first attempt to distribute `dataset` by detecting whether `dataset` is being created out of reader datasets (e.g. `tf.data.TFRecordDataset`, `tf.data.TextLineDataset`, etc.) and if so, try to shard the input files.
Note that there has to be at least one input file per worker. If you have fewer than one input file per worker, we suggest that you disable dataset sharding across workers by setting `tf.data.experimental.DistributeOptions.auto_shard_policy` to `tf.data.experimental.AutoShardPolicy.OFF`.
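The file-based sharding described above can be pictured with a small plain-Python model. This is a sketch under stated assumptions, not TensorFlow's implementation: `shard_files` is a hypothetical helper, the round-robin rule is borrowed from the "every n-th element starting at index i" selection that `Dataset.shard` uses, and raising `ValueError` when there are too few files is a choice made for this illustration.

```python
def shard_files(filenames, num_workers, worker_index):
    """Return the files one worker would read under file-based sharding."""
    if len(filenames) < num_workers:
        # Illustrative guard: with fewer files than workers, some worker
        # would get no input, which is when disabling auto-sharding helps.
        raise ValueError(
            "need at least one input file per worker; "
            "consider disabling auto-sharding")
    # Every num_workers-th file, starting at worker_index.
    return filenames[worker_index::num_workers]


files = ["/a/1.tfr", "/a/2.tfr", "/a/3.tfr", "/a/4.tfr"]
num_workers = 2
assignments = [shard_files(files, num_workers, w) for w in range(num_workers)]

for worker_index, worker_files in enumerate(assignments):
    print(f"worker {worker_index} reads {worker_files}")
```

Because each worker opens only its own files, no worker reads or preprocesses data that belongs to another worker.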
If the attempt to shard by file is unsuccessful (i.e. the dataset is not read from files), we will shard the dataset evenly at the end by appending a `.shard` operation to the end of the processing pipeline. This will cause the entire preprocessing pipeline for all the data to be run on every worker, and each worker will do redundant work. We will print a warning if this route is selected.
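The cost of this fallback can be seen in a small plain-Python model that counts preprocessing calls. It is only a sketch: `run_worker`, `preprocess`, and the sample data are hypothetical, and the slicing stands in for a `.shard` step placed at the very end of the pipeline.

```python
def run_worker(elements, preprocess, num_workers, worker_index):
    """Model the fallback: preprocess everything, then keep this worker's shard."""
    # The full pipeline runs on every element, on every worker.
    processed = [preprocess(e) for e in elements]
    # Sharding happens last: keep every num_workers-th result.
    return processed[worker_index::num_workers]


preprocess_calls = []  # records every element that gets preprocessed


def preprocess(element):
    preprocess_calls.append(element)
    return element * 10


num_workers = 2
elements = list(range(6))
outputs = [
    run_worker(elements, preprocess, num_workers, w)
    for w in range(num_workers)
]

# Each worker keeps half of the results, but every worker processed all six.
print(outputs)
print(len(preprocess_calls))
```

With two workers and six elements, preprocessing runs twelve times in total even though only six results are kept, which is the redundant work the warning refers to.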
As mentioned before, within each worker, we will also split the data among all the worker devices (if more than one are present). This will happen even if