Module: tf.data.experimental.service

API for using the tf.data service.

This module contains:

  1. tf.data server implementations for running the tf.data service.
  2. APIs for registering datasets with the tf.data service and reading from the registered datasets.

The tf.data service provides the following benefits:

  • Horizontal scaling of tf.data input pipeline processing to solve input bottlenecks.
  • Data coordination for distributed training. Coordinated reads enable all replicas to train on similar-length examples across each global training step, improving step times in synchronous training.
  • Dynamic balancing of data across training replicas.
For example, the following brings up an in-process tf.data service and
distributes a dataset through it:

dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Setup

This section goes over how to set up the tf.data service.

Run tf.data servers

The tf.data service consists of one dispatch server and n worker servers. tf.data servers should be brought up alongside your training jobs, then brought down when the jobs are finished. Use tf.data.experimental.service.DispatchServer to start a dispatch server, and tf.data.experimental.service.WorkerServer to start worker servers. Servers can be run in the same process for testing purposes, or scaled up on separate machines.
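As a minimal sketch, both servers can be started in-process. Here `port=0` in the `DispatcherConfig` asks the OS for any free port, which is convenient for testing; a real deployment would typically pin a fixed, known port and run each server on its own machine:

```python
import tensorflow as tf

# Start a dispatch server. port=0 picks a free port, which is convenient
# for tests; production deployments typically use a fixed port that workers
# and clients are configured with.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=0))
dispatcher_address = dispatcher.target.split("://")[1]

# Start a worker server pointed at the dispatcher. In a real deployment each
# worker runs on its own machine, with `dispatcher_address` set to the
# dispatcher's host:port.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))

print(dispatcher.target)  # grpc://localhost:<port>
```

In production, the servers should be kept alive (e.g. via `dispatcher.join()`) until the training jobs that depend on them have finished.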

See https://github.com/tensorflow/ecosystem/tree/master/data_service for an example of using Google Kubernetes Engine (GKE) to manage the tf.data service. Note that the server implementation in tf_std_data_server.py is not GKE-specific, and can be used to run the tf.data service in other contexts.

Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data servers by calling tf.load_op_library from the dispatcher and worker processes at startup.
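For instance, a startup hook along these lines could run in both the dispatcher and worker processes. The library path here is a placeholder, not a real file:

```python
import os
import tensorflow as tf

# Placeholder path to a shared object containing the dataset's custom ops;
# substitute the path produced by your build. Loading it before the servers
# start lets them deserialize dataset graphs that reference the custom ops.
custom_ops_path = "/path/to/custom_ops.so"
if os.path.exists(custom_ops_path):
  tf.load_op_library(custom_ops_path)
```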

Usage

Users interact with tf.data service by programmatically registering their datasets with tf.data service, then creating datasets that read from the registered datasets. The register_dataset function registers a dataset, then the from_dataset_id function creates a new dataset which reads from the registered dataset. The distribute function wraps register_dataset and from_dataset_id into a single convenient transformation which registers its input dataset and then reads from it. distribute enables tf.data service to be used with a one-line code change. However, it assumes that the dataset is created and consumed by the same entity, and this assumption might not always be valid or desirable. In particular, in certain scenarios, such as distributed training, it might be desirable to decouple the creation and consumption of the dataset (via register_dataset and from_dataset_id respectively) to avoid having to create the dataset on each of the training workers.

Example

distribute

To use the distribute transformation, apply the transformation after the prefix of your input pipeline that you would like to be executed using tf.data service (typically at the end).

dataset = ...  # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode="parallel_epochs",
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)

The above code will create a tf.data service "job", which iterates through the dataset to generate data. To share the data from a job across multiple clients (e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common job_name across all clients.
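To illustrate job sharing, the following sketch runs an in-process service and creates two clients with the same job_name. Because they share one job, each element of the dataset is produced exactly once across the two clients rather than once per client:

```python
import tensorflow as tf

# In-process service, for illustration only.
dispatcher = tf.data.experimental.service.DispatchServer()
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher.target.split("://")[1]))

def make_client():
  # Both clients pass the same `job_name`, so they read from one shared job.
  return tf.data.Dataset.range(10).apply(
      tf.data.experimental.service.distribute(
          processing_mode="parallel_epochs",
          service=dispatcher.target,
          job_name="shared_job"))

# Create both iterators up front so both clients join the shared job, then
# drain them. The job's elements are split between the clients on demand.
it_a, it_b = iter(make_client()), iter(make_client())
a = [int(x) for x in it_a]
b = [int(x) for x in it_b]
# Together, the two clients see each element exactly once.
```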

register_dataset and from_dataset_id

register_dataset registers a dataset with the tf.data service, returning a dataset id for the registered dataset. from_dataset_id creates a dataset that reads from the registered dataset. These APIs can be used to reduce dataset building time for distributed training. Instead of building the dataset on all training workers, we can build the dataset just once and then register the dataset using register_dataset. Then all workers can call from_dataset_id without needing to build the dataset themselves.

dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode="parallel_epochs",
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

Processing Modes

tf.data service supports two processing modes: "parallel_epochs" and "distributed_epoch". "parallel_epochs" is suitable for training that does not require visitation guarantees (i.e. a clean separation of epoch boundaries). "distributed_epoch" is suitable for training that does require clean epoch boundaries: instead of processing multiple epochs of data in parallel, the aim is to process a single epoch of data in a distributed fashion. "parallel_epochs" mode is in general more performant, because it requires less coordination. It also supports a wider range of input pipelines, since it does not require the dataset to be splittable as "distributed_epoch" mode does.

Parallel Epochs

In "parallel_epochs" mode, the entire input dataset will be processed independently by each of the tf.data service workers. For this reason, it is important to shuffle data (e.g. filenames) non-deterministically, so that each worker will process the elements of the dataset in a different order. "parallel_epochs" can be used to distribute datasets that aren't splittable.
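A sketch of the recommended shuffling, using made-up filenames standing in for real input shards:

```python
import tensorflow as tf

# Hypothetical shard names standing in for real input files.
filenames = tf.data.Dataset.from_tensor_slices(
    ["shard-0.tfrecord", "shard-1.tfrecord", "shard-2.tfrecord"])

# Shuffle without a fixed seed, so each tf.data service worker visits the
# files in a different (non-deterministic) order each epoch.
filenames = filenames.shuffle(buffer_size=3)

# ...continue building the pipeline (e.g. interleave TFRecordDataset over
# `filenames`) and then apply `distribute`.
```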

Distributed Epoch