tf.train.Coordinator

A coordinator for threads.

This class implements a simple mechanism to coordinate the termination of a set of threads.

Usage:

# Create a coordinator.
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate.
coord.join(threads)

Any of the threads can call coord.request_stop() to ask all the threads to stop. To cooperate with such a request, each thread must check coord.should_stop() on a regular basis. coord.should_stop() returns True as soon as coord.request_stop() has been called.
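The cooperative protocol described above can be pictured as a shared flag that any thread may set and every thread polls. The following is a minimal sketch of that idea built on threading.Event. MiniCoordinator and worker are hypothetical names used only for illustration; this is a stand-in for the mechanism, not TensorFlow's implementation.

```python
import threading
import time


class MiniCoordinator:
    """Hypothetical stand-in that mimics request_stop()/should_stop() only."""

    def __init__(self):
        self._stop_event = threading.Event()

    def request_stop(self):
        # Setting the event makes should_stop() return True in every thread.
        self._stop_event.set()

    def should_stop(self):
        return self._stop_event.is_set()


def worker(coord, results, index):
    steps = 0
    while not coord.should_stop():
        steps += 1
        # Only worker 0 asks everyone to stop, after its fifth step.
        if index == 0 and steps == 5:
            coord.request_stop()
        time.sleep(0.001)
    results[index] = steps


coord = MiniCoordinator()
results = [None] * 3
threads = [
    threading.Thread(target=worker, args=(coord, results, i)) for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the flag is only polled, a thread that never calls should_stop() will never notice the request.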

A typical thread running with a coordinator will do something like:

while not coord.should_stop():
  ...do some work...
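As a runnable illustration of this loop, the sketch below starts four threads that increment a shared counter and stop once it reaches a limit. It assumes TensorFlow is installed and imported as tf; the worker function, the counter dictionary, and the limit of 20 are made up for the example.

```python
import threading
import time

import tensorflow as tf


def worker(coord, counter, lock, limit):
    """Increment a shared counter until some thread requests a stop."""
    while not coord.should_stop():
        with lock:
            counter["steps"] += 1
            reached_limit = counter["steps"] >= limit
        if reached_limit:
            # Any thread may ask all the others to stop.
            coord.request_stop()
        time.sleep(0.001)


coord = tf.train.Coordinator()
counter = {"steps": 0}
lock = threading.Lock()
threads = [
    threading.Thread(target=worker, args=(coord, counter, lock, 20))
    for _ in range(4)
]
for t in threads:
    t.start()
# Blocks until every thread has returned.
coord.join(threads)
```

The final count may slightly exceed the limit, since other threads can complete one more iteration before they next check should_stop().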

Exception handling:

A thread can report an exception to the coordinator as part of the request_stop() call. The exception will be re-raised from the coord.join() call.

Thread code:

try:
  while not coord.should_stop():
    ...do some work...
except Exception as e:
  coord.request_stop(e)

Main code:

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate.
  coord.join(threads)
except Exception as e:
  ...exception that was passed to coord.request_stop()
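The two snippets above can be combined into one runnable sketch. It assumes TensorFlow is imported as tf; failing_worker, idle_worker, and the simulated ValueError are hypothetical and exist only to trigger the propagation path.

```python
import threading

import tensorflow as tf


def failing_worker(coord):
    """Report a failure to the coordinator instead of dying silently."""
    try:
        while not coord.should_stop():
            raise ValueError("simulated failure")  # hypothetical error
    except Exception as e:
        coord.request_stop(e)


def idle_worker(coord):
    """Block until some thread requests a stop, then return."""
    coord.wait_for_stop()


coord = tf.train.Coordinator()
threads = [
    threading.Thread(target=failing_worker, args=(coord,)),
    threading.Thread(target=idle_worker, args=(coord,)),
]
for t in threads:
    t.start()

caught = None
try:
    coord.join(threads)
except ValueError as e:
    # The exception reported by failing_worker surfaces in the main thread.
    caught = e
```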

To simplify the thread implementation, the Coordinator provides a context manager, stop_on_exception(), that automatically requests a stop if an exception is raised. Using the context manager, the thread code above can be written as:

with coord.stop_on_exception():
  while not coord.should_stop():
    ...do some work...
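A small end-to-end sketch of the same pattern follows, assuming TensorFlow is imported as tf. The worker function and the ValueError it raises are hypothetical. The exception does not escape the thread; it is handed to the coordinator and re-raised from coord.join().

```python
import threading

import tensorflow as tf


def worker(coord):
    # Any exception raised inside the block is passed to coord.request_stop().
    with coord.stop_on_exception():
        while not coord.should_stop():
            raise ValueError("bad batch")  # hypothetical error


coord = tf.train.Coordinator()
thread = threading.Thread(target=worker, args=(coord,))
thread.start()

caught = None
try:
    coord.join([thread])
except ValueError as e:
    caught = e
```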

Grace period for stopping:

After a thread has called coord.request_stop(), the other threads have a fixed time to stop. This is called the 'stop grace period' and defaults to 2 minutes. If any of the threads is still alive after the grace period expires, coord.join() raises a RuntimeError reporting the laggards.

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate, giving them a 10s grace period.
  coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
  ...one of the threads took more than 10s to stop after request_stop()
  ...was called.
except Exception:
  ...exception that was passed to coord.request_stop()
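To see the grace period expire in practice, the sketch below uses a thread that deliberately keeps running after the stop request. It assumes TensorFlow is imported as tf; slow_worker, the thread name "laggard", and the 0.5 second and 2 second durations are chosen only to keep the example short.

```python
import threading
import time

import tensorflow as tf


def slow_worker(coord):
    """Notice the stop request, then keep running past the grace period."""
    coord.wait_for_stop()
    time.sleep(2.0)  # simulated slow shutdown


coord = tf.train.Coordinator()
laggard = threading.Thread(
    target=slow_worker, args=(coord,), name="laggard", daemon=True
)
laggard.start()
coord.request_stop()

message = None
try:
    # Allow only half a second for the thread to finish.
    coord.join([laggard], stop_grace_period_secs=0.5)
except RuntimeError as e:
    message = str(e)

# Let the slow thread finish so the example exits cleanly.
laggard.join()
```

Here no exception was passed to request_stop(), so the only error coord.join() can raise is the RuntimeError for the thread that is still alive.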