Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Bucle de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

Este tutorial muestra la formación de múltiples trabajador con API bucle de formación personalizada, distribuido a través de MultiWorkerMirroredStrategy, por lo que un modelo Keras diseñado para ejecutarse en un solo trabajador puede perfectamente trabajar en varios trabajadores con cambio de código mínima.

Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos dan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento. Una información más detallada está disponible en Grabación de un bucle de formación a partir de cero .

Si usted está buscando la manera de utilizar MultiWorkerMirroredStrategy con Keras model.fit , consulte este tutorial en su lugar.

Distribuido Formación en TensorFlow guía está disponible para una visión general de las estrategias de distribución TensorFlow soportes para los interesados en una comprensión más profunda de tf.distribute.Strategy API.

Configuración

Primero, algunas importaciones necesarias.

import json
import os
import sys

Antes de importar TensorFlow, realice algunos cambios en el entorno.

Deshabilite todas las GPU. Esto evita errores causados ​​por todos los trabajadores que intentan usar la misma GPU. Para una aplicación real, cada trabajador estaría en una máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Restablecer el TF_CONFIG variable de entorno, verá más sobre esto más adelante.

os.environ.pop('TF_CONFIG', None)

Asegúrese de que el directorio actual esté en la ruta de Python. Esto permite que el bloc de notas para importar los archivos escritos por %%writefile más tarde.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importa TensorFlow.

import tensorflow as tf

Definición de modelo y conjunto de datos

A continuación, cree una mnist.py archivo con un modelo simple y conjunto de datos de configuración. Este archivo de Python será utilizado por los procesos de trabajo en este tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuración de varios trabajadores

Ahora entremos en el mundo de la formación de varios trabajadores. En TensorFlow, la TF_CONFIG se requiere variable de entorno para la formación en varios equipos, cada uno de los cuales posiblemente tiene un papel diferente. TF_CONFIG utiliza a continuación, es una cadena JSON usado para especificar la configuración de clúster en cada trabajador que es parte de la agrupación. Este es el método por defecto para especificar un clúster, utilizando cluster_resolver.TFConfigClusterResolver , pero hay otras opciones disponibles en el distribute.cluster_resolver módulo.

Describe tu clúster

Aquí hay una configuración de ejemplo:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí es lo mismo TF_CONFIG serializado como una cadena JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Hay dos componentes de TF_CONFIG : cluster y task .

  • cluster es el mismo para todos los trabajadores y proporciona información sobre el grupo de entrenamiento, el cual es un diccionario que consiste en diferentes tipos de puestos de trabajo como worker . En el entrenamiento de múltiples trabajador con MultiWorkerMirroredStrategy , por lo general hay un worker que lleva en un poco más de responsabilidad como el ahorro puesto de control y escribir archivo de resumen para TensorBoard además de lo que un habitual worker hace. Un trabajador de este tipo se denomina como el chief los trabajadores, y es costumbre que el worker con el index 0 es designado como el principal worker (de hecho esta es la forma tf.distribute.Strategy se implementa).

  • task proporciona información de la tarea actual y es diferente en cada trabajador. Se especifica el type y el index de este trabajador.

En este ejemplo, se establece la tarea type de "worker" y la tarea index a 0 . Esta máquina es el primer trabajador y será designado como el trabajador principal y hará más trabajo que los demás. Tenga en cuenta que otras máquinas tendrán que tener la TF_CONFIG variable establecida medio ambiente, así, y debe tener el mismo cluster dict, pero diferentes tareas type o tarea index dependiendo de cuáles son las funciones de esas máquinas son.

Con fines ilustrativos, Este tutorial muestra cómo se puede establecer un TF_CONFIG con 2 trabajadores en localhost . En la práctica, los usuarios podrían crear varios trabajadores de direcciones IP / puertos, y el conjunto externa TF_CONFIG de cada trabajador de manera apropiada.

En este ejemplo se utilizará 2 trabajadores, del primer trabajador TF_CONFIG se muestra arriba. Para el segundo trabajador deberá ajustar tf_config['task']['index']=1

Por encima, tf_config es sólo una variable local en pitón. Para realmente se utilicen para la formación de configuración, que necesita este diccionario para ser serializados como JSON, y se colocan en el TF_CONFIG variable de entorno.

Variables de entorno y subprocesos en cuadernos

Los subprocesos heredan las variables de entorno de su padre. Así que si se establece una variable de entorno en este jupyter notebook proceso:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Puede acceder a la variable de entorno desde un subproceso:

echo ${GREETINGS}
Hello TensorFlow!

En la siguiente sección, vamos a usar este para pasar el TF_CONFIG a los subprocesos de los trabajadores. Realmente nunca lanzaría sus trabajos de esta manera, pero es suficiente para los propósitos de este tutorial: Para demostrar un ejemplo mínimo de varios trabajadores.

