Entrenamiento del servidor de parámetros con ParameterServerStrategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar libreta

Descripción general

El entrenamiento del servidor de parámetros es un método paralelo de datos común para escalar el entrenamiento del modelo en varias máquinas.

Un clúster de entrenamiento de servidor de parámetros consta de trabajadores y servidores de parámetros . Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. De forma predeterminada, los trabajadores leen y actualizan estas variables de forma independiente sin sincronizarse entre sí. Esta es la razón por la cual, a veces, el entrenamiento estilo servidor de parámetros se denomina entrenamiento asíncrono .

En TensorFlow 2, el entrenamiento del servidor de parámetros funciona con la clase tf.distribute.experimental.ParameterServerStrategy , que distribuye los pasos de entrenamiento a un clúster que se amplía a miles de trabajadores (acompañado de servidores de parámetros).

Métodos de entrenamiento compatibles

Hay dos métodos principales de entrenamiento admitidos:

Un clúster con trabajos y tareas

Independientemente de la API elegida ( Model.fit o un ciclo de entrenamiento personalizado), el entrenamiento distribuido en TensorFlow 2 implica: un 'cluster' con varios 'jobs' , y cada uno de los trabajos puede tener una o más 'tasks' .

Al usar el entrenamiento del servidor de parámetros, se recomienda tener:

  • Un trabajo de coordinador (que tiene el nombre de trabajo chief )
  • Múltiples trabajos de trabajador (nombre de trabajo worker ); y
  • Varios trabajos de servidor de parámetros (nombre de trabajo ps )

Mientras que el coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de fallas de tareas, los trabajadores y los servidores de parámetros ejecutan tf.distribute.Server que escucha las solicitudes del coordinador.

Entrenamiento del servidor de parámetros con la API Model.fit

El entrenamiento del servidor de parámetros con la API de Model.fit requiere que el coordinador use un objeto tf.distribute.experimental.ParameterServerStrategy y un tf.keras.utils.experimental.DatasetCreator como entrada. Similar al uso de Model.fit sin estrategia, o con otras estrategias, el flujo de trabajo implica crear y compilar el modelo, preparar las devoluciones de llamada, seguidas de una llamada Model.fit .

Entrenamiento del servidor de parámetros con un bucle de entrenamiento personalizado

Con bucles de entrenamiento personalizados, la clase tf.distribute.experimental.coordinator.ClusterCoordinator es el componente clave que se usa para el coordinador.

La API más importante proporcionada por el objeto ClusterCoordinator es la schedule :

  • La API de schedule pone en cola una tf.function y devuelve un RemoteValue similar al futuro inmediatamente.
  • Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y sus RemoteValue s se completarán de forma asíncrona.
  • Dado que la schedule no requiere la asignación de trabajadores, la tf.function se puede ejecutar en cualquier trabajador disponible.
  • Si el trabajador en el que se ejecuta deja de estar disponible antes de su finalización, la función se volverá a intentar en otro trabajador disponible.
  • Debido a este hecho y al hecho de que la ejecución de la función no es atómica, una función puede ejecutarse más de una vez.

Además de enviar funciones remotas, ClusterCoordinator también ayuda a crear conjuntos de datos sobre todos los trabajadores y reconstruir estos conjuntos de datos cuando un trabajador se recupera de una falla.

Configuración del tutorial

El tutorial se ramificará en Model.fit y rutas de bucle de entrenamiento personalizadas, y puede elegir la que mejor se adapte a sus necesidades. Las secciones que no sean "Entrenamiento con X" son aplicables a ambos caminos.

pip install portpicker

Configuración del clúster

