An object to schedule and coordinate remote function execution.
tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
This class is used to create fault-tolerant resources and dispatch functions to remote TensorFlow servers.
Currently, this class is not supported to be used in a standalone manner. It
should be used in conjunction with a tf.distribute strategy that is designed
to work with it. The ClusterCoordinator class currently only works with
tf.distribute.experimental.ParameterServerStrategy.
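For illustration, a minimal construction could look like the sketch below; the
cluster resolver setup is elided here and depends on the deployment.

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver=...)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy=strategy)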
The most important APIs provided by this class are the schedule/join pair. The
schedule API is non-blocking in that it queues a tf.function and returns a
RemoteValue immediately. The queued functions will be dispatched
to remote workers in background threads and their
RemoteValues will be filled asynchronously. Since
schedule doesn't require worker assignment, the
tf.function passed in can be executed on any available worker. If the worker
it is executed on becomes unavailable before its completion, it will be
migrated to another worker. Because of this fact, and because function
execution is not atomic, a function may be executed more than once.
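As a rough sketch of this behavior, assuming the coordinator constructed
above: schedule returns a RemoteValue placeholder immediately, join blocks
until all scheduled functions have finished, and fetch retrieves the result.

@tf.function
def step_fn():
  return tf.constant(1.0)

# schedule returns immediately with a RemoteValue placeholder.
result = coordinator.schedule(step_fn)
# join blocks until all scheduled functions have been executed.
coordinator.join()
assert result.fetch() == 1.0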
Handling Task Failure
This class, when used with
tf.distribute.experimental.ParameterServerStrategy, comes with built-in
fault tolerance for worker failures. That is, when some workers cannot be
reached from the coordinator for any reason, training progress continues to
be made with the remaining workers. Upon recovery of a failed worker, it will
be added back for function execution after datasets created by
create_per_worker_dataset are re-built on it.
When a parameter server fails, a tf.errors.UnavailableError is raised by
schedule, join or done. In this case, in addition to bringing back the
failed parameter server, users should restart the coordinator so that it
reconnects to workers and parameter servers, re-creates the variables, and
loads checkpoints. If the coordinator fails, after the user brings it back,
the program will automatically connect to workers and parameter servers, and
continue the progress from a checkpoint.
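One possible way to structure this in the training program (a sketch of one
approach, not part of the API) is to let the error propagate and restart the
coordinator process, relying on the checkpoint described below.

try:
  coordinator.schedule(step_fn)
  coordinator.join()
except tf.errors.UnavailableError:
  # A parameter server became unreachable. Bring it back, then restart
  # this coordinator program; on startup it re-creates variables and
  # restores the latest checkpoint.
  raise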
It is thus essential that in the user's program, a checkpoint file is
periodically saved and restored at the start of the program. If a
tf.keras.optimizers.Optimizer is checkpointed, after restoring from a
checkpoint, its iterations property roughly indicates the number of steps
that have been made. This can be used to decide how many epochs and steps are
needed before training completion.
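For example, a periodic checkpoint and restore could follow the sketch below;
model, optimizer, and total_steps are assumed to be defined elsewhere.

checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
manager = tf.train.CheckpointManager(
    checkpoint, directory='/tmp/ckpt', max_to_keep=3)
# At program start, restore the latest checkpoint if one exists.
checkpoint.restore(manager.latest_checkpoint)
# The optimizer's iterations counter roughly tracks the steps already run,
# so only the remaining steps need to be scheduled.
steps_remaining = total_steps - int(optimizer.iterations.numpy())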
See the tf.distribute.experimental.ParameterServerStrategy docstring for an
example usage of this API.
This is currently under development, and the API as well as the implementation are subject to change.
Raises: ValueError if the strategy being used is not supported.
create_per_worker_dataset(dataset_fn)
Create dataset on workers by calling dataset_fn on worker devices.
This creates the given dataset generated by dataset_fn on workers
and returns an object that represents the collection of those individual
datasets. Calling iter on such a collection of datasets returns a
tf.distribute.experimental.coordinator.PerWorkerValues, which is a
collection of iterators, where the iterators have been placed on respective
workers.
Calling next on a PerWorkerValues of iterators is unsupported. The
iterator is meant to be passed as an argument into
tf.distribute.experimental.coordinator.ClusterCoordinator.schedule. When
the scheduled function is about to be executed by a worker, the
function will receive the individual iterator that corresponds to the
worker. The next method can be called on an iterator inside a
scheduled function when the iterator is an input of the function.
Currently the schedule method assumes workers are all the same, and thus
assumes the datasets on different workers are the same, except that they may
be shuffled differently if they contain a dataset.shuffle operation and a
random seed is not set. Because of this, we also recommend the datasets be
repeated indefinitely and that a finite number of steps be scheduled instead
of relying on the OutOfRangeError from a dataset.
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver=...)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy=strategy)

@tf.function
def worker_fn(iterator):
  return next(iterator)

def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(
      lambda x: tf.data.Dataset.from_tensor_slices([3] * 3))

per_worker_dataset = coordinator.create_per_worker_dataset(
    per_worker_dataset_fn)
per_worker_iter = iter(per_worker_dataset)
remote_value = coordinator.schedule(worker_fn, args=(per_worker_iter,))
assert remote_value.fetch() == 3
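Following the recommendation above, a training loop would typically repeat
the dataset and schedule a fixed number of steps; below is a sketch reusing
the definitions above, with a hypothetical num_steps.

def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(
      lambda _: tf.data.Dataset.from_tensor_slices([3] * 3).repeat())

per_worker_dataset = coordinator.create_per_worker_dataset(
    per_worker_dataset_fn)
per_worker_iter = iter(per_worker_dataset)
for _ in range(num_steps):
  coordinator.schedule(worker_fn, args=(per_worker_iter,))
# Block until every scheduled step has finished.
coordinator.join()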