¡Confirme su asistencia a su evento local de TensorFlow Everywhere hoy!
Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento del servidor de parámetros

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

El entrenamiento del servidor de parámetros es un método común de datos paralelos para escalar el entrenamiento del modelo en varias máquinas. Un clúster de entrenamiento del servidor de parámetros consta de trabajadores y servidores de parámetros. Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. De forma predeterminada, los trabajadores leen y actualizan estas variables de forma independiente sin sincronizarse entre sí. Esta es la razón por la que a veces el entrenamiento de estilo de servidor de parámetros se denomina entrenamiento asincrónico.

El entrenamiento del servidor de parámetros de TensorFlow 2 usa un coordinador central a través de la clase tf.distribute.experimental.coordinator.ClusterCoordinator .

En esta implementación, las tareas del parameter server worker y de parameter server ejecutan tf.distribute.Server s que escuchan las solicitudes del coordinador. El coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de las fallas de tareas.

Creemos que esta arquitectura y la nueva clase ClusterCoordinator proporcionan un modelo de programación más flexible y simple.

ClusterCoordinator

La clase ClusterCoordinator debe funcionar junto con un objeto tf.distribute.Strategy . Este objeto tf.distribute.Strategy es necesario para pasar la información del clúster y se utiliza para definir un paso de entrenamiento como hemos visto en el entrenamiento personalizado con MirroredStrategy . El objeto ClusterCoordinator luego envía la ejecución de estos pasos de capacitación a los trabajadores remotos. Actualmente, ClusterCoordinator solo funciona con tf.distribute.experimental.ParameterServerStrategy .

La API más importante proporcionada por el objeto ClusterCoordinator es la schedule . La API de schedule pone en cola una función tf.function y devuelve un valor RemoteValue similar al RemoteValue inmediato. Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y sus RemoteValue s se completarán de forma asincrónica. Dado que el schedule no requiere la asignación de un trabajador, la función tf.function pasada se puede ejecutar en cualquier trabajador disponible. Si el trabajador en el que se ejecuta deja de estar disponible antes de su finalización, la función se reintentará en otro trabajador disponible. Debido a este hecho y al hecho de que la ejecución de la función no es atómica, una función puede ejecutarse más de una vez.

Además de enviar funciones remotas, ClusterCoordinator también ayuda a crear conjuntos de datos en todos los trabajadores y reconstruir estos conjuntos de datos cuando un trabajador se recupera de un error.

Configuración del tutorial

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Configuración de clúster

Como se mencionó anteriormente, un clúster de entrenamiento del servidor de parámetros requiere una tarea de coordinador que ejecuta su programa de entrenamiento, uno o varios trabajadores y tareas del servidor de parámetros que ejecutan servidores TensorFlow, es decir, tf.distribute.Server , y posiblemente una tarea de evaluación adicional que ejecuta side-car evaluación (consulte la sección de evaluación del sidecar a continuación). Los requisitos para configurarlos son:

  • La tarea del coordinador debe conocer las direcciones y los puertos de todos los demás servidores de TensorFlow, excepto el evaluador.
  • Los trabajadores y los servidores de parámetros necesitan saber qué puerto necesitan escuchar. En aras de la simplicidad, generalmente pasamos la información completa del clúster cuando creamos servidores TensorFlow en estas tareas.
  • La tarea del evaluador no tiene que conocer la configuración del grupo de capacitación. Si es así, no debería intentar conectarse al clúster de entrenamiento.
  • Los trabajadores y los servidores de parámetros deben tener tipos de tareas como "trabajador" y "ps" respectivamente. El coordinador debe utilizar "jefe" como tipo de tarea por motivos heredados.

En este tutorial, crearemos un clúster en proceso para que todo el entrenamiento del servidor de parámetros se pueda ejecutar en colab. Introduciremos cómo configurar clústeres reales en una sección posterior.

Clúster en proceso

En este tutorial, iniciaremos varios servidores de TensorFlow con anticipación y nos conectaremos a ellos más tarde:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Entrenamiento con bucle de entrenamiento personalizado

