¡Google I / O regresa del 18 al 20 de mayo! Reserva espacio y crea tu horario Regístrate ahora

Entrenamiento del servidor de parámetros

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Descripción general

El entrenamiento del servidor de parámetros es un método común de datos paralelos para escalar el entrenamiento de modelos en varias máquinas. Un clúster de formación del servidor de parámetros consta de trabajadores y servidores de parámetros. Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. De forma predeterminada, los trabajadores leen y actualizan estas variables de forma independiente sin sincronizarse entre sí. Esta es la razón por la que a veces el entrenamiento de estilo de servidor de parámetros se denomina entrenamiento asincrónico.

En TF2, el entrenamiento del servidor de parámetros está impulsado por la clase tf.distribute.experimental.ParameterServerStrategy , que distribuye los pasos del entrenamiento a un clúster que escala hasta miles de trabajadores (acompañado por servidores de parámetros). Hay dos API de entrenamiento principales compatibles: Keras Training API, también conocida como Model.fit , y Custom Training Loop (CTL). Model.fit se recomienda cuando los usuarios prefieren una abstracción y manejo de entrenamiento de alto nivel, mientras que CTL se recomienda cuando los usuarios prefieren definir los detalles de su ciclo de entrenamiento.

Independientemente de la API de elección, el entrenamiento distribuido en TF2 implica un "clúster" con varios "trabajos", y cada uno de los trabajos puede tener una o más "tareas". Cuando se utiliza la formación del servidor de parámetros, se recomienda tener un trabajo de coordinador (que tiene el nombre del trabajo chief ), varios trabajos de trabajador (nombre de trabajo worker ) y varios trabajos de servidor de parámetros (nombre de trabajo ps ).

Mientras que el coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de fallas en las tareas, los trabajadores y los servidores de parámetros ejecutan tf.distribute.Server que escucha las solicitudes del coordinador.

Entrenamiento del servidor de parámetros con la API Model.fit

El entrenamiento del servidor de parámetros con la API Model.fit requiere que el coordinador use un objeto tf.distribute.experimental.ParameterServerStrategy y un tf.keras.utils.experimental.DatasetCreator como entrada. De manera similar al uso de Model.fit sin estrategia, o con otras estrategias, el flujo de trabajo implica la creación y compilación del modelo, la preparación de las devoluciones de llamada, seguido de una llamada de Model.fit .

Entrenamiento del servidor de parámetros con API de bucle de entrenamiento personalizado (CTL)

Con CTL, la clase tf.distribute.experimental.coordinator.ClusterCoordinator es el componente clave que se utiliza para el coordinador. La clase ClusterCoordinator debe funcionar junto con un objeto tf.distribute.Strategy . Este objeto tf.distribute.Strategy es necesario para proporcionar la información del clúster y se utiliza para definir un paso de entrenamiento como hemos visto en el entrenamiento personalizado con MirroredStrategy . A ClusterCoordinator objeto ClusterCoordinator envía la ejecución de estos pasos de formación a los trabajadores remotos. Para el entrenamiento del servidor de parámetros, ClusterCoordinator necesita trabajar con un tf.distribute.experimental.ParameterServerStrategy .

La API más importante proporcionada por el objeto ClusterCoordinator es la schedule . La API de schedule pone en cola una función tf.function y devuelve inmediatamente un valor RemoteValue similar al RemoteValue . Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y sus RemoteValue se completarán de forma asincrónica. Dado que el schedule no requiere la asignación de un trabajador, la función tf.function pasada se puede ejecutar en cualquier trabajador disponible. Si el trabajador en el que se ejecuta deja de estar disponible antes de su finalización, la función se reintentará en otro trabajador disponible. Debido a este hecho y al hecho de que la ejecución de la función no es atómica, una función puede ejecutarse más de una vez.

Además de enviar funciones remotas, ClusterCoordinator también ayuda a crear conjuntos de datos en todos los trabajadores y reconstruir estos conjuntos de datos cuando un trabajador se recupera de una falla.

Configuración del tutorial

El tutorial se ramificará en rutas CTL o Model.fit , y puede elegir la que se adapte a sus necesidades. Las secciones distintas de "Entrenamiento con X" se pueden aplicar a ambas rutas.

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Configuración de clúster

