Coordinator and QueueRunner

See Threading and Queues for how to use threads and queues. For documentation on the Queue API, see Queues.

class tf.train.Coordinator

A coordinator for threads.

This class implements a simple mechanism to coordinate the termination of a set of threads.

Usage:

# Create a coordinator.
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate.
coord.join(threads)

Any of the threads can call coord.request_stop() to ask for all the threads to stop. To cooperate with the requests, each thread must check for coord.should_stop() on a regular basis. coord.should_stop() returns True as soon as coord.request_stop() has been called.

A typical thread running with a coordinator will do something like:

while not coord.should_stop():
  ...do some work...
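
Putting these pieces together, the following is a minimal, self-contained sketch of a coordinated set of worker threads; the worker function, the sleep calls, and the thread count are illustrative choices, not part of the API:

import threading
import time

import tensorflow as tf

def worker(coord):
  # Loop until any thread asks the coordinator to stop.
  while not coord.should_stop():
    time.sleep(0.1)  # ...do some work...

# Create a coordinator.
coord = tf.train.Coordinator()
# Start a number of threads, passing the coordinator to each of them.
threads = [threading.Thread(target=worker, args=(coord,)) for _ in range(4)]
for t in threads:
  t.start()
# Let the workers run briefly, then ask them all to stop.
time.sleep(1.0)
coord.request_stop()
# Wait for all the threads to terminate.
coord.join(threads)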

Exception handling:

A thread can report an exception to the coordinator as part of the request_stop() call. The exception will be re-raised from the coord.join() call.

Thread code:

try:
  while not coord.should_stop():
    ...do some work...
except Exception as e:
  coord.request_stop(e)

Main code:

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate.
  coord.join(threads)
except Exception as e:
  ...exception that was passed to coord.request_stop()

To simplify the thread implementation, the Coordinator provides a context handler stop_on_exception() that automatically requests a stop if an exception is raised. Using the context handler the thread code above can be written as:

with coord.stop_on_exception():
  while not coord.should_stop():
    ...do some work...

Grace period for stopping:

After a thread has called coord.request_stop(), the other threads have a fixed amount of time to stop; this is called the 'stop grace period' and defaults to 2 minutes. If any of the threads is still alive after the grace period expires, coord.join() raises a RuntimeError reporting the laggards.

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate, give them 10s grace period
  coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
  ...one of the threads took more than 10s to stop after request_stop()
  ...was called.
except Exception:
  ...exception that was passed to coord.request_stop()

tf.train.Coordinator.__init__(clean_stop_exception_types=None)

Create a new Coordinator.

Args:
  • clean_stop_exception_types: Optional tuple of Exception types that should cause a clean stop of the coordinator. If an exception of one of these types is reported to request_stop(ex) the coordinator will behave as if request_stop(None) was called. Defaults to (tf.errors.OutOfRangeError,) which is used by input queues to signal the end of input. When feeding training data from a Python iterator it is common to add StopIteration to this list.
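
For example, a coordinator that treats StopIteration from a Python feeding thread as a clean stop could be created as follows; this is a minimal sketch, and the exact tuple you pass depends on what your input pipeline actually raises:

coord = tf.train.Coordinator(
    clean_stop_exception_types=(tf.errors.OutOfRangeError, StopIteration))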

tf.train.Coordinator.clear_stop()

Clears the stop flag.

After this is called, calls to should_stop() will return False.


tf.train.Coordinator.join(threads=None, stop_grace_period_secs=120)

Wait for threads to terminate.

This call blocks until a set of threads have terminated. The set of threads is the union of the threads passed in the threads argument and the list of threads that registered with the coordinator by calling Coordinator.register_thread().

After the threads stop, if an exc_info was passed to request_stop, that exception is re-raised.

Grace period handling: When request_stop() is called, threads are given 'stop_grace_period_secs' seconds to terminate. If any of them is still alive after that period expires, a RuntimeError is raised. Note that if an exc_info was passed to request_stop() then it is raised instead of that RuntimeError.

Args:
  • threads: List of threading.Threads. The started threads to join in addition to the registered threads.
  • stop_grace_period_secs: Number of seconds given to threads to stop after request_stop() has been called.
Raises:
  • RuntimeError: If any thread is still alive after request_stop() is called and the grace period expires.

tf.train.Coordinator.joined


tf.train.Coordinator.register_thread(thread)

Register a thread to join.

Args:
  • thread: A Python thread to join.

tf.train.Coordinator.request_stop(ex=None)

Request that the threads stop.

After this is called, calls to should_stop() will return True.

Args:
  • ex: Optional Exception, or Python exc_info tuple as returned by sys.exc_info(). If this is the first call to request_stop() the corresponding exception is recorded and re-raised from join().
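
For instance, inside a worker thread you can forward either the exception object itself or the full sys.exc_info() tuple. A hedged sketch follows; do_work is a hypothetical helper standing in for real work:

import sys

try:
  while not coord.should_stop():
    do_work()  # hypothetical helper, not part of the API
except Exception:
  # Either coord.request_stop(e) or the full exc_info tuple works.
  coord.request_stop(sys.exc_info())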

tf.train.Coordinator.should_stop()

Check if stop was requested.

Returns:

True if a stop was requested.


tf.train.Coordinator.stop_on_exception()

Context manager to request stop when an Exception is raised.

Code that uses a coordinator must catch exceptions and pass them to the request_stop() method to stop the other threads managed by the coordinator.

This context handler simplifies the exception handling. Use it as follows:

with coord.stop_on_exception():
  # Any exception raised in the body of the with
  # clause is reported to the coordinator before terminating
  # the execution of the body.
  ...body...

This is completely equivalent to the slightly longer code:

try:
  ...body...
except Exception as ex:
  coord.request_stop(ex)
Yields:

nothing.


tf.train.Coordinator.wait_for_stop(timeout=None)

Wait till the Coordinator is told to stop.

Args:
  • timeout: Float. Sleep for up to that many seconds waiting for should_stop() to become True.
Returns:

True if the Coordinator is told to stop, False if the timeout expired.
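
This is useful for threads that do periodic work rather than a tight loop. In the sketch below, the thread wakes up at most every 10 seconds and exits as soon as a stop is requested; emit_summary is a hypothetical callback, not part of the API:

while not coord.wait_for_stop(timeout=10.0):
  # Runs roughly every 10 seconds until request_stop() is called.
  emit_summary()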


class tf.train.QueueRunner

Holds a list of enqueue operations for a queue, each to be run in a thread.

Queues are a convenient TensorFlow mechanism to compute tensors asynchronously using multiple threads. For example, in the canonical 'Input Reader' setup one set of threads generates filenames in a queue; a second set of threads reads records from the files, processes them, and enqueues tensors on a second queue; a third set of threads dequeues these input records to construct batches and runs them through training operations.

There are several delicate issues when running multiple threads that way: closing the queues in sequence as the input is exhausted, correctly catching and reporting exceptions, etc.

The QueueRunner, combined with the Coordinator, helps handle these issues.


tf.train.QueueRunner.__init__(queue=None, enqueue_ops=None, close_op=None, cancel_op=None, queue_closed_exception_types=None, queue_runner_def=None)

Create a QueueRunner.

On construction the QueueRunner adds an op to close the queue. That op will be run if the enqueue ops raise exceptions.

When you later call the create_threads() method, the QueueRunner will create one thread for each op in enqueue_ops. Each thread will run its enqueue op in parallel with the other threads. The enqueue ops do not have to all be the same op, but it is expected that they all enqueue tensors in queue.

Args:
  • queue: A Queue.
  • enqueue_ops: List of enqueue ops to run in threads later.
  • close_op: Op to close the queue. Pending enqueue ops are preserved.
  • cancel_op: Op to close the queue and cancel pending enqueue ops.
  • queue_closed_exception_types: Optional tuple of Exception types that indicate that the queue has been closed when raised during an enqueue operation. Defaults to (tf.errors.OutOfRangeError,). Another common case includes (tf.errors.OutOfRangeError, tf.errors.CancelledError), when some of the enqueue ops may dequeue from other Queues.
  • queue_runner_def: Optional QueueRunnerDef protocol buffer. If specified, recreates the QueueRunner from its contents. queue_runner_def and the other arguments are mutually exclusive.
Raises:
  • ValueError: If both queue_runner_def and queue are specified.
  • ValueError: If queue or enqueue_ops are not provided when not restoring from queue_runner_def.
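
As a minimal construction sketch; the queue capacity, dtype, tensor shape, and the number of enqueue ops are illustrative choices, not requirements:

queue = tf.FIFOQueue(capacity=32, dtypes=[tf.float32])
example = tf.random_normal([128])      # stands in for real preprocessing
enqueue_op = queue.enqueue([example])
# Four copies of the same enqueue op, to be run in four threads.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)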

tf.train.QueueRunner.cancel_op


tf.train.QueueRunner.close_op


tf.train.QueueRunner.create_threads(sess, coord=None, daemon=False, start=False)

Create threads to run the enqueue ops.

This method requires a session in which the graph was launched. It creates a list of threads, optionally starting them. There is one thread for each op passed in enqueue_ops.

The coord argument is an optional coordinator that the threads will use to terminate together and report exceptions. If a coordinator is given, this method starts an additional thread to close the queue when the coordinator requests a stop.

This method may be called again as long as all threads from a previous call have stopped.

Args:
  • sess: A Session.
  • coord: Optional Coordinator object for reporting errors and checking stop conditions.
  • daemon: Boolean. If True make the threads daemon threads.
  • start: Boolean. If True starts the threads. If False the caller must call the start() method of the returned threads.
Returns:

A list of threads.

Raises:
  • RuntimeError: If threads from a previous call to create_threads() are still running.
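
Continuing the construction sketch from __init__ above (qr and queue are assumed to be the QueueRunner and queue built there), a typical usage looks roughly like this:

with tf.Session() as sess:
  coord = tf.train.Coordinator()
  dequeued = queue.dequeue()
  # One thread per enqueue op, plus a closer thread because coord is given.
  enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
  try:
    for step in range(100):
      if coord.should_stop():
        break
      sess.run(dequeued)  # stands in for a real training step
  except Exception as e:
    coord.request_stop(e)
  finally:
    coord.request_stop()
    coord.join(enqueue_threads)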

tf.train.QueueRunner.enqueue_ops


tf.train.QueueRunner.exceptions_raised

Exceptions raised but not handled by the QueueRunner threads.

Exceptions raised in queue runner threads are handled in one of two ways depending on whether or not a Coordinator was passed to create_threads():

  • With a Coordinator, exceptions are reported to the coordinator and forgotten by the QueueRunner.
  • Without a Coordinator, exceptions are captured by the QueueRunner and made available in this exceptions_raised property.
Returns:

A list of Python Exception objects. The list is empty if no exception was captured. (No exceptions are captured when using a Coordinator.)
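
A rough sketch of the second case (no Coordinator), assuming a QueueRunner qr, its queue, and a launched session sess as in the examples above:

# Run the enqueue threads without a coordinator.
threads = qr.create_threads(sess, start=True)
# ... training happens here ...
# Close the queue so the enqueue threads can exit.
sess.run(queue.close(cancel_pending_enqueues=True))
for t in threads:
  t.join()
# Any unhandled errors from the enqueue threads show up here.
for ex in qr.exceptions_raised:
  print("queue runner thread failed:", ex)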


tf.train.QueueRunner.from_proto(queue_runner_def)

Returns a QueueRunner object created from queue_runner_def.


tf.train.QueueRunner.name

The string name of the underlying Queue.


tf.train.QueueRunner.queue


tf.train.QueueRunner.queue_closed_exception_types


tf.train.QueueRunner.to_proto()

Converts this QueueRunner to a QueueRunnerDef protocol buffer.

Returns:

A QueueRunnerDef protocol buffer.
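
For example, a QueueRunner can be serialized and reconstructed; this sketch assumes qr was built as in the examples above, and that the ops it names still exist in the current graph:

queue_runner_def = qr.to_proto()  # a QueueRunnerDef protocol buffer
qr_restored = tf.train.QueueRunner.from_proto(queue_runner_def)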


tf.train.add_queue_runner(qr, collection='queue_runners')

Adds a QueueRunner to a collection in the graph.

When building a complex model that uses many queues it is often difficult to gather all the queue runners that need to be run. This convenience function allows you to add a queue runner to a well known collection in the graph.

The companion method start_queue_runners() can be used to start threads for all the collected queue runners.

Args:
  • qr: A QueueRunner.
  • collection: A GraphKey specifying the graph collection to add the queue runner to. Defaults to GraphKeys.QUEUE_RUNNERS.
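
A typical input pipeline registers its runner right after building it. A minimal sketch, with the queue construction borrowed from the QueueRunner example above:

queue = tf.FIFOQueue(capacity=32, dtypes=[tf.float32])
enqueue_op = queue.enqueue([tf.random_normal([128])])
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
# Register in the default collection, GraphKeys.QUEUE_RUNNERS.
tf.train.add_queue_runner(qr)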

tf.train.start_queue_runners(sess=None, coord=None, daemon=True, start=True, collection='queue_runners')

Starts all queue runners collected in the graph.

This is a companion method to add_queue_runner(). It just starts threads for all queue runners collected in the graph. It returns the list of all threads.

Args:
  • sess: Session used to run the queue ops. Defaults to the default session.
  • coord: Optional Coordinator for coordinating the started threads.
  • daemon: Whether the threads should be marked as daemons, meaning they don't block program exit.
  • start: Set to False to only create the threads, not start them.
  • collection: A GraphKey specifying the graph collection to get the queue runners from. Defaults to GraphKeys.QUEUE_RUNNERS.
Returns:

A list of threads.
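
Putting add_queue_runner() and start_queue_runners() together, a typical main loop looks roughly like this; train_op stands in for whatever training step your graph defines:

with tf.Session() as sess:
  coord = tf.train.Coordinator()
  # Start threads for every queue runner registered in the graph.
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  try:
    while not coord.should_stop():
      sess.run(train_op)
  except tf.errors.OutOfRangeError:
    print("input exhausted")
  finally:
    coord.request_stop()
    coord.join(threads)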