MultiWorkerMirroredStrategy

Para entrenar el modelo, utilice una instancia de tf.distribute.MultiWorkerMirroredStrategy , que crea copias de todas las variables en las capas del modelo en cada dispositivo a través de todos los trabajadores. El tf.distribute.Strategy guía tiene más detalles sobre esta estrategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2021-11-23 02:29:16.957442: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:16.957748: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Uso tf.distribute.Strategy.scope para especificar que una estrategia debe ser utilizado en la construcción de su modelo. Esto le pone en el " contexto cruzada réplica " de esta estrategia, lo que significa que la estrategia se pone en control de cosas como el posicionamiento variable.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Divida automáticamente sus datos entre trabajadores

En la formación de varios trabajadores, la fragmentación del conjunto de datos no es necesariamente necesaria; sin embargo, le proporciona una semántica de una sola vez, lo que hace que la formación sea más reproducible, es decir, la formación de varios trabajadores debería ser lo mismo que la formación de un trabajador. Nota: el rendimiento puede verse afectado en algunos casos.

Ver: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step

Definir bucle de entrenamiento personalizado y entrenar el modelo

Especificar un optimizador

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Definir un paso de entrenamiento con tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Guardar y restaurar puntos de control

La implementación de puntos de control en un ciclo de entrenamiento personalizado requiere que el usuario lo maneje en lugar de usar una devolución de llamada de keras. Le permite guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

A continuación, vamos a crear una tf.train.Checkpoint que sigue el modelo, que es administrado por un tf.train.CheckpointManager de manera que sólo el último punto de control se conserva.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Ahora, cuando se necesita restaurar, se puede encontrar las últimas punto de control guardado con el cómodo tf.train.latest_checkpoint función.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Después de restaurar el punto de control, puede continuar con el entrenamiento de su ciclo de entrenamiento personalizado.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2021-11-23 02:29:18.214294: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.826228, train_loss: 0.540131.
Epoch: 1, accuracy: 0.937946, train_loss: 0.207413.
Epoch: 2, accuracy: 0.960603, train_loss: 0.137420.

Configuración de código completo en los trabajadores

Para ejecutar realmente con MultiWorkerMirroredStrategy que necesita para ejecutar procesos de trabajo y pasar un TF_CONFIG a ellos.

Al igual que el mnist.py archivo escrito anteriormente, aquí está el main.py que contiene el mismo código que caminamos paso a paso anteriormente en este colab, sólo estamos escribiendo en un archivo por lo que cada uno de los trabajadores se ejecutarlo:

Archivo: main.py

Writing main.py

Capacitar y evaluar

El directorio actual ahora contiene ambos archivos Python:

ls *.py
main.py
mnist.py

Así JSON-serializar el TF_CONFIG y añadirlo a las variables de entorno:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puede iniciar un proceso de trabajo que ejecutará el main.py y utilizar el TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

  1. Utiliza la %%bash que es una "mágica" portátil para ejecutar algunos comandos bash.
  2. Utiliza el --bg bandera para ejecutar el bash proceso en segundo plano, ya que este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso de trabajo en segundo plano no se imprimirá la salida a este portátil, por lo que el &> vuelve a dirigir su salida a un archivo, para que pueda ver lo que pasó.

Entonces, espere unos segundos para que se inicie el proceso:

import time
time.sleep(20)

Ahora mire lo que se ha enviado al archivo de registro del trabajador hasta ahora:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

La última línea del archivo de registro debe decir: Started server with target: grpc://localhost:12345 . El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.

Así actualizar el tf_config para el proceso del segundo trabajador para recoger:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora inicie el segundo trabajador. Esto iniciará la capacitación, ya que todos los trabajadores están activos (por lo que no es necesario realizar un trasfondo de este proceso):

python main.py > /dev/null 2>&1

Ahora, si vuelve a verificar los registros escritos por el primer trabajador, verá que participó en la capacitación de ese modelo:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration
2021-11-23 02:29:50.709898: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.820424, train_loss: 0.575663.
Epoch: 1, accuracy: 0.927344, train_loss: 0.241324.
Epoch: 2, accuracy: 0.953237, train_loss: 0.154762.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formación de varios trabajadores en profundidad

En este tutorial se ha demostrado una Custom Training Loop flujo de trabajo de la configuración multi-trabajador. Una descripción detallada de otros temas está disponible en la model.fit's guide de la configuración multi-trabajador y aplicable a los CTL.

Ver también

  1. Capacitación distribuidos en TensorFlow guía proporciona una visión general de las estrategias de distribución disponibles.
  2. Modelos oficiales , muchos de los cuales pueden ser configurados para ejecutar varias estrategias de distribución.
  3. La sección Rendimiento de la guía proporciona información sobre otras estrategias y herramientas que puede utilizar para optimizar el rendimiento de sus modelos TensorFlow.