Como se mencionó anteriormente, un clúster de entrenamiento del servidor de parámetros requiere una tarea de coordinador que ejecuta su programa de entrenamiento, uno o varios trabajadores y tareas del servidor de parámetros que ejecutan servidores de TensorFlow, es decir, tf.distribute.Server , y posiblemente una tarea de evaluación adicional que ejecuta side-car evaluación (consulte la sección de evaluación del sidecar a continuación). Los requisitos para configurarlos son:

  • La tarea del coordinador debe conocer las direcciones y los puertos de todos los demás servidores de TensorFlow, excepto el evaluador.
  • Los trabajadores y los servidores de parámetros necesitan saber qué puerto necesitan escuchar. En aras de la simplicidad, generalmente pasamos la información completa del clúster cuando creamos servidores TensorFlow en estas tareas.
  • La tarea del evaluador no tiene que conocer la configuración del grupo de capacitación. Si es así, no debería intentar conectarse al clúster de entrenamiento.
  • Los trabajadores y los servidores de parámetros deben tener tipos de tareas como "trabajador" y "ps" respectivamente. El coordinador debe utilizar "jefe" como tipo de tarea por motivos heredados.

En este tutorial, crearemos un clúster en proceso para que todo el entrenamiento del servidor de parámetros se pueda ejecutar en colab. Introduciremos cómo configurar clústeres reales en una sección posterior.

Clúster en proceso

En este tutorial, iniciaremos varios servidores de TensorFlow con anticipación y nos conectaremos a ellos más tarde. Tenga en cuenta que esto es solo para el propósito de la demostración de este tutorial, y en el entrenamiento real, los servidores se iniciarán en las máquinas worker y ps.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

La configuración del clúster en proceso se utiliza con frecuencia en nuestras pruebas unitarias. He aquí un ejemplo .

Crear una instancia de ParameterServerStrategy

Antes de sumergirnos en el código de entrenamiento, creemos una instancia de un objeto ParameterServerStrategy . Tenga en cuenta que esto es necesario independientemente de si está procediendo con un ciclo de entrenamiento personalizado o Model.fit . variable_partitioner argumento variable_partitioner se explicará en la siguiente sección .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:23712', 'localhost:20335'], 'worker': ['localhost:18895', 'localhost:15135', 'localhost:19616']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:23712', 'localhost:20335'], 'worker': ['localhost:18895', 'localhost:15135', 'localhost:19616']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0

Para usar GPU para la capacitación, asigne GPU visibles para cada trabajador. ParameterServerStrategy utilizará todas las GPU disponibles en cada trabajador, con la restricción de que todos los trabajadores deben tener la misma cantidad de GPU disponibles.

Fragmentación variable

La fragmentación de variables se refiere a dividir una variable en varias variables más pequeñas. A estas variables más pequeñas las llamamos fragmentos . La fragmentación variable puede resultar útil para distribuir la carga de la red al acceder a estas fragmentos. También es útil distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros.

Para habilitar la fragmentación de variables, puede pasar un variable_partitioner al construir un objeto ParameterServerStrategy . El variable_partitioner se invocará cada vez que se cree una variable y se espera que devuelva el número de fragmentos a lo largo de cada dimensión de la variable. Se proporcionan algunos tf.distribute.experimental.partitioners.FixedShardsPartitioner variable_partitioner tf.distribute.experimental.partitioners.FixedShardsPartitioner para tf.distribute.experimental.partitioners.FixedShardsPartitioner , como tf.distribute.experimental.partitioners.FixedShardsPartitioner .

Cuando se pasa un variable_partitioner y si crea una variable directamente en strategy.scope() , se convertirá en un tipo de contenedor con una propiedad de variables que proporciona acceso a la lista de fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor concatenando todos los fragmentos. Como resultado, se puede utilizar como una variable normal. Por otro lado, algunos métodos de TensorFlow como tf.nn.embedding_lookup proporcionan una implementación eficiente para este tipo de contenedor y en estos métodos se evitará la concatenación automática.