Bucle de entrenamiento personalizado con tf.distribute.Strategy proporciona una gran flexibilidad para definir bucles de entrenamiento. Actualmente, para el entrenamiento del servidor de parámetros en TensorFlow 2, solo se admite el ciclo de entrenamiento personalizado. Aquí usamos ParameterServerStrategy para definir un paso de entrenamiento y luego usamos ClusterCoordinator para enviar la ejecución de pasos de entrenamiento a trabajadores remotos.

Cree la ParameterServerStrategy

Para escribir un paso de entrenamiento en un ciclo de entrenamiento personalizado, el primer paso es crear un ParameterServerStrategy . Explicaremos el variable_partitioner más adelante.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

Luego, creará un modelo, definirá un conjunto de datos y una función de paso como hemos visto en el ciclo de entrenamiento con otras tf.distribute.Strategy s. Puede encontrar más detalles en este tutorial . Creemos estos componentes en los siguientes pasos:

Configurar los datos

Primero, escriba una función que cree un conjunto de datos que incluya lógica de preprocesamiento implementada por capas de preprocesamiento de Keras. Crearemos estas capas fuera del dataset_fn pero aplicaremos la transformación dentro del dataset_fn ya que envolverás el dataset_fn en una función tf.function que no permite que se creen variables dentro de él.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Genere ejemplos de juguetes en un conjunto de datos:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Luego creamos el conjunto de datos de entrenamiento envuelto en un dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construye el modelo

En segundo lugar, creamos el modelo y otros objetos. Asegúrese de crear todas las variables en strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Definir el paso de entrenamiento

En tercer lugar, cree el paso de entrenamiento envuelto en una función tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

En la función de paso por encima, llamando strategy.run y strategy.reduce en el step_fn son útiles para apoyar las GPU o múltiples réplicas trabajador en el futuro, aunque tienen aplicación trivial en este momento.

Envío de pasos de formación a trabajadores remotos

Después de que ParameterServerStrategy defina todos los cálculos, usaremos la clase ClusterCoordinator para crear recursos y distribuir los pasos de capacitación a los trabajadores remotos.

Primero ClusterCoordinator un objeto ClusterCoordinator y ClusterCoordinator objeto de estrategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Luego creamos un conjunto de datos por trabajador y un iterador. En el per_worker_dataset_fn continuación, envolver el dataset_fn en strategy.distribute_datasets_from_function es opcional, pero permitirá admitir la dataset_fn eficiente a GPU sin problemas en el futuro cuando las GPU sean compatibles con ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

El último paso es distribuir el cálculo a los trabajadores remotos mediante el schedule . El método de schedule pone en cola una función tf.function y devuelve inmediatamente un valor RemoteValue similar al RemoteValue . Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y RemoteValue se completará de forma asincrónica. El método de join se puede utilizar para esperar hasta que se ejecuten todas las funciones programadas.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Así es como puede obtener el resultado de un valor RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

Alternativamente, puede iniciar todos los pasos y hacer algo mientras espera su finalización:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para obtener el flujo de trabajo completo de capacitación y servicio para este ejemplo en particular, consulte esta prueba .

Más sobre la creación de conjuntos de datos

El conjunto de datos en el código anterior se crea utilizando la API create_per_worker_dataset . Crea un conjunto de datos por trabajador y devuelve un objeto contenedor. Puede llamar al método iter para crear un iterador por trabajador. El iterador por trabajador contiene un iterador por trabajador y el segmento correspondiente de un trabajador se sustituirá en el argumento de entrada de la función pasada al método de schedule antes de que la función se ejecute en un trabajador en particular.

Actualmente, el método de schedule asume que los trabajadores son equivalentes y, por lo tanto, asume que los conjuntos de datos de diferentes trabajadores son los mismos, excepto que pueden mezclarse de manera diferente si contienen una operación dataset.shuffle . Debido a esto, también recomendamos que los conjuntos de datos se repitan indefinidamente y programar un número finito de pasos en lugar de depender del OutOfRangeError de un conjunto de datos.

Otra nota importante es que los tf.data datos tf.data no admiten la serialización implícita y la deserialización a través de los límites de las tareas. Por eso es importante crear el conjunto de datos completo dentro de la función pasada a create_per_worker_dataset .

Fragmentación variable

La fragmentación de variables se refiere a dividir una variable en varias variables más pequeñas. A estas variables más pequeñas las llamamos fragmentos . La fragmentación variable puede resultar útil para distribuir la carga de la red al acceder a estas fragmentos. También es útil distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros.