Como se mencionó anteriormente, un clúster de entrenamiento de servidor de parámetros requiere una tarea de coordinador que ejecute su programa de entrenamiento, uno o varios trabajadores y tareas de servidor de parámetros que ejecuten servidores de TensorFlow ( tf.distribute.Server y posiblemente una tarea de evaluación adicional que ejecute una evaluación adicional. (consulte la sección de evaluación del sidecar a continuación). Los requisitos para configurarlos son:

  • La tarea del coordinador necesita conocer las direcciones y los puertos de todos los demás servidores de TensorFlow, excepto el evaluador.
  • Los trabajadores y los servidores de parámetros necesitan saber qué puerto deben escuchar. En aras de la simplicidad, generalmente puede pasar la información completa del clúster al crear servidores TensorFlow en estas tareas.
  • La tarea del evaluador no tiene que conocer la configuración del clúster de entrenamiento. Si lo hace, no debería intentar conectarse al clúster de entrenamiento.
  • Los trabajadores y los servidores de parámetros deben tener tipos de tareas como "worker" y "ps" , respectivamente. El coordinador debe usar "chief" como tipo de tarea por razones heredadas.

En este tutorial, creará un clúster en proceso para que todo el entrenamiento del servidor de parámetros se pueda ejecutar en Colab. Aprenderá a configurar clústeres reales en una sección posterior.

Clúster en proceso

Comenzará creando varios servidores TensorFlow por adelantado y se conectará a ellos más tarde. Tenga en cuenta que esto es solo para el propósito de la demostración de este tutorial, y en el entrenamiento real, los servidores se iniciarán en máquinas "worker" y "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

La configuración del clúster en proceso se usa con frecuencia en las pruebas unitarias, como aquí .

Otra opción para las pruebas locales es iniciar procesos en la máquina local; consulte la capacitación para varios trabajadores con Keras para ver un ejemplo de este enfoque.

Crear una instancia de ParameterServerStrategy

Antes de sumergirse en el código de entrenamiento, creemos una instancia de un objeto ParameterServerStrategy . Tenga en cuenta que esto es necesario independientemente de si está procediendo con Model.fit o con un ciclo de entrenamiento personalizado. El argumento variable_partitioner se explicará en la sección Fragmentación de variables .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Para usar GPU para capacitación, asigne GPU visibles para cada trabajador. ParameterServerStrategy usará todas las GPU disponibles en cada trabajador, con la restricción de que todos los trabajadores deben tener la misma cantidad de GPU disponibles.

Fragmentación variable

La fragmentación de variables se refiere a dividir una variable en múltiples variables más pequeñas, que se denominan fragmentos . La fragmentación variable puede ser útil para distribuir la carga de la red al acceder a estos fragmentos. También es útil para distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros.

Para habilitar la fragmentación de variables, puede pasar un variable_partitioner al construir un objeto ParameterServerStrategy . Se invocará variable_partitioner cada vez que se cree una variable y se espera que devuelva el número de fragmentos a lo largo de cada dimensión de la variable. Se proporcionan algunos variable_partitioner listos para usar, como tf.distribute.experimental.partitioners.MinSizePartitioner . Se recomienda utilizar particiones basadas en el tamaño como tf.distribute.experimental.partitioners.MinSizePartitioner para evitar la partición de variables pequeñas, lo que podría tener un impacto negativo en la velocidad de entrenamiento del modelo.

Cuando se pasa un variable_partitioner y si crea una variable directamente en la strategy.scope() , se convertirá en un tipo de contenedor con una propiedad de variables que brinda acceso a la lista de fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor al concatenar todos los fragmentos. Como resultado, se puede utilizar como una variable normal. Por otro lado, algunos métodos de TensorFlow como tf.nn.embedding_lookup brindan una implementación eficiente para este tipo de contenedor y en estos métodos se evitará la concatenación automática.

Consulte los documentos de la API de tf.distribute.experimental.ParameterServerStrategy para obtener más detalles.

Entrenamiento con Model.fit

Keras proporciona una API de capacitación fácil de usar a través Model.fit que maneja el ciclo de capacitación bajo el capó, con la flexibilidad de anular train_step y devoluciones de llamada, que brindan funcionalidades como guardar puntos de control o guardar resúmenes para TensorBoard. Con Model.fit , el mismo código de entrenamiento se puede usar para otras estrategias con un simple intercambio del objeto de la estrategia.

Datos de entrada

Model.fit con el entrenamiento del servidor de parámetros requiere que los datos de entrada se proporcionen en un invocable que tome un solo argumento de tipo tf.distribute.InputContext y devuelva un tf.data.Dataset . Luego, cree un objeto tf.keras.utils.experimental.DatasetCreator que tome dicho objeto callable y un objeto tf.distribute.InputOptions opcional a través del argumento input_options .

Tenga en cuenta que se recomienda mezclar y repetir los datos con el entrenamiento del servidor de parámetros y especificar steps_per_epoch en la llamada de fit para que la biblioteca conozca los límites de la época.

Consulte el tutorial de entrada distribuida para obtener más información sobre el argumento InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

El código en dataset_fn se invocará en el dispositivo de entrada, que suele ser la CPU, en cada una de las máquinas de trabajo.

Construcción y compilación de modelos.

Ahora, creará un tf.keras.Model , un modelo tf.keras.models.Sequential trivial con fines de demostración, seguido de una llamada Model.compile para incorporar componentes, como un optimizador, métricas o parámetros como steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Devoluciones de llamadas y capacitación

Antes de llamar a model.fit para la capacitación real, preparemos las devoluciones de llamada necesarias para tareas comunes, como:

  • ModelCheckpoint : para guardar los pesos del modelo.
  • BackupAndRestore : para asegurarse de que el progreso de la capacitación se respalde automáticamente y se recupere si el clúster experimenta falta de disponibilidad (como cancelación o preferencia); o
  • TensorBoard : para guardar los informes de progreso en archivos de resumen, que se visualizan en la herramienta TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Uso directo con ClusterCoordinator (opcional)

Incluso si elige la ruta de entrenamiento Model.fit , puede crear una instancia opcional de un objeto tf.distribute.experimental.coordinator.ClusterCoordinator para programar otras funciones que le gustaría que se ejecutaran en los trabajadores. Consulte la sección Entrenamiento con un bucle de entrenamiento personalizado para obtener más detalles y ejemplos.

Entrenamiento con un bucle de entrenamiento personalizado

El uso de bucles de entrenamiento personalizados con tf.distribute.Strategy proporciona una gran flexibilidad para definir bucles de entrenamiento. Con la ParameterServerStrategy definida anteriormente (como strategy ), utilizará un tf.distribute.experimental.coordinator.ClusterCoordinator para enviar la ejecución de los pasos de capacitación a los trabajadores remotos.

Luego, creará un modelo, definirá un conjunto de datos y una función de paso, como lo ha hecho en el ciclo de entrenamiento con otros tf.distribute.Strategy s. Puede encontrar más detalles en el tutorial Entrenamiento personalizado con tf.distribute.Strategy .

Para garantizar una obtención previa eficiente del conjunto de datos, use las API de creación de conjuntos de datos distribuidos recomendadas que se mencionan en la sección Pasos de capacitación de envío a trabajadores remotos a continuación. Además, asegúrese de llamar a Strategy.run dentro de worker_fn para aprovechar al máximo las GPU asignadas a los trabajadores. El resto de pasos son los mismos para entrenar con o sin GPU.

Vamos a crear estos componentes en los siguientes pasos:

configurar los datos

Primero, escriba una función que cree un conjunto de datos que incluya la lógica de preprocesamiento implementada por las capas de preprocesamiento de Keras .

Creará estas capas fuera de dataset_fn pero aplicará la transformación dentro de dataset_fn , ya que envolverá dataset_fn en una tf.function , que no permite que se creen variables dentro de él.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Genere ejemplos de juguetes en un conjunto de datos:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Luego, crea el conjunto de datos de entrenamiento envuelto en un dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construye el modelo

A continuación, cree el modelo y otros objetos. Asegúrese de crear todas las variables en la strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Confirmemos que el uso de FixedShardsPartitioner dividió todas las variables en dos fragmentos y cada fragmento se asignó a diferentes servidores de parámetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Definir el paso de entrenamiento

Tercero, cree el paso de entrenamiento envuelto en una tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

En la función de paso de entrenamiento anterior, llamar a Strategy.run y Strategy.reduce en step_fn puede admitir varias GPU por trabajador. Si los trabajadores tienen GPU asignadas, Strategy.run distribuirá los conjuntos de datos en varias réplicas.

Envíe los pasos de capacitación a los trabajadores remotos

Después de que ParameterServerStrategy defina todos los cálculos, utilizará la clase tf.distribute.experimental.coordinator.ClusterCoordinator para crear recursos y distribuir los pasos de capacitación a los trabajadores remotos.

Primero creemos un objeto ClusterCoordinator y pasemos el objeto de estrategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Luego, cree un conjunto de datos por trabajador y un iterador. En per_worker_dataset_fn continuación, se recomienda incluir strategy.distribute_datasets_from_function dataset_fn para permitir una captación previa eficiente a las GPU sin problemas.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

El paso final es distribuir el cómputo a los trabajadores remotos usando ClusterCoordinator.schedule :

  • El método de schedule pone en cola una tf.function y devuelve un RemoteValue similar al futuro inmediatamente. Las funciones en cola se enviarán a los trabajadores remotos en subprocesos en segundo plano y RemoteValue se completará de forma asíncrona.
  • El método de join ( ClusterCoordinator.join ) se puede usar para esperar hasta que se ejecuten todas las funciones programadas.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Así es como puede obtener el resultado de un RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternativamente, puede iniciar todos los pasos y hacer algo mientras espera que se completen:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para ver el flujo de trabajo completo de capacitación y servicio para este ejemplo en particular, consulte esta prueba .

Más sobre la creación de conjuntos de datos

El conjunto de datos del código anterior se crea mediante la API ClusterCoordinator.create_per_worker_dataset ). Crea un conjunto de datos por trabajador y devuelve un objeto contenedor. Puede llamar al método iter para crear un iterador por trabajador. El iterador por trabajador contiene un iterador por trabajador y la porción correspondiente de un trabajador se sustituirá en el argumento de entrada de la función que se pasa al método ClusterCoordinator.schedule antes de que la función se ejecute en un trabajador en particular.

