Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento de múltiples trabajadores con Keras

Ver en TensorFlow.org Ejecutar en Google Colab Ver código fuente en GitHub Descargar cuaderno

Visión general

Este tutorial demuestra la capacitación distribuida de varios trabajadores con el modelo Keras utilizando tf.distribute.Strategy API, específicamente tf.distribute.experimental.MultiWorkerMirroredStrategy . Con la ayuda de esta estrategia, un modelo de Keras que fue diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en varios trabajadores con un mínimo cambio de código.

La guía de Capacitación distribuida en TensorFlow está disponible para obtener una descripción general de las estrategias de distribución que TensorFlow admite para aquellos interesados ​​en una comprensión más profunda de tf.distribute.Strategy API de tf.distribute.Strategy .

Preparar

Primero, configure TensorFlow y las importaciones necesarias.

 import os
import tensorflow as tf
import numpy as np
 

Preparando conjunto de datos

Ahora, preparemos el conjunto de datos MNIST. El conjunto de datos MNIST comprende 60,000 ejemplos de entrenamiento y 10,000 ejemplos de prueba de los dígitos escritos a mano 0–9, formateados como imágenes monocromas de 28x28 píxeles. En este ejemplo, tomaremos la parte de capacitación de los conjuntos de datos para demostrar.

 def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset
 

Construye el modelo Keras

Aquí usamos tf.keras.Sequential API para construir y compilar un modelo Keras de redes neuronales convolucionales simples para entrenar con nuestro conjunto de datos MNIST.

 def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
 

Primero intentemos entrenar el modelo para un pequeño número de épocas y observemos los resultados en un solo trabajador para asegurarnos de que todo funcione correctamente. Debería esperar ver que la pérdida se reduce y la precisión se acerca a 1.0 a medida que avanza la época.

 per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 0s 2ms/step - loss: 2.2701 - accuracy: 0.2451
Epoch 2/3
70/70 [==============================] - 0s 2ms/step - loss: 2.1827 - accuracy: 0.4777
Epoch 3/3
70/70 [==============================] - 0s 2ms/step - loss: 2.0865 - accuracy: 0.5955

<tensorflow.python.keras.callbacks.History at 0x7fc59381ac50>

Configuración de múltiples trabajadores

Ahora entremos al mundo de la capacitación de múltiples trabajadores. En TensorFlow, se TF_CONFIG la variable de entorno TF_CONFIG para el entrenamiento en múltiples máquinas, cada una de las cuales posiblemente tenga un rol diferente. TF_CONFIG es una cadena JSON utilizada para especificar la configuración del clúster en cada trabajador que forma parte del clúster.

Hay dos componentes de TF_CONFIG : cluster y task . cluster proporciona información sobre el clúster de formación, que es un dict que consta de diferentes tipos de trabajos, como el de worker . En la capacitación de múltiples trabajadores con MultiWorkerMirroredStrategy , generalmente hay un worker que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un worker regular. Dicho trabajador se conoce como el trabajador chief , y es habitual que el worker con index 0 sea designado como el worker principal (de hecho, así es como se implementa tf.distribute.Strategy ). task otro lado, la tarea proporciona información de la tarea actual. El primer cluster componentes es el mismo para todos los trabajadores, y la task del segundo componente es diferente en cada trabajador y especifica el type y el index de ese trabajador.

En este ejemplo, establecemos el type tarea en "worker" y el index tarea en 0 . Esto significa que la máquina que tiene esa configuración es el primer trabajador, que será designado como el trabajador principal y hará más trabajo que otros trabajadores. Tenga en cuenta que otras máquinas también necesitarán tener TF_CONFIG la variable de entorno TF_CONFIG , y debe tener el mismo dict cluster , pero diferente type tarea o index tareas dependiendo de cuáles sean los roles de esas máquinas.

Con fines ilustrativos, este tutorial muestra cómo se puede configurar un TF_CONFIG con 2 trabajadores en localhost . En la práctica, los usuarios crearían múltiples trabajadores en direcciones / puertos IP externos y establecerían TF_CONFIG en cada trabajador de manera adecuada.

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

Tenga en cuenta que si bien la tasa de aprendizaje es fija en este ejemplo, en general puede ser necesario ajustar la tasa de aprendizaje en función del tamaño de lote global.

