Ciclo de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar libreta

Descripción general

Este tutorial demuestra la capacitación de varios trabajadores con una API de ciclo de capacitación personalizada, distribuida a través de MultiWorkerMirroredStrategy, por lo que un modelo de Keras diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en varios trabajadores con un cambio de código mínimo.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos brindan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento. Hay información más detallada disponible en Escritura de un ciclo de entrenamiento desde cero .

Si está buscando cómo usar MultiWorkerMirroredStrategy con keras model.fit , consulte este tutorial .

La guía Capacitación distribuida en TensorFlow está disponible para obtener una descripción general de las estrategias de distribución que admite TensorFlow para aquellos interesados ​​en una comprensión más profunda de las API de tf.distribute.Strategy .

Configuración

Primero, algunas importaciones necesarias.

import json
import os
import sys

Antes de importar TensorFlow, realice algunos cambios en el entorno.

Deshabilite todas las GPU. Esto evita errores causados ​​por los trabajadores que intentan usar la misma GPU. Para una aplicación real, cada trabajador estaría en una máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Restablezca la variable de entorno TF_CONFIG , verá más sobre esto más adelante.

os.environ.pop('TF_CONFIG', None)

Asegúrese de que el directorio actual esté en la ruta de python. Esto permite que el portátil importe los archivos escritos por %%writefile más tarde.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importe TensorFlow.

import tensorflow as tf

Definición de conjunto de datos y modelo

A continuación, cree un archivo mnist.py con un modelo simple y una configuración de conjunto de datos. Este archivo python será utilizado por los procesos de trabajo en este tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuración de varios trabajadores

Ahora entremos en el mundo de la formación multitrabajador. En TensorFlow, la variable de entorno TF_CONFIG es necesaria para el entrenamiento en varias máquinas, cada una de las cuales posiblemente tenga una función diferente. TF_CONFIG , que se usa a continuación, es una cadena JSON que se usa para especificar la configuración del clúster en cada trabajador que forma parte del clúster. Este es el método predeterminado para especificar un clúster, utilizando cluster_resolver.TFConfigClusterResolver , pero hay otras opciones disponibles en el módulo distribute.cluster_resolver .

Describa su clúster

Aquí hay una configuración de ejemplo:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí está el mismo TF_CONFIG serializado como una cadena JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Hay dos componentes de TF_CONFIG : cluster y task .

  • cluster es el mismo para todos los trabajadores y brinda información sobre el clúster de capacitación, que es un dict que consta de diferentes tipos de trabajos, como worker . En la capacitación de varios trabajadores con MultiWorkerMirroredStrategy , generalmente hay un worker que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un worker normal. Dicho trabajador se denomina trabajador chief , y es costumbre que el worker con index 0 sea designado como worker jefe (de hecho, así es como se implementa tf.distribute.Strategy ).

  • task proporciona información de la tarea actual y es diferente en cada trabajador. Especifica el type y el index de ese trabajador.

En este ejemplo, establece el type de tarea en "worker" y el index de la tarea en 0 . Esta máquina es el primer trabajador y será designado como el trabajador principal y hará más trabajo que los demás. Tenga en cuenta que otras máquinas necesitarán tener configurada la variable de entorno TF_CONFIG también, y debería tener el mismo dict de cluster , pero diferente type de tarea o index de tarea dependiendo de cuáles sean las funciones de esas máquinas.

Con fines ilustrativos, este tutorial muestra cómo se puede configurar un TF_CONFIG con 2 trabajadores en localhost . En la práctica, los usuarios crearían varios trabajadores en puertos/direcciones IP externas y establecerían TF_CONFIG en cada trabajador de forma adecuada.

En este ejemplo, utilizará 2 trabajadores, el TF_CONFIG del primer trabajador se muestra arriba. Para el segundo trabajador, establecería tf_config['task']['index']=1

Arriba, tf_config es solo una variable local en python. Para usarlo realmente para configurar el entrenamiento, este diccionario debe serializarse como JSON y colocarse en la variable de entorno TF_CONFIG .

Variables de entorno y subprocesos en cuadernos

Los subprocesos heredan variables de entorno de su padre. Entonces, si configura una variable de entorno en este proceso de jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Puede acceder a la variable de entorno desde un subproceso:

echo ${GREETINGS}
Hello TensorFlow!