Actualmente, el método ClusterCoordinator.schedule asume que los trabajadores son equivalentes y, por lo tanto, asume que los conjuntos de datos de diferentes trabajadores son los mismos, excepto que se pueden barajar de manera diferente si contienen una operación Dataset.shuffle . Debido a esto, también se recomienda que los conjuntos de datos se repitan indefinidamente y programe un número finito de pasos en lugar de depender del OutOfRangeError de un conjunto de datos.

Otra nota importante es que los conjuntos de datos tf.data no admiten la serialización y deserialización implícitas a través de los límites de la tarea. Por lo tanto, es importante crear todo el conjunto de datos dentro de la función que se pasa a ClusterCoordinator.create_per_worker_dataset .

Evaluación

Hay más de una forma de definir y ejecutar un ciclo de evaluación en el entrenamiento distribuido. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene preferencia.

Evaluación en línea

En este método, el coordinador alterna entre capacitación y evaluación y por eso se le llama evaluación en línea .

Hay varios beneficios de la evaluación en línea. Por ejemplo:

  • Puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener.
  • Los resultados de la evaluación se pueden utilizar para tomar decisiones para entrenar la próxima época.

Hay dos formas de implementar la evaluación en línea: evaluación directa y evaluación distribuida.

  • Evaluación directa : para modelos pequeños y conjuntos de datos de evaluación, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación en el coordinador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Evaluación distribuida : para modelos grandes o conjuntos de datos que no son factibles de ejecutar directamente en el coordinador, la tarea del coordinador puede distribuir tareas de evaluación a los trabajadores a través de los métodos ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Evaluación del sidecar

