tf.contrib.distribute.ParameterServerStrategy

Class ParameterServerStrategy

Inherits From: DistributionStrategy

Defined in tensorflow/contrib/distribute/python/parameter_server_strategy.py.

A parameter server DistributionStrategy.

This strategy class works for both local training and between-graph replicated training for multiple workers. If cluster_spec is specified, either passed in to the __init__() method or parsed from the "TF_CONFIG" environment variable, variables and updates to those variables are assigned to parameter servers and other operations are assigned to workers. If cluster_spec is not set, training is local: variables are assigned to the local CPU or to the only GPU. When each worker has more than one GPU, operations are replicated on these GPUs. In both cases, operations are replicated but variables are not, and the workers share a common view of which parameter server each variable is assigned to.

This class assumes between-graph replication will be used and works on a graph for a particular worker. Note that each graph and worker is independent. This means that while each worker will synchronously compute a single gradient update across all GPUs, updates between workers proceed asynchronously. Operations that occur only on the first tower (such as incrementing the global step) will occur on the first tower of every worker.

Call call_for_each_tower(fn, *args, **kwargs) for any operations that could potentially be replicated across towers (i.e. multiple GPUs), even if there is only one CPU or one GPU; a short usage sketch follows the cautions below. When defining fn, extra caution needs to be taken:

1) Always use tf.get_variable instead of tf.Variable, which cannot refer to the same variable on different towers.

2) It is generally not recommended to open a device scope under the strategy's scope. A device scope (i.e. calling tf.device) will be merged with or override the device for operations but will not change the device for variables.

3) It is also not recommended to open a colocation scope (i.e. calling tf.colocate_with) under the strategy's scope. For colocating variables, use distribution.colocate_vars_with instead. Colocating ops may create device-assignment conflicts.
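
For example, a minimal local-training sketch (no cluster_spec, two local GPUs assumed; the variable and tower function here are illustrative, not part of the API):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

def tower_fn():
  # tf.get_variable, not tf.Variable, so every tower refers to the same variable.
  w = tf.get_variable("w", initializer=1.0)
  return w * (tf.get_tower_context().tower_id + 1.0)

with distribution.scope():
  # Replicated once per local GPU; `w` itself lives on the parameter device.
  per_tower_result = distribution.call_for_each_tower(tower_fn)
  print(distribution.unwrap(per_tower_result))  # A list with one value per tower.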

__init__

__init__(num_gpus_per_worker=0)

Initializes this strategy.

Args:

  • num_gpus_per_worker: Number of local GPUs or GPUs per worker; defaults to 0, meaning CPU only.

Raises:

  • ValueError: if cluster_spec is given but task_type or task_id is not.

Properties

between_graph

is_single_tower

Returns whether there is a single tower or multiple.

Returns:

A boolean. If True, call_for_each_tower(fn) will only call fn once. If False, call_for_each_tower(fn) may call fn multiple times.

num_towers

parameter_devices

should_checkpoint

should_init

should_save_summary

worker_device_index

An object mapping worker device to an id.

This might be passed as an argument to call_for_each_tower(), as in:

with distribution_strategy.scope():

  def fn(device_id):
    # device_id is an integer. `fn` is being executed on device:
    #    distribution_strategy.worker_devices[device_id].
    ...

  distribution_strategy.call_for_each_tower(
      fn, distribution_strategy.worker_device_index)

Returns:

An index object, or the integer 0 if there is only a single tower.

worker_devices

Methods

batch_reduce

batch_reduce(
    aggregation,
    value_destination_pairs
)

Combine multiple reduce calls into one for faster execution.

Args:

  • aggregation: Indicates how a variable will be aggregated. Accepted values are tf.VariableAggregation.SUM, tf.VariableAggregation.MEAN, tf.VariableAggregation.ONLY_FIRST_TOWER.
  • value_destination_pairs: A list or a tuple of (per-device value, destinations) pairs; destinations are as described for reduce().

Returns:

A list of mirrored values, one per pair in value_destination_pairs.
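
A hedged sketch (the tower function and the CPU destination are illustrative) combining two reductions into one call:

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

def tower_fn():
  ctx = tf.get_tower_context()
  # Two per-tower values to be combined across towers.
  return tf.constant(1.0) + ctx.tower_id, tf.constant(2.0) + ctx.tower_id

with distribution.scope():
  per_tower_a, per_tower_b = distribution.call_for_each_tower(tower_fn)
  # One combined call instead of two separate reduce() calls.
  sum_a, sum_b = distribution.batch_reduce(
      tf.VariableAggregation.SUM,
      [(per_tower_a, "/device:CPU:0"), (per_tower_b, "/device:CPU:0")])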

broadcast

broadcast(
    tensor,
    destinations=None
)

Mirror a tensor on one device to all worker devices.

Args:

  • tensor: A Tensor value to broadcast.
  • destinations: An optional mirrored variable, device string, or list of device strings, specifying the destination devices to copy tensor to. Defaults to self.worker_devices.

Returns:

A value mirrored to the destination devices.
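
A small sketch (the tensor value is illustrative; destinations defaults to self.worker_devices):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

with distribution.scope():
  # Copy the scalar onto every worker device.
  mirrored_lr = distribution.broadcast(tf.constant(0.01))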

call_for_each_tower

call_for_each_tower(
    fn,
    *args,
    **kwargs
)

Run fn once per tower.

fn may call tf.get_tower_context() to access methods such as tower_id() and merge_call().

merge_call() is used to communicate between the towers and to re-enter the cross-tower context. All towers pause their execution when they encounter a merge_call(). The merge_fn function is then executed once in the cross-tower context; its results are unwrapped and given back to each tower call, after which execution resumes until fn completes or encounters another merge_call(). Example:

# Called once in "cross-tower" context.
def merge_fn(distribution, three_plus_tower_id):
  # sum the values across towers
  return sum(distribution.unwrap(three_plus_tower_id))

# Called once per tower in `distribution`, in a "tower" context.
def fn(three):
  tower_ctx = tf.get_tower_context()
  v = three + tower_ctx.tower_id
  # Computes the sum of the `v` values across all towers.
  s = tower_ctx.merge_call(merge_fn, v)
  return s + v

with distribution.scope():
  # in "cross-tower" context
  ...
  merged_results = distribution.call_for_each_tower(fn, 3)
  # merged_results has the values from every tower execution of `fn`.
  print(distribution.unwrap(merged_results))  # Prints a list

Args:

  • fn: function to run (will be run once per tower).
  • *args: positional arguments for fn
  • **kwargs: keyword arguments for fn. "run_concurrently": Boolean indicating whether executions of fn can be run concurrently (under eager execution only), defaults to True.

Returns:

Merged return value of fn across all towers.

colocate_vars_with

colocate_vars_with(colocate_with_variable)

Scope that controls which devices variables will be created on.

No operations should be added to the graph inside this scope; it should only be used when creating variables (some implementations work by changing variable creation, others by using a tf.colocate_with() scope).

This may only be used inside self.scope().

Example usage:

with distribution_strategy.scope():
  var1 = tf.get_variable(...)
  with distribution_strategy.colocate_vars_with(var1):
    # var2 and var3 will be created on the same device(s) as var1
    var2 = tf.get_variable(...)
    var3 = tf.get_variable(...)

  def fn(v1, v2, v3):
    # operates on v1 from var1, v2 from var2, and v3 from var3
    ...

  # `fn` runs on every device `var1` is on; `var2` and `var3` will be there too.
  distribution_strategy.update(var1, fn, var2, var3)

Args:

  • colocate_with_variable: A variable created in self.scope(). Variables created while in the returned context manager will be on the same set of devices as colocate_with_variable.

Returns:

A context manager.

configure

configure(
    session_config=None,
    cluster_spec=None,
    task_type=None,
    task_id=None
)

Configures the strategy class.

The strategy object will be re-initialized if cluster_spec is given but was not passed in the constructor.

Args:

  • session_config: not used currently.
  • cluster_spec: a dict, ClusterDef or ClusterSpec object specifying the cluster configurations.
  • task_type: the current task type.
  • task_id: the current task id.

Raises:

  • ValueError: if cluster_spec is given but task_type or task_id is not.
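
For illustration, a hedged sketch of a multi-worker setup (the addresses, task indices, and the choice of passing a plain dict are placeholders):

import tensorflow as tf

cluster = {
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
    "ps": ["ps0.example.com:2222"],
}

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=1)
# Re-initializes the strategy for between-graph replication on this worker.
distribution.configure(cluster_spec=cluster, task_type="worker", task_id=0)

# Equivalently, the same information can come from the "TF_CONFIG" environment
# variable mentioned in the class description, e.g.:
# os.environ["TF_CONFIG"] = json.dumps(
#     {"cluster": cluster, "task": {"type": "worker", "index": 0}})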

distribute_dataset

distribute_dataset(dataset_fn)

Distributes the dataset to each local GPU.
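
A hedged sketch (the dataset contents are placeholders, and make_one_shot_iterator() on the distributed dataset is assumed to mirror the plain tf.data iterator API):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

def dataset_fn():
  return tf.data.Dataset.from_tensor_slices([1., 2., 3., 4.]).repeat().batch(4)

with distribution.scope():
  dist_dataset = distribution.distribute_dataset(dataset_fn)
  # Assumed iterator constructor; usable as the `iterator` argument of
  # run_steps_on_dataset below.
  iterator = dist_dataset.make_one_shot_iterator()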

finalize

finalize()

Any final actions to be done at the end of all computations.

In eager mode, it executes any finalize actions as a side effect. In graph mode, it creates the finalize ops and returns them.

For example, TPU shutdown ops.

Returns:

In eager mode, returns None. In graph mode, a list of ops to execute. Empty list if nothing to be done.

group

group(
    value,
    name=None
)

Shortcut for tf.group(distribution.unwrap(value)).
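
A one-line illustration (the tower function is a placeholder):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

with distribution.scope():
  per_tower_ops = distribution.call_for_each_tower(lambda: tf.no_op())
  # A single op depending on the op from every tower, i.e.
  # tf.group(distribution.unwrap(per_tower_ops)).
  step_op = distribution.group(per_tower_ops)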

initialize

initialize()

Any initialization to be done before running any computations.

In eager mode, it executes any initialization as a side effect. In graph mode, it creates the initialization ops and returns them.

For example, TPU initialize_system ops.

Returns:

In eager mode, returns None. In graph mode, a list of ops to execute. Empty list if nothing to be done.

non_slot_devices

non_slot_devices(var_list)

read_var

read_var(var)

reduce

reduce(
    aggregation,
    value,
    destinations
)

Combine (via e.g. sum or mean) values across towers.

Args:

  • aggregation: Indicates how a variable will be aggregated. Accepted values are tf.VariableAggregation.SUM, tf.VariableAggregation.MEAN, tf.VariableAggregation.ONLY_FIRST_TOWER.
  • value: A per-device value with one value per tower.
  • destinations: A mirrored variable, a per-device tensor, a device string, or list of device strings. The return value will be copied to all destination devices (or all the devices where the destinations value resides). To perform an all-reduction, pass value to destinations.

Returns:

A value mirrored to destinations.
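
A hedged sketch (the per-tower value and the CPU destination are illustrative):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

def tower_fn():
  # Stand-in for a per-tower loss.
  return tf.constant(1.0) + tf.get_tower_context().tower_id

with distribution.scope():
  per_tower_loss = distribution.call_for_each_tower(tower_fn)
  # Average the per-tower values and place the result on the local CPU.
  mean_loss = distribution.reduce(
      tf.VariableAggregation.MEAN, per_tower_loss, destinations="/device:CPU:0")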

run_steps_on_dataset

run_steps_on_dataset(
    fn,
    iterator,
    iterations=1,
    initial_loop_values=None
)

Run fn with input from iterator for iterations times.

This method can be used to run a step function for training a number of times using input from a dataset.

Args:

  • fn: function to run using this distribution strategy. The function must have the following signature: def fn(context, *inputs). context is an instance of MultiStepContext that will be passed when fn is run; it can be used to specify the outputs to be returned from fn by calling context.set_last_step_output, and to capture non-tensor outputs with context.set_non_tensor_output. See the MultiStepContext documentation for more information. inputs will have the same type/structure as iterator.get_next(). If iterator.get_next() returns a tuple, say x, y, those values will be unpacked and passed to fn, so its signature would look like def step_fn(context, x, y); if the iterator returns a single value, say x, the value is passed as is and the signature would look like def step_fn(context, x). Typically, fn will use the call_for_each_tower method of the strategy to distribute the computation over multiple towers.
  • iterator: Iterator of a dataset that represents the input for fn. The caller is responsible for initializing the iterator as needed.
  • iterations: (Optional) Number of iterations that fn should be run. Defaults to 1.
  • initial_loop_values: (Optional) Initial values to be passed into the loop that runs fn. Defaults to None. # TODO(priyag): Remove initial_loop_values argument when we have a mechanism to infer the outputs of fn.

Returns:

Returns the MultiStepContext object, which has the following properties, among other things:

  • run_op: An op that runs fn iterations times.
  • last_step_outputs: A dictionary containing tensors set using context.set_last_step_output. Evaluating this returns the value of the tensors after the last iteration.
  • non_tensor_outputs: A dictionary containing anything that was set by fn by calling context.set_non_tensor_output.
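
For example, a hedged sketch (the dataset, step function, output name, the make_one_shot_iterator() call, and the choice to return a grouped op from the step function are all illustrative assumptions, not part of the documented contract):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

def dataset_fn():
  return tf.data.Dataset.from_tensor_slices([1., 2., 3., 4.]).repeat().batch(4)

def step_fn(context, inputs):
  def tower_fn(x):
    return tf.reduce_sum(x)
  per_tower_loss = distribution.call_for_each_tower(tower_fn, inputs)
  # Expose the loss from the final iteration via the MultiStepContext.
  context.set_last_step_output("loss", per_tower_loss)
  return distribution.group(per_tower_loss)

with distribution.scope():
  iterator = distribution.distribute_dataset(dataset_fn).make_one_shot_iterator()
  ctx = distribution.run_steps_on_dataset(step_fn, iterator, iterations=10)
  train_op = ctx.run_op
  last_loss = ctx.last_step_outputs["loss"]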

scope

scope()

Returns a context manager selecting this DistributionStrategy as current.

Inside a with distribution_strategy.scope(): code block, this thread will use a variable creator set by distribution_strategy, and will enter its "cross-tower context".

Returns:

A context manager.

unwrap

unwrap(value)

Returns the list of all per-device values contained in value.

Args:

  • value: A value returned by call_for_each_tower() or a variable created in scope().

Returns:

A list of values contained in value. If value represents a single value, this returns [value].

update

update(
    var,
    fn,
    *args,
    **kwargs
)

Run fn to update var using inputs mirrored to the same devices.

If var is mirrored across multiple devices, then this implements logic like:

results = {}
for device, v in var:
  with tf.device(device):
    # *args and **kwargs will be unwrapped if they are mirrored.
    results[device] = fn(v, *args, **kwargs)
return merged(results)

Otherwise this returns fn(var, *args, **kwargs) colocated with var.

Neither *args nor **kwargs may contain per-device values. If they contain mirrored values, they will be unwrapped before calling fn.

Args:

  • var: Variable, possibly mirrored to multiple devices, to operate on.
  • fn: Function to call. Should take the variable as the first argument.
  • *args: Additional positional arguments to pass to fn().
  • **kwargs: Keyword arguments to pass to fn().

Returns:

Merged return value of fn across all towers.
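
A hedged sketch (the variable, the per-tower delta, and the aggregation are illustrative) of the common pattern of reducing per-tower values and then applying them to a variable:

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

with distribution.scope():
  counter = tf.get_variable("counter", initializer=0.0)
  per_tower_delta = distribution.call_for_each_tower(lambda: tf.constant(1.0))
  # Sum the per-tower deltas onto the device(s) holding `counter`.
  summed_delta = distribution.reduce(
      tf.VariableAggregation.SUM, per_tower_delta, destinations=counter)
  # Apply the update on every device `counter` lives on.
  update_op = distribution.group(
      distribution.update(counter, tf.assign_add, summed_delta))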

update_non_slot

update_non_slot(
    colocate_with,
    fn,
    *args,
    **kwargs
)

Runs fn(*args, **kwargs) on colocate_with devices.

Args:

  • colocate_with: The return value of non_slot_devices().
  • fn: Function to execute.
  • *args: Positional arguments to pass to fn().
  • **kwargs: Keyword arguments to pass to fn().

Returns:

Return value of fn, possibly merged across devices.
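
A hedged sketch (incrementing the global step is an illustrative use; the variable itself is created by tf.train.get_or_create_global_step):

import tensorflow as tf

distribution = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=2)

with distribution.scope():
  global_step = tf.train.get_or_create_global_step()
  # Increment the global step on the device(s) chosen for non-slot variables.
  inc_global_step = distribution.update_non_slot(
      distribution.non_slot_devices([global_step]),
      lambda: global_step.assign_add(1))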

value_container

value_container(val)