Consulte la cadena de documentación API de ParameterServerStrategy para obtener más detalles.

Entrenamiento con Model.fit

Keras proporciona una API de entrenamiento fácil de usar a través de Model.fit que maneja el ciclo de entrenamiento debajo del capó, con la flexibilidad de train_step train_step y devoluciones de llamada que brindan funcionalidades como el guardado de puntos de control o el guardado de resumen para TensorBoard. Con Model.fit , el mismo código de entrenamiento se puede utilizar para otras estrategias con un simple intercambio del objeto de estrategia.

Datos de entrada

Model.fit con el entrenamiento del servidor de parámetros requiere que los datos de entrada se proporcionen en un invocable que tome un solo argumento de tipo tf.distribute.InputContext y devuelva untf.data.Dataset . Luego, cree un objeto tf.keras.utils.experimental.DatasetCreator que tome dicho objeto callable y un objeto tf.distribute.InputOptions opcional a través del argumento input_options . Tenga en cuenta que se recomienda mezclar y repetir los datos con el entrenamiento del servidor de parámetros y especificar steps_per_epoch en fit call para que la biblioteca conozca los límites de la época.

Consulte la guía de entrada distribuida para obtener más información sobre el argumento InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

El código en dataset_fn se invocará en el dispositivo de entrada, que suele ser la CPU, en cada una de las máquinas trabajadoras.

Construcción y compilación de modelos

Ahora, creará un tf.keras.Model con las API de su elección ( tf.keras.models.Sequential se utiliza un modelo trivial tf.keras.models.Sequential como demostración), seguido de una llamada Model.compile para incorporar componentes como el optimizador, métricas o parámetros como steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Devolución de llamada y formación

Antes de llamar a model.fit para el entrenamiento real, preparemos las devoluciones de llamada necesarias para tareas comunes como:

  • ModelCheckpoint : para guardar los pesos del modelo.

  • BackupAndRestore : para asegurarse de que el progreso del entrenamiento se BackupAndRestore automáticamente y se recupere si el clúster experimenta indisponibilidad (como aborto o preferencia), o

  • TensorBoard : para guardar los informes de progreso en archivos de resumen que se visualizan en la herramienta TensorBoard.

Tenga en cuenta que, debido a la consideración del rendimiento, las devoluciones de llamada personalizadas no pueden anular las devoluciones de llamada a nivel de lote cuando se utilizan con ParameterServerStrategy . Modifique sus devoluciones de llamada personalizadas para convertirlas en llamadas de nivel de época y ajuste steps_per_epoch a un valor adecuado. Además, steps_per_epoch es un argumento obligatorio para Model.fit cuando se usa con ParameterServerStrategy .

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 4s - loss: 0.4724
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.4312
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.3188
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f2986a82b70> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f299e0cf510> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.3544
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.2721
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7f29a24334a8>

Uso directo con ClusterCoordinator (opcional)

Incluso si elige la Model.fit entrenamiento Model.fit , opcionalmente puede crear una instancia de un objeto ClusterCoordinator para programar otras funciones que le gustaría que se ejecuten en los trabajadores. Consulte la sección Entrenamiento con bucle de entrenamiento personalizado a continuación para obtener más detalles y ejemplos.

Entrenamiento con bucle de entrenamiento personalizado

El ciclo de entrenamiento personalizado con tf.distribute.Strategy proporciona una gran flexibilidad para definir los ciclos de entrenamiento. Con la ParameterServerStrategy definida anteriormente, utilizará un ClusterCoordinator para enviar la ejecución de los pasos de capacitación a los trabajadores remotos.

Luego, creará un modelo, definirá un conjunto de datos y una función de paso como hemos visto en el ciclo de entrenamiento con otras tf.distribute.Strategy s. Puede encontrar más detalles en este tutorial .

Para garantizar una captura previa de conjuntos de datos eficiente, use las API de creación de conjuntos de datos distribuidos recomendadas que se mencionan en la sección Pasos de capacitación de envío a trabajadores remotos a continuación. Además, asegúrese de llamar a strategy.run dentro de worker_fn para aprovechar al máximo las GPU asignadas a los trabajadores. El resto de los pasos son los mismos para entrenar con o sin GPU.

