|TensorFlow 1 version||View source on GitHub|
Synchronous training on TPUs and TPU Pods.
tf.distribute.experimental.TPUStrategy( tpu_cluster_resolver=None, device_assignment=None )
To construct a TPUStrategy object, you need to run the initialization code as below:
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
strategy = tf.distribute.experimental.TPUStrategy(resolver)
While using distribution strategies, the variables created within the strategy's scope will be replicated across all the replicas and can be kept in sync using all-reduce algorithms.
To run TF2 programs on TPUs, you can either use
.fit APIs in
tf.keras with TPUStrategy, or write your own customized
training loop by calling
strategy.run directly. Note that
TPUStrategy doesn't support pure eager execution, so please make sure the
function passed into
strategy.run is a
strategy.run is called inside a
tf.function if eager
behavior is enabled.
||A tf.distribute.cluster_resolver.TPUClusterResolver, which provides information about the TPU cluster.|
Returns the cluster resolver associated with this strategy.
||Returns number of replicas over which gradients are aggregated.|
experimental_assign_to_logical_device( tensor, logical_device_id )
Adds annotation that
tensor will be assigned to a logical device.
# Initializing TPU system with 2 logical devices and 4 replicas. resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='') tf.config.experimental_connect_to_cluster(resolver) topology = tf.tpu.experimental.initialize_tpu_system(resolver) device_assignment = tf.tpu.experimental.DeviceAssignment.build( topology, computation_shape=[1, 1, 1, 2], num_replicas=4) strategy = tf.distribute.TPUStrategy( resolver, experimental_device_assignment=device_assignment) iterator = iter(inputs) @tf.function() def step_fn(inputs): output = tf.add(inputs, inputs) # Add operation will be executed on logical device 0. output = strategy.experimental_assign_to_logical_device(output, 0) return output strategy.run(step_fn, args=(next(iterator),))
||Input tensor to annotate.|
||Id of the logical core to which the tensor will be assigned.|
||The logical device id presented is not consistent with total number of partitions specified by the device assignment.|
Annotated tensor with idential value as
experimental_distribute_dataset( dataset, options=None )
The following is an example:
strategy = tf.distribute.MirroredStrategy() # Create a dataset dataset = dataset_ops.Dataset.TFRecordDataset([ "/a/1.tfr", "/a/2.tfr", "/a/3.tfr", "/a/4.tfr"]) # Distribute that dataset dist_dataset = strategy.experimental_distribute_dataset(dataset) # Iterate over the `tf.distribute.DistributedDataset` for x in dist_dataset: # process dataset elements strategy.run(replica_fn, args=(x,))
In the code snippet above, the
dist_dataset is batched by
GLOBAL_BATCH_SIZE, and we iterate through it
for x in dist_dataset.
containing data for all replicas, which aggregates to a batch of
tf.distribute.Strategy.run will take care of feeding
the right per-replica data in
x to the right
replica_fn executed on each
What's under the hood of this method, when we say the
dataset - gets distributed? It depends on how you set the
tf.data.experimental.DistributeOptions. By default, it is set to
tf.data.experimental.AutoShardPolicy.AUTO. In a multi-worker setting, we
will first attempt to distribute
dataset by detecting whether
being created out of reader datasets (e.g.
tf.data.TextLineDataset, etc.) and if so, try to shard the input files.
Note that there has to be at least one input file per worker. If you have
less than one input file per worker, we suggest that you disable dataset
sharding across workers, by setting the
tf.data.experimental.DistributeOptions.auto_shard_policy to be
If the attempt to shard by file is unsuccessful (i.e. the dataset is not
read from files), we will shard the dataset evenly at the end by
.shard operation to the end of the processing pipeline. This
will cause the entire preprocessing pipeline for all the data to be run on
every worker, and each worker will do redundant work. We will print a
warning if this route is selected.
As mentioned before, within each worker, we will also split the data among all the worker devices (if more than one a present). This will happen even if multi-worker sharding is disabled.
If the above batch splitting and dataset sharding logic is undesirable,
instead, which does not do any automatic splitting or sharding.
You can also use the
element_spec property of the
tf.distribute.DistributedDataset instance returned by this API to query
tf.TypeSpec of the elements returned
by the iterator. This can be used to set the
strategy = tf.distribute.MirroredStrategy() # Create a dataset dataset = dataset_ops.Dataset.TFRecordDataset([ "/a/1.tfr", "/a/2.tfr", "/a/3.tfr", "/a/4.tfr"]) # Distribute that dataset dist_dataset = strategy.experimental_distribute_dataset(dataset) @tf.function(input_signature=[dist_dataset.element_spec]) def train_step(inputs): # train model with inputs return # Iterate over the `tf.distribute.DistributedDataset` for x in dist_dataset: # process dataset elements strategy.run(train_step, args=(x,))