Otro método se llama evaluación de sidecar, donde crea una tarea de evaluador dedicada que lee repetidamente los puntos de control y ejecuta la evaluación en un punto de control más reciente. Permite que su programa de entrenamiento termine antes si no necesita cambiar su ciclo de entrenamiento en función de los resultados de la evaluación. Sin embargo, requiere una tarea de evaluador adicional y controles periódicos para activar la evaluación. A continuación se muestra un posible ciclo de evaluación del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clústeres en el mundo real

En un entorno de producción real, ejecutará todas las tareas en diferentes procesos en diferentes máquinas. La forma más sencilla de configurar la información del clúster en cada tarea es configurar las variables de entorno "TF_CONFIG" y usar un tf.distribute.cluster_resolver.TFConfigClusterResolver para analizar "TF_CONFIG" .

Para obtener una descripción general sobre las variables de entorno "TF_CONFIG" , consulte la guía de capacitación distribuida .

Si comienza sus tareas de capacitación utilizando Kubernetes u otras plantillas de configuración, es muy probable que estas plantillas ya hayan configurado “TF_CONFIG" para usted.

Establecer la variable de entorno "TF_CONFIG"

Supongamos que tiene 3 trabajadores y 2 servidores de parámetros, el "TF_CONFIG" del trabajador 1 puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