Creemos estos componentes en los siguientes pasos:

Configurar los datos

Primero, escriba una función que cree un conjunto de datos que incluya la lógica de preprocesamiento implementada por las capas de preprocesamiento de Keras. Crearemos estas capas fuera del dataset_fn pero aplicaremos la transformación dentro del dataset_fn ya que envolverás el dataset_fn en una función tf.function que no permite que se creen variables dentro de él.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab,
                                          mask_token=None)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Genere ejemplos de juguetes en un conjunto de datos:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Luego creamos el conjunto de datos de entrenamiento envuelto en un dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construye el modelo

En segundo lugar, creamos el modelo y otros objetos. Asegúrese de crear todas las variables en strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Confirmemos que el uso de FixedShardsPartitioner dividió todas las variables en dos fragmentos y que cada fragmento se asignó a diferentes servidores de parámetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Definir el paso de entrenamiento

En tercer lugar, cree el paso de entrenamiento envuelto en una función tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

En la función de paso por encima, llamando strategy.run y strategy.reduce en el step_fn puede soportar múltiples GPU por trabajador. Si los trabajadores tienen GPU asignadas, strategy.run distribuirá los conjuntos de datos en varias réplicas.

Envío de pasos de formación a trabajadores remotos

Después de que ParameterServerStrategy defina todos los cálculos, usaremos la clase ClusterCoordinator para crear recursos y distribuir los pasos de capacitación a los trabajadores remotos.

Primero ClusterCoordinator un objeto ClusterCoordinator y ClusterCoordinator objeto de estrategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Luego creamos un conjunto de datos por trabajador y un iterador. En el per_worker_dataset_fn continuación, se recomienda envolver el dataset_fn en strategy.distribute_datasets_from_function para permitir una dataset_fn eficiente a las GPU sin problemas.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

El último paso es distribuir el cálculo a los trabajadores remotos mediante el schedule . El método de schedule pone en cola una función tf.function y devuelve inmediatamente un valor RemoteValue similar al RemoteValue . Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y RemoteValue se completará de forma asincrónica. El método de join se puede utilizar para esperar hasta que se ejecuten todas las funciones programadas.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.412500.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Así es como puede obtener el resultado de un valor RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.003233

Alternativamente, puede iniciar todos los pasos y hacer algo mientras espera su finalización:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para obtener el flujo de trabajo completo de capacitación y servicio para este ejemplo en particular, consulte esta prueba .

Más sobre la creación de conjuntos de datos

El conjunto de datos en el código anterior se crea utilizando la API create_per_worker_dataset . Crea un conjunto de datos por trabajador y devuelve un objeto contenedor. Puede llamar al método iter para crear un iterador por trabajador. El iterador por trabajador contiene un iterador por trabajador y la porción correspondiente de un trabajador se sustituirá en el argumento de entrada de la función pasada al método de schedule antes de que la función se ejecute en un trabajador en particular.

Actualmente, el método de schedule asume que los trabajadores son equivalentes y, por lo tanto, asume que los conjuntos de datos de diferentes trabajadores son los mismos, excepto que pueden mezclarse de manera diferente si contienen una operación dataset.shuffle . Debido a esto, también recomendamos que los conjuntos de datos se repitan indefinidamente y programar un número finito de pasos en lugar de depender del OutOfRangeError de un conjunto de datos.

Otra nota importante es que los tf.data datos tf.data no admiten la serialización implícita y la deserialización a través de los límites de las tareas. Por eso es importante crear el conjunto de datos completo dentro de la función pasada a create_per_worker_dataset .

Evaluación

Hay más de una forma de definir y ejecutar un ciclo de evaluación en el entrenamiento distribuido. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene una preferencia.

Evaluación en línea

En este método, el coordinador alterna entre capacitación y evaluación y, por lo tanto, lo llamamos evaluación en línea. Hay varios beneficios de la evaluación en línea. Por ejemplo, puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener. Por otro ejemplo, los resultados de la evaluación se pueden utilizar para tomar decisiones para entrenar en la próxima época.