Para habilitar la fragmentación de variables, puede pasar un variable_partitioner al construir un objeto ParameterServerStrategy . El variable_partitioner se invocará cada vez que se cree una variable y se espera que devuelva el número de fragmentos a lo largo de cada dimensión de la variable. Se proporcionan algunos tf.distribute.experimental.partitioners.FixedShardsPartitioner variable_partitioner tf.distribute.experimental.partitioners.FixedShardsPartitioner para tf.distribute.experimental.partitioners.FixedShardsPartitioner , como tf.distribute.experimental.partitioners.FixedShardsPartitioner .

En el ejemplo anterior, usamos FixedShardsPartitioner que dividirá todas las variables en dos fragmentos y cada fragmento se asignará a diferentes servidores de parámetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Cuando se pasa un variable_partitioner y si crea una variable directamente en strategy.scope() , se convertirá en un tipo de contenedor con una propiedad de variables que proporciona acceso a la lista de fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor al concatenar todos los fragmentos. Como resultado, se puede utilizar como una variable normal. Por otro lado, algunos métodos de TensorFlow como tf.nn.embedding_lookup proporcionan una implementación eficiente para este tipo de contenedor y en estos métodos se evitará la concatenación automática.

Consulte la cadena de documentación API de ParameterServerStrategy para obtener más detalles.

Evaluación

Hay más de una forma de definir y ejecutar un ciclo de evaluación en el entrenamiento distribuido. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene una preferencia.

Evaluación en línea

En este método, el coordinador alterna entre capacitación y evaluación y, por lo tanto, lo llamamos evaluación en línea. Hay varios beneficios de la evaluación en línea. Por ejemplo, puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener. Por otro ejemplo, los resultados de la evaluación se pueden utilizar para tomar decisiones para entrenar en la próxima época.

Hay dos formas de implementar la evaluación en línea:

  • Evaluación directa : para modelos pequeños y conjuntos de datos de evaluación, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación en el coordinador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • Evaluación distribuida : para modelos grandes o conjuntos de datos que no se pueden ejecutar directamente en el coordinador, la tarea del coordinador puede distribuir las tareas de evaluación a los trabajadores mediante los métodos de schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Evaluación de sidecar

Otro método se llama evaluación de sidecar, que consiste en crear una tarea de evaluador dedicada que lee repetidamente los puntos de control y ejecuta la evaluación en un punto de control más reciente. Permite que su programa de entrenamiento finalice antes si no necesita cambiar su ciclo de entrenamiento según los resultados de la evaluación. Sin embargo, requiere una tarea de evaluador adicional y puntos de control periódicos para activar la evaluación. A continuación se muestra un posible bucle de evaluación del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clústeres en el mundo real

En un entorno de producción real, ejecutará todas las tareas en diferentes procesos en diferentes máquinas. La forma más sencilla de configurar la información del clúster en cada tarea es establecer las variables de entorno "TF_CONFIG" y usar TFConfigClusterResolver para analizar "TF_CONFIG". Para obtener una descripción general sobre las variables de entorno "TF_CONFIG", consulte la guía de formación distribuida .

Si comienza sus tareas de entrenamiento usando Kubernetes u otras plantillas de configuración, es muy probable que estas plantillas ya hayan configurado “TF_CONFIG” para usted.

Establecer la variable de entorno "TF_CONFIG"

Suponga que tiene 3 trabajadores y 2 servidores de parámetros, el "TF_CONFIG" del trabajador 1 puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

El "TF_CONFIG" del evaluador puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

La parte del "grupo" en la cadena "TF_CONFIG" anterior para el evaluador es opcional.

Si usa el mismo binario para todas las tareas

Si prefiere ejecutar todas estas tareas utilizando un solo binario, deberá dejar que su programa se ramifique en diferentes roles desde el principio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

El siguiente código inicia un servidor de TensorFlow y espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Fallo de la tarea de manejo

Fracaso del trabajador

Como se mencionó anteriormente, ClusterCoordinator tiene tolerancia a fallas incorporada para fallas del trabajador. Tras la recuperación del trabajador, la porción correspondiente de conjuntos de datos creados por create_per_worker_dataset que todavía están dentro del alcance se volverá a crear invocando su dataset_fn original pasado a create_per_worker_dataset .