El "TF_CONFIG" del evaluador puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

La parte "cluster" en la cadena "TF_CONFIG" anterior para el evaluador es opcional.

Si usa el mismo binario para todas las tareas

Si prefiere ejecutar todas estas tareas usando un solo binario, deberá dejar que su programa se divida en diferentes roles desde el principio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

El siguiente código inicia un servidor TensorFlow y espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Manejo de fallas en la tarea

fracaso del trabajador

tf.distribute.experimental.coordinator.ClusterCoordinator o Model.fit brindan tolerancia a fallas integrada para fallas del trabajador. Tras la recuperación del trabajador, la función del conjunto de datos proporcionada anteriormente (ya sea a ClusterCoordinator.create_per_worker_dataset para un bucle de entrenamiento personalizado o tf.keras.utils.experimental.DatasetCreator para Model.fit ) se invocará en los trabajadores para volver a crear los conjuntos de datos.

Fallo del servidor de parámetros o del coordinador

Sin embargo, cuando el coordinador ve un error del servidor de parámetros, generará un error no UnavailableError o un AbortedError inmediato. Puede reiniciar el coordinador en este caso. El propio coordinador también puede dejar de estar disponible. Por lo tanto, se recomienda cierto utillaje para no perder el progreso del entrenamiento:

  • Para Model.fit , debe usar una devolución de llamada BackupAndRestore , que maneja el guardado y la restauración del progreso automáticamente. Consulte la sección Devolución de llamada y capacitación anterior para ver un ejemplo.

  • Para un ciclo de entrenamiento personalizado, debe verificar las variables del modelo periódicamente y cargar las variables del modelo desde un punto de control, si lo hay, antes de que comience el entrenamiento. El progreso del entrenamiento se puede deducir aproximadamente de las iteraciones del optimizer.iterations si se marca un optimizador:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Obtener un RemoteValue

Se garantiza que la obtención de un RemoteValue tenga éxito si una función se ejecuta correctamente. Esto se debe a que actualmente el valor de retorno se copia inmediatamente al coordinador después de ejecutar una función. Si hay algún error en el trabajador durante la copia, la función se volverá a intentar en otro trabajador disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin valor de retorno.

Error al reportar

Una vez que el coordinador ve un error como UnavailableError de los servidores de parámetros u otros errores de la aplicación, como un argumento no tf.debugging.check_numerics InvalidArgument cancelará todas las funciones pendientes y en cola antes de generar el error. Obtener sus RemoteValue correspondientes generará un CancelledError .

Después de que se genera un error, el coordinador no generará el mismo error ni ningún error de las funciones canceladas.