Hay dos formas de implementar la evaluación en línea:

  • Evaluación directa : para modelos pequeños y conjuntos de datos de evaluación, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación en el coordinador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Evaluación distribuida : para modelos grandes o conjuntos de datos que no se pueden ejecutar directamente en el coordinador, la tarea del coordinador puede distribuir las tareas de evaluación a los trabajadores a través de los métodos de schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Evaluación de sidecar

Otro método se llama evaluación de sidecar, que consiste en crear una tarea de evaluador dedicada que lee repetidamente los puntos de control y ejecuta la evaluación en un punto de control más reciente. Permite que su programa de entrenamiento finalice antes si no necesita cambiar su ciclo de entrenamiento en función de los resultados de la evaluación. Sin embargo, requiere una tarea de evaluador adicional y puntos de control periódicos para desencadenar la evaluación. A continuación se muestra un posible bucle de evaluación del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clústeres en el mundo real

En un entorno de producción real, ejecutará todas las tareas en diferentes procesos en diferentes máquinas. La forma más sencilla de configurar la información del clúster en cada tarea es establecer las variables de entorno "TF_CONFIG" y utilizar un tf.distribute.cluster_resolver.TFConfigClusterResolver para analizar "TF_CONFIG". Para obtener una descripción general sobre las variables de entorno "TF_CONFIG", consulte la guía de formación distribuida .

Si comienza sus tareas de entrenamiento usando Kubernetes u otras plantillas de configuración, es muy probable que estas plantillas ya hayan configurado “TF_CONFIG” para usted.

Establecer la variable de entorno "TF_CONFIG"

Supongamos que tiene 3 trabajadores y 2 servidores de parámetros, el "TF_CONFIG" del trabajador 1 puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

El "TF_CONFIG" del evaluador puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

La parte del "grupo" en la cadena "TF_CONFIG" anterior para el evaluador es opcional.

Si usa el mismo binario para todas las tareas

Si prefiere ejecutar todas estas tareas utilizando un solo binario, deberá dejar que su programa se ramifique en diferentes roles desde el principio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

El siguiente código inicia un servidor de TensorFlow y espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Fallo de la tarea de manejo

Fracaso del trabajador

ClusterCoordinator o Model.fit proporciona tolerancia a fallas incorporada para fallas del trabajador. Tras la recuperación del trabajador, la función de conjunto de datos proporcionada anteriormente (ya sea para create_per_worker_dataset para CTL o DatasetCreator para Model.fit ) se invocará en los trabajadores para recrear los conjuntos de datos.

Fallo del coordinador o del servidor de parámetros

Sin embargo, cuando el coordinador ve un error del servidor de parámetros, generará un error UnavailableError o AbortedError inmediatamente. Puede reiniciar el coordinador en este caso. El propio coordinador también puede dejar de estar disponible. Por lo tanto, se recomiendan ciertas herramientas para no perder el progreso del entrenamiento:

  • Para Model.fit , debe usar una BackupAndRestore llamada BackupAndRestore , que maneja el guardado y la restauración del progreso automáticamente. Consulte la sección Devolución de llamadas y capacitación anterior para ver un ejemplo.

  • Para los CTL, debe verificar periódicamente las variables del modelo y cargar las variables del modelo desde un punto de control, si lo hubiera, antes de que comience el entrenamiento. El progreso del entrenamiento se puede inferir aproximadamente de optimizer.iterations si se marca un optimizador:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue un valor RemoteValue

Se garantiza que la RemoteValue se realizará correctamente si una función se ejecuta correctamente. Esto se debe a que actualmente el valor de retorno se copia inmediatamente al coordinador después de que se ejecuta una función. Si hay algún error de trabajador durante la copia, la función se reintentará en otro trabajador disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin un valor de retorno.

Error al reportar

Una vez que el coordinador ve un error como UnavailableError de los servidores de parámetros u otros errores de la aplicación como un InvalidArgument de tf.debugging.check_numerics , cancelará todas las funciones pendientes y en cola antes de generar el error. Obtener sus correspondientes RemoteValue s generará un CancelledError .

Después de que se genera un error, el coordinador no generará el mismo error ni ningún error de las funciones canceladas.

Mejora del rendimiento

