tf.data.experimental.service.DispatchServer

An in-process tf.data service dispatch server.

A tf.data.experimental.service.DispatchServer coordinates a cluster of tf.data.experimental.service.WorkerServers. When the workers start, they register themselves with the dispatcher.

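import tensorflow as tf

# Start a dispatcher; port=0 lets the server bind to any available port.
dispatcher = tf.data.experimental.service.DispatchServer(port=0)
# The target has the form protocol://address; the worker needs only the address.
dispatcher_address = dispatcher.target.split("://")[1]
# The worker registers itself with the dispatcher when it starts.
worker = tf.data.experimental.service.WorkerServer(
    port=0, dispatcher_address=dispatcher_address)
dataset = tf.data.Dataset.range(10)
# Route processing of the dataset through the tf.data service.
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
# Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]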

When starting a dedicated tf.data dispatch process, use join() to block indefinitely after starting up the server.

dispatcher = tf.data.experimental.service.DispatchServer(port=5050)
dispatcher.join()
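
A dedicated dispatch process is often a small script that reads its port from the command line. The following is a minimal sketch, assuming the constructor arguments documented on this page (port, protocol); other TensorFlow releases may use a different constructor signature. The flag names and the functions parse_args and run_dispatcher are hypothetical and not part of TensorFlow.

```python
import argparse


def parse_args(argv=None):
    """Parse command-line flags for the dispatch process (flag names are illustrative)."""
    parser = argparse.ArgumentParser(
        description="Run a tf.data service dispatcher.")
    parser.add_argument("--port", type=int, default=5050,
                        help="Port to bind to.")
    parser.add_argument("--protocol", default="grpc",
                        help='Server protocol, for example "grpc".')
    return parser.parse_args(argv)


def run_dispatcher(args):
    """Start a dispatcher and block until it shuts down."""
    # Imported here so that flag parsing alone does not require TensorFlow.
    import tensorflow as tf

    # Assumption: the (port, protocol) arguments documented on this page.
    dispatcher = tf.data.experimental.service.DispatchServer(
        port=args.port, protocol=args.protocol)
    print("Dispatcher target:", dispatcher.target)
    # join() blocks until the server has shut down.
    dispatcher.join()
```

A script would call run_dispatcher(parse_args()) from its entry point; the call does not return while the server is running.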

Args
port Specifies the port to bind to.
protocol (Optional.) Specifies the protocol to be used by the server. Acceptable values include "grpc", "grpc+local". Defaults to "grpc".
start (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while creating the TensorFlow server.

Attributes
target Returns a target that can be used to connect to the server.

dispatcher = tf.data.experimental.service.DispatchServer(port=0)
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))

The returned string will be in the form protocol://address, e.g. "grpc://localhost:5050".
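
Because the target has the form protocol://address, the address that a WorkerServer expects as dispatcher_address can be recovered by splitting on "://". A minimal sketch in plain Python; split_target is a hypothetical helper, not part of TensorFlow, and the example string is the one given above.

```python
def split_target(target):
    """Split a "protocol://address" target into (protocol, address)."""
    protocol, sep, address = target.partition("://")
    # Reject strings that lack the separator or either component.
    if not sep or not protocol or not address:
        raise ValueError(f"Expected protocol://address, got {target!r}")
    return protocol, address


protocol, address = split_target("grpc://localhost:5050")
print(protocol)  # grpc
print(address)   # localhost:5050
```

Unlike target.split("://")[1], this version raises a descriptive ValueError when the string is not in the expected form instead of an IndexError.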

Methods

join

Blocks until the server has shut down.

This is useful when starting a dedicated dispatch process.

dispatcher = tf.data.experimental.service.DispatchServer(port=5050)
dispatcher.join()

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while joining the server.

start

Starts this server.

dispatcher = tf.data.experimental.service.DispatchServer(port=0,
                                                         start=False)
dispatcher.start()

Raises
tf.errors.OpError Or one of its subclasses if an error occurs while starting the server.