Mejora del rendimiento

Hay varias razones posibles si ve problemas de rendimiento cuando entrena con ParameterServerStrategy y ClusterResolver .

Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad máxima. También puede haber múltiples causas raíz. Algunos métodos simples para mitigar este problema son:

  1. Fragmente las variables de su modelo grande especificando un variable_partitioner al construir una ParameterServerStrategy .
  2. Si es posible, evite crear una variable de punto de acceso requerida por todos los servidores de parámetros en un solo paso. Por ejemplo, use una tasa de aprendizaje constante o subclase tf.keras.optimizers.schedules.LearningRateSchedule en los optimizadores, ya que el comportamiento predeterminado es que la tasa de aprendizaje se convertirá en una variable colocada en un servidor de parámetros en particular y solicitada por todos los demás servidores de parámetros en cada paso. .
  3. Mezcla tus grandes vocabularios antes de pasarlos a las capas de preprocesamiento de Keras.

Otra posible razón de los problemas de rendimiento es el coordinador. Su primera implementación de schedule / join está basada en Python y, por lo tanto, puede tener una sobrecarga de subprocesos. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso,

  • Para Model.fit , puede establecer el argumento steps_per_execution proporcionado en Model.compile en un valor mayor que 1.

  • Para un ciclo de entrenamiento personalizado, puede empaquetar varios pasos en una sola tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

A medida que la biblioteca se optimice aún más, es de esperar que la mayoría de los usuarios no tengan que empaquetar manualmente los pasos en el futuro.

Además, un pequeño truco para mejorar el rendimiento es programar funciones sin un valor de retorno, como se explica en la sección de errores de tareas de manejo anterior.

Limitaciones conocidas

La mayoría de las limitaciones conocidas ya están cubiertas en las secciones anteriores. Esta sección proporciona un resumen.

ParameterServerStrategy general

  • os.environment["grpc_fail_fast"]="use_caller" es necesario en todas las tareas, incluido el coordinador, para que la tolerancia a fallas funcione correctamente.
  • No se admite el entrenamiento del servidor de parámetros síncrono.
  • Por lo general, es necesario empaquetar varios pasos en una sola función para lograr un rendimiento óptimo.
  • No se admite cargar un modelo guardado a través tf.saved_model.load que contiene variables fragmentadas. Tenga en cuenta que se espera que funcione la carga de un modelo guardado de este tipo con TensorFlow Serving.
  • No se admite cargar un punto de control que contenga variables de ranura del optimizador fragmentadas en una cantidad diferente de fragmentos.
  • No se admite la recuperación de una falla del servidor de parámetros sin reiniciar la tarea del coordinador.
  • El uso de tf.lookup.StaticHashTable (que comúnmente emplean algunas capas de preprocesamiento de Keras, como tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup y tf.keras.layers.TextVectorization ) da como resultado recursos colocados en el coordinador en este momento con la formación del servidor de parámetros. Esto tiene implicaciones de rendimiento para la búsqueda de RPC desde los trabajadores hasta el coordinador. Esta es una alta prioridad actual para abordar.

Específicos Model.fit

  • Se requiere el argumento steps_per_epoch en Model.fit . Puede seleccionar un valor que proporcione intervalos apropiados en una época.
  • ParameterServerStrategy no admite devoluciones de llamada personalizadas que tengan llamadas de nivel de lote por motivos de rendimiento. Debe convertir esas llamadas en llamadas de nivel de época con los pasos seleccionados steps_per_epoch para que se steps_per_epoch cada número de pasos de pasos por época. Las devoluciones de llamada integradas no se ven afectadas: sus llamadas a nivel de lote se han modificado para que funcionen. Se está planificando admitir llamadas a nivel de lote para ParameterServerStrategy .
  • Por la misma razón, a diferencia de otras estrategias, la barra de progreso y las métricas se registran solo en los límites de época.
  • run_eagerly no es compatible.

Detalles específicos del bucle de entrenamiento personalizado