Elige la estrategia correcta

En TensorFlow, la capacitación distribuida consiste en capacitación sincrónica, donde los pasos de la capacitación se sincronizan entre los trabajadores y las réplicas, y la capacitación asincrónica, donde los pasos de la capacitación no se sincronizan estrictamente.

MultiWorkerMirroredStrategy , que es la estrategia recomendada para la capacitación sincronizada de múltiples trabajadores, se demostrará en esta guía. Para entrenar el modelo, use una instancia de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo en todos los trabajadores. Utiliza CollectiveOps , una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas. La guía tf.distribute.Strategy tiene más detalles sobre esta estrategia.

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy proporciona múltiples implementaciones a través del parámetro CollectiveCommunication . RING implementa colectivos basados ​​en anillos usando gRPC como la capa de comunicación entre hosts. NCCL usa NCCL Nvidia para implementar colectivos. AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y la interconexión de red en el clúster.

Entrene al modelo con MultiWorkerMirroredStrategy

Con la integración de tf.distribute.Strategy API en tf.keras , el único cambio que hará para distribuir la capacitación a múltiples trabajadores es encerrar la construcción de modelos y la llamada model.compile() dentro de model.compile() strategy.scope() . El alcance de la estrategia de distribución dicta cómo y dónde se crean las variables, y en el caso de MultiWorkerMirroredStrategy , las variables creadas son MirroredVariable s, y se replican en cada uno de los trabajadores.

 num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
 
Epoch 1/3
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
70/70 [==============================] - 0s 3ms/step - loss: 2.2682 - accuracy: 0.2265
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1714 - accuracy: 0.4954
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.0638 - accuracy: 0.6232

<tensorflow.python.keras.callbacks.History at 0x7fc5f4f062e8>

Fragmento de conjunto de datos y tamaño de lote

En la capacitación de múltiples trabajadores con MultiWorkerMirroredStrategy , se necesita fragmentar el conjunto de datos para garantizar la convergencia y el rendimiento. Sin embargo, tenga en cuenta que en el fragmento de código anterior, los conjuntos de datos se pasan directamente a model.fit() sin necesidad de fragmentar; Esto se debe a que la API tf.distribute.Strategy se encarga del fragmentación del conjunto de datos automáticamente. Fragmenta el conjunto de datos a nivel de archivo que puede crear fragmentos sesgados. En casos extremos donde solo hay un archivo, solo el primer fragmento (es decir, el trabajador) obtendrá datos de capacitación o evaluación y, como resultado, todos los trabajadores obtendrán errores.

Si prefiere el fragmentación manual para su entrenamiento, el fragmentado automático se puede desactivar a través de la API tf.data.experimental.DistributeOptions . Concretamente

 options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
 

Otra cosa a tener en cuenta es el tamaño del lote para los datasets . En el fragmento de código anterior, utilizamos global_batch_size = per_worker_batch_size * num_workers , que es num_workers veces más grande que el caso para un solo trabajador, porque el tamaño de lote efectivo por trabajador es el tamaño de lote global (el parámetro pasado en tf.data.Dataset.batch() ) dividido por el número de trabajadores, y con este cambio mantenemos el tamaño de lote por trabajador igual que antes.

Evaluación

Si pasa validation_data a model.fit , alternará entre capacitación y evaluación para cada época. La evaluación que toma validation_data se distribuye en el mismo conjunto de trabajadores y los resultados de la evaluación se agregan y están disponibles para todos los trabajadores. Similar al entrenamiento, el conjunto de datos de validación se fragmenta automáticamente a nivel de archivo. Debe establecer un tamaño de lote global en el conjunto de datos de validación y establecer validation_steps . También se recomienda un conjunto de datos repetido para la evaluación.

Alternativamente, también puede crear otra tarea que lea periódicamente los puntos de control y ejecute la evaluación. Esto es lo que hace Estimator. Pero esta no es una forma recomendada de realizar una evaluación y, por lo tanto, se omiten sus detalles.

Predicción

Actualmente model.predict no funciona con MultiWorkerMirroredStrategy.

Actuación