Hay varias razones posibles si ve problemas de rendimiento cuando entrena con ParameterServerStrategy y ClusterResolver .

Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad. También puede haber varias causas fundamentales. Algunos métodos sencillos para mitigar este problema son

  1. fragmenta las variables de tu modelo grande especificando un variable_partitioner al construir una ParameterServerStrategy .
  2. Evite crear una variable de punto de acceso que sea requerida por todos los servidores de parámetros en un solo paso si es posible. Por ejemplo, use una tasa de aprendizaje constante o la subclase tf.keras.optimizers.schedules.LearningRateSchedule en optimizadores, ya que el comportamiento predeterminado es que la tasa de aprendizaje se convertirá en una variable colocada en un servidor de parámetros en particular y solicitada por todos los demás servidores de parámetros en cada paso. .
  3. baraja tus vocabularios extensos antes de pasarlos a las capas de preprocesamiento de Keras.

Otra posible razón de problemas de desempeño es el coordinador. Nuestra primera implementación de schedule / join está basada en Python y, por lo tanto, puede tener una sobrecarga de subprocesos. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso,

  • Para Model.fit , puede establecer el argumento steps_per_execution proporcionado en Model.compile en un valor mayor que 1.

  • Para CTL, puede empaquetar varios pasos en una sola función tf.function :

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

A medida que continuamos optimizando la biblioteca, esperamos que la mayoría de los usuarios no tengan que empaquetar manualmente los pasos en el futuro.

Además, un pequeño truco para mejorar el rendimiento es programar funciones sin un valor de retorno, como se explica en la sección anterior de fallas de tareas de manejo.

Limitaciones conocidas

La mayoría de las limitaciones conocidas se tratan en las secciones anteriores. Esta sección proporciona un resumen.

ParameterServerStrategy general

  • os.environment["grpc_fail_fast"]="use_caller" es necesario en todas las tareas, incluido el coordinador, para que la tolerancia a fallos funcione correctamente.
  • No se admite el entrenamiento del servidor de parámetros síncronos.
  • Por lo general, es necesario agrupar varios pasos en una sola función para lograr un rendimiento óptimo.
  • No se admite cargar un modelo guardado a través de tf.saved_model.load que contenga variables fragmentadas. Tenga en cuenta que se espera que la carga de un modelo guardado con TensorFlow Serving funcione.
  • No se admite cargar un punto de control que contenga variables de ranura del optimizador fragmentadas en un número diferente de fragmentos.
  • No se admite la recuperación de una falla del servidor de parámetros sin reiniciar la tarea del coordinador.
  • El uso de tf.lookup.StaticHashTable (que es comúnmente empleado por algunas capas de tf.keras.layers.experimental.preprocessing , como IntegerLookup , StringLookup y TextVectorization ) da como resultado recursos colocados en el coordinador en este momento con entrenamiento de PS. Esto tiene una implicación en el rendimiento de la búsqueda de RPC de los trabajadores al coordinador. Esta es una alta prioridad actual para abordar.

Model.fit

  • steps_per_epoch argumento steps_per_epoch es obligatorio en Model.fit . Puede seleccionar un valor que proporcione intervalos apropiados en una época.
  • ParameterServerStrategy no admite devoluciones de llamada personalizadas que tienen llamadas a nivel de lote por motivos de rendimiento. Debe convertir esas llamadas en llamadas de nivel de época con steps_per_epoch seleccionados adecuadamente, de modo que se llamen cada número de pasos de steps_per_epoch . Las devoluciones de llamada integradas no se ven afectadas: sus llamadas a nivel de lote se han modificado para que sean eficaces. Se está planificando la compatibilidad con llamadas a nivel de lote para ParameterServerStrategy .
  • Por la misma razón, a diferencia de otras estrategias, la barra de progreso y las métricas se registran solo en los límites de época.
  • La entrada para Model.fit solo toma el tipo DatasetCreator .
  • run_eagerly no es compatible.
  • La evaluación en Model.fit aún no es compatible. Ésta es una de las prioridades.
  • Model.evaluate y Model.predict aún no son compatibles.

Detalles específicos del ciclo de entrenamiento personalizado