En la siguiente sección, usará esto para pasar TF_CONFIG a los subprocesos de trabajo. Realmente nunca lanzaría sus trabajos de esta manera, pero es suficiente para los propósitos de este tutorial: para demostrar un ejemplo mínimo de varios trabajadores.

MultiWorkerMirroredStrategy

Para entrenar el modelo, use una instancia de tf.distribute.MultiWorkerMirroredStrategy , que crea copias de todas las variables en las capas del modelo en cada dispositivo en todos los trabajadores. La guía tf.distribute.Strategy tiene más detalles sobre esta estrategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Use tf.distribute.Strategy.scope para especificar que se debe usar una estrategia al construir su modelo. Esto lo coloca en el " contexto de réplica cruzada " para esta estrategia, lo que significa que la estrategia tiene el control de cosas como la ubicación de variables.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Fragmentación automática de sus datos entre trabajadores

En el entrenamiento de varios trabajadores, la fragmentación del conjunto de datos no es necesariamente necesaria; sin embargo, le brinda una semántica exactamente una vez que hace que más entrenamiento sea más reproducible, es decir, el entrenamiento en varios trabajadores debe ser lo mismo que el entrenamiento en un trabajador. Nota: el rendimiento puede verse afectado en algunos casos.

Ver: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Definir bucle de entrenamiento personalizado y entrenar el modelo

Especificar un optimizador

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Definir un paso de entrenamiento con tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Guardar y restaurar puntos de control

La implementación de puntos de control en un ciclo de entrenamiento personalizado requiere que el usuario lo maneje en lugar de usar una devolución de llamada de keras. Le permite guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Aquí, creará un tf.train.Checkpoint que rastrea el modelo, que es administrado por un tf.train.CheckpointManager para que solo se conserve el último punto de control.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Ahora, cuando necesite restaurar, puede encontrar el último punto de control guardado usando la conveniente función tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Después de restaurar el punto de control, puede continuar entrenando su bucle de entrenamiento personalizado.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Configuración de código completo en trabajadores

Para ejecutar realmente con MultiWorkerMirroredStrategy , deberá ejecutar procesos de trabajo y pasarles un TF_CONFIG .

Al igual que el archivo mnist.py escrito anteriormente, aquí está el main.py que contiene el mismo código que vimos paso a paso anteriormente en esta colaboración, solo lo estamos escribiendo en un archivo para que cada uno de los trabajadores lo ejecute:

Archivo: main.py

Writing main.py

Capacitar y Evaluar

El directorio actual ahora contiene ambos archivos de Python:

ls *.py
main.py
mnist.py

Así que json serialice el TF_CONFIG y agréguelo a las variables de entorno:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puede iniciar un proceso de trabajo que ejecutará main.py y usará TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

  1. Utiliza %%bash , que es un cuaderno "mágico" para ejecutar algunos comandos de bash.
  2. Utiliza el indicador --bg para ejecutar el proceso bash en segundo plano, porque este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso de trabajo en segundo plano no imprimirá la salida en este cuaderno, por lo que &> redirige su salida a un archivo, para que pueda ver lo que sucedió.

Entonces, espere unos segundos para que el proceso se inicie:

import time
time.sleep(20)

Ahora mire lo que se ha enviado al archivo de registro del trabajador hasta ahora:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

La última línea del archivo de registro debería decir: Started server with target: grpc://localhost:12345 . El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.

Así que actualice tf_config para que el proceso del segundo trabajador lo recoja:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora lanza el segundo trabajador. Esto iniciará la capacitación ya que todos los trabajadores están activos (por lo que no es necesario poner en segundo plano este proceso):

python main.py > /dev/null 2>&1

Ahora, si vuelve a verificar los registros escritos por el primer trabajador, verá que participó en el entrenamiento de ese modelo:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formación multitrabajador en profundidad

Este tutorial ha demostrado un flujo de trabajo de Custom Training Loop de la configuración de varios trabajadores. Una descripción detallada de otros temas está disponible en la model.fit's guide de la configuración de varios trabajadores y aplicable a los CTL.

Ver también

  1. La guía Capacitación distribuida en TensorFlow brinda una descripción general de las estrategias de distribución disponibles.
  2. Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.
  3. La sección Rendimiento de la guía brinda información sobre otras estrategias y herramientas que puede usar para optimizar el rendimiento de sus modelos de TensorFlow.