Ahora tiene un modelo Keras que está configurado para ejecutarse en varios trabajadores con MultiWorkerMirroredStrategy . Puede probar las siguientes técnicas para ajustar el rendimiento de la capacitación de múltiples trabajadores con MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy proporciona múltiples implementaciones de comunicación colectiva . RING implementa colectivos basados ​​en anillos usando gRPC como la capa de comunicación entre hosts. NCCL usa NCCL Nvidia para implementar colectivos. AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y la interconexión de red en el clúster. Para anular la elección automática, especifique un valor válido para el parámetro de communication del constructor de MultiWorkerMirroredStrategy , por ejemplo, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .
  • Transmita las variables a tf.float si es posible. El modelo oficial de ResNet incluye un ejemplo de cómo se puede hacer esto.

Tolerancia a fallos

En el entrenamiento sincrónico, el clúster fallaría si uno de los trabajadores falla y no existe un mecanismo de recuperación de fallas. El uso de Keras con tf.distribute.Strategy tiene la ventaja de la tolerancia a fallas en los casos en que los trabajadores mueren o son inestables. Hacemos esto preservando el estado de entrenamiento en el sistema de archivos distribuido de su elección, de modo que al reiniciar la instancia que falló o se adelantó, el estado de entrenamiento se recupera.

Dado que todos los trabajadores se mantienen sincronizados en términos de épocas y pasos de capacitación, otros trabajadores tendrían que esperar a que el trabajador fallido o preventivo se reinicie para continuar.

Devolución de llamada de ModelCheckpoint

ModelCheckpoint devolución de llamada ModelCheckpoint ya no proporciona la funcionalidad de tolerancia a fallas, utilice la BackupAndRestore llamada BackupAndRestore lugar.

La ModelCheckpoint llamada ModelCheckpoint todavía se puede utilizar para guardar puntos de control. Pero con esto, si la capacitación se interrumpió o finalizó con éxito, para continuar la capacitación desde el punto de control, el usuario es responsable de cargar el modelo manualmente. Opcionalmente, el usuario puede elegir guardar y restaurar el modelo / peso fuera de la ModelCheckpoint llamada ModelCheckpoint .

Guardar y cargar modelos

Para guardar su modelo usando model.save o tf.saved_model.save , el destino para guardar debe ser diferente para cada trabajador. En el caso de los trabajadores no jefes, deberá guardar el modelo en un directorio temporal, y en el jefe, deberá guardarlo en el directorio del modelo proporcionado. Los directorios temporales en el trabajador deben ser únicos para evitar errores resultantes de que varios trabajadores intenten escribir en la misma ubicación. El modelo guardado en todos los directorios es idéntico y, por lo general, solo se debe hacer referencia al modelo guardado por el jefe para restaurarlo o servirlo. Le recomendamos que tenga alguna lógica de limpieza que elimine los directorios temporales creados por los trabajadores una vez que haya completado su capacitación.

La razón por la que necesita ahorrar en el jefe y los trabajadores al mismo tiempo, es porque puede estar agregando variables durante la verificación, lo que requiere que tanto el jefe como los trabajadores participen en el protocolo de comunicación de reducción total. Por otro lado, dejar que el jefe y los trabajadores guarden en el mismo directorio modelo generará errores debido a la contención.

Con MultiWorkerMirroredStrategy , el programa se ejecuta en cada trabajador, y para saber si el trabajador actual es el jefe, aprovechamos el objeto de resolución de clúster que tiene atributos task_type y task_id . task_type le dice cuál es el trabajo actual (por ejemplo, 'trabajador'), y task_id le dice el identificador del trabajador. El trabajador con id 0 se designa como el trabajador principal.

En el fragmento de código a continuación, write_filepath proporciona la ruta del archivo para escribir, que depende de la identificación del trabajador. En el caso del jefe (trabajador con id 0), escribe en la ruta del archivo original; para otros, crea un directorio temporal (con id en la ruta del directorio) para escribir:

 model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works 
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
 

Con eso, ahora estás listo para guardar:

 multi_worker_model.save(write_model_path)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Como describimos anteriormente, más adelante el modelo solo debe cargarse desde el camino en el que se guardó el jefe, así que eliminemos los temporales que los trabajadores no jefes guardaron:

 if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))
 