Error del servidor de parámetros o del coordinador

Sin embargo, cuando el coordinador ve un error del servidor de parámetros, generará un error UnavailableError o AbortedError inmediatamente. Puede reiniciar el coordinador en este caso. El propio coordinador también puede dejar de estar disponible. Por lo tanto, para no perder gran parte del progreso del entrenamiento, es importante verificar periódicamente las variables del modelo y cargar las variables del modelo desde un punto de control, si lo hay, antes de que comience el entrenamiento. El progreso del entrenamiento se puede inferir aproximadamente de optimizer.iterations si se marca un optimizador.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Obtener un valor RemoteValue

Se garantiza que la RemoteValue un valor RemoteValue éxito si una función se ejecuta correctamente. Esto se debe a que actualmente el valor de retorno se copia inmediatamente al coordinador después de que se ejecuta una función. Si hay algún error de trabajador durante la copia, la función se reintentará en otro trabajador disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin un valor de retorno.

Error al reportar

Una vez que el coordinador ve un error como UnavailableError de los servidores de parámetros u otros errores de la aplicación como un InvalidArgument de tf.debugging.check_numerics , cancelará todas las funciones pendientes y en cola antes de generar el error. Obtener sus correspondientes RemoteValue s generará un CancelledError .

Después de que se genera un error, el coordinador no generará el mismo error ni ningún error de las funciones canceladas.

Mejora del rendimiento

Hay varias razones posibles si ve problemas de rendimiento cuando entrena con ParameterServerStrategy y ClusterResolver .

Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad. También puede haber varias causas fundamentales. Algunos métodos sencillos para mitigar este problema son

  1. fragmenta las variables de tu modelo grande especificando un variable_partitioner al construir una ParameterServerStrategy .
  2. Evite crear una variable de punto de acceso que sea requerida por todos los servidores de parámetros en un solo paso si es posible. Por ejemplo, use una tasa de aprendizaje constante o la subclase tf.keras.optimizers.schedules.LearningRateSchedule en optimizadores, ya que el comportamiento predeterminado es que la tasa de aprendizaje se convertirá en una variable colocada en un servidor de parámetros particular y solicitada por todos los demás servidores de parámetros en cada paso .
  3. baraja tus vocabularios extensos antes de pasarlos a las capas de preprocesamiento de Keras.

Otra posible razón de problemas de desempeño es el coordinador. Nuestra primera implementación de schedule / join está basada en Python y, por lo tanto, puede tener una sobrecarga de subprocesos. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso, puede empaquetar varios pasos en una sola función tf.function .

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Seguiremos optimizando el coordinador y, con suerte, la mayoría de los usuarios no tendrán que empaquetar manualmente los pasos en el futuro.

Además, un pequeño truco para mejorar el rendimiento es programar funciones sin un valor de retorno, como se explica en la sección anterior de fallas de tareas de manejo.

Limitaciones conocidas

La mayoría de las limitaciones conocidas se tratan en las secciones anteriores. He aquí un resumen:

  • os.environment["grpc_fail_fast"]="use_caller" es necesario en cada tarea, incluido el coordinador, para que la tolerancia a fallos funcione correctamente.
  • Los trabajadores de GPU no son compatibles.
  • No se admite el entrenamiento del servidor de parámetros síncronos.
  • ParameterServerStrategy no funciona con las API de compile y fit Keras.
  • ClusterCoordinator.schedule no admite garantías de visitas para un conjunto de datos.
  • Cuando se usa ClusterCoordinator.create_per_worker_dataset , el conjunto de datos completo debe crearse dentro de la función que se le pasa.
  • Por lo general, es necesario agrupar varios pasos en una sola función para lograr un rendimiento óptimo.
  • No se admite cargar un modelo guardado a través de tf.saved_model.load que contenga variables fragmentadas. Tenga en cuenta que se espera que la carga de un modelo guardado con TensorFlow Serving funcione.
  • No se admite cargar un punto de control que contenga variables de ranura del optimizador fragmentadas en un número diferente de fragmentos.
  • No se admite la recuperación de una falla del servidor de parámetros sin reiniciar la tarea del coordinador.