Ahora, cuando tf.keras.models.load_model el momento de cargar, usemos la conveniente API tf.keras.models.load_model y continuemos con más trabajo. Aquí, asumimos que solo usamos un solo trabajador para cargar y continuar la capacitación, en cuyo caso no llama a tf.keras.models.load_model dentro de otro strategy.scope() .

 loaded_model = tf.keras.models.load_model(model_path)

# Now that we have the model restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9825 - accuracy: 0.1102
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9367 - accuracy: 0.1117

<tensorflow.python.keras.callbacks.History at 0x7fc5f4b0d8d0>

Punto de control guardar y restaurar

Por otro lado, la comprobación de puntos le permite guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo. Aquí, creará un tf.train.Checkpoint que rastrea el modelo, que es administrado por un tf.train.CheckpointManager para que solo se conserve el último punto de control.

 checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
  checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
 

Una vez que el CheckpointManager está configurado, ahora está listo para guardar y eliminar los puntos de control guardados por los no jefes de trabajo.

 checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)
 

Ahora, cuando necesite restaurar, puede encontrar el último punto de control guardado utilizando la conveniente función tf.train.latest_checkpoint . Después de restaurar el punto de control, puede continuar con el entrenamiento.

 latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9841 - accuracy: 0.6561
Epoch 2/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9445 - accuracy: 0.6805

<tensorflow.python.keras.callbacks.History at 0x7fc5f49d9d30>

Devolución de llamada de BackupAndRestore

BackupAndRestore devolución de llamada proporciona la funcionalidad de tolerancia a fallos, mediante la copia del modelo y el número época actual en un archivo de controles temporal bajo backup_dir argumento para BackupAndRestore . Esto se hace al final de cada época.

Una vez que los trabajos se interrumpen y reinician, la devolución de llamada restaura el último punto de control y la capacitación continúa desde el comienzo de la época interrumpida. Cualquier entrenamiento parcial ya realizado en la época inacabada antes de la interrupción será descartado, de modo que no afecte el estado final del modelo.

Para usarlo, proporcione una instancia de tf.keras.callbacks.experimental.BackupAndRestore en la llamada tf.keras.Model.fit() .

Con MultiWorkerMirroredStrategy, si un trabajador se interrumpe, todo el clúster se detiene hasta que se reinicia el trabajador interrumpido. Otros trabajadores también se reiniciarán, y el trabajador interrumpido vuelve a unirse al clúster. Luego, cada trabajador lee el archivo de punto de control que se guardó previamente y recupera su estado anterior, lo que permite que el clúster vuelva a sincronizarse. Luego el entrenamiento continúa.

BackupAndRestore devolución de llamada BackupAndRestore usa CheckpointManager para guardar y restaurar el estado de entrenamiento, lo que genera un archivo llamado punto de control que rastrea los puntos de control existentes junto con el último. Por esta razón, backup_dir no debe reutilizarse para almacenar otros puntos de control para evitar la colisión de nombres.

Actualmente, la BackupAndRestore llamada de BackupAndRestore admite un solo trabajador sin estrategia, MirroredStrategy, y varios trabajadores con MultiWorkerMirroredStrategy. A continuación hay dos ejemplos para la capacitación de múltiples trabajadores y la capacitación de un solo trabajador.

 # Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
 
Epoch 1/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2837 - accuracy: 0.1836
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2131 - accuracy: 0.4091
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1310 - accuracy: 0.5485

<tensorflow.python.keras.callbacks.History at 0x7fc5f49a3080>

Si inspecciona el directorio de backup_dir que especificó en BackupAndRestore , puede observar algunos archivos de punto de control generados temporalmente. Esos archivos son necesarios para recuperar las instancias perdidas anteriormente, y la biblioteca los eliminará al final de tf.keras.Model.fit() al salir con éxito de su entrenamiento.

Ver también

  1. La guía Capacitación distribuida en TensorFlow proporciona una descripción general de las estrategias de distribución disponibles.
  2. Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.
  3. La sección Rendimiento de la guía proporciona información sobre otras estrategias y herramientas que puede utilizar para optimizar el rendimiento de sus modelos TensorFlow.