Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Formación de varios trabajadores con Keras

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar libreta

Descripción general

Este tutorial demuestra cómo realizar un entrenamiento distribuido de varios trabajadores con un modelo Keras y la API Model.fit usando la API tf.distribute.Strategy , específicamente la clase tf.distribute.MultiWorkerMirroredStrategy . Con la ayuda de esta estrategia, un modelo de Keras diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en varios trabajadores con cambios mínimos en el código.

Para aquellos interesados ​​en una comprensión más profunda de las API de tf.distribute.Strategy , la guía de capacitación distribuida en TensorFlow está disponible para obtener una descripción general de las estrategias de distribución compatibles con TensorFlow.

Para aprender a usar MultiWorkerMirroredStrategy con Keras y un bucle de entrenamiento personalizado, consulte Bucle de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy .

Tenga en cuenta que el propósito de este tutorial es demostrar un ejemplo mínimo de varios trabajadores con dos trabajadores.

Configuración

Comience con algunas importaciones necesarias:

import json
import os
import sys

Antes de importar TensorFlow, realice algunos cambios en el entorno:

  1. Deshabilite todas las GPU. Esto evita errores causados ​​por los trabajadores que intentan usar la misma GPU. En una aplicación del mundo real, cada trabajador estaría en una máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Restablezca la variable de entorno TF_CONFIG (aprenderá más sobre esto más adelante):
os.environ.pop('TF_CONFIG', None)
  1. Asegúrese de que el directorio actual esté en la ruta de Python; esto permite que el cuaderno importe los archivos escritos por %%writefile más tarde:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importa TensorFlow:

import tensorflow as tf

Definición de conjunto de datos y modelo

A continuación, cree un archivo mnist.py con un modelo simple y una configuración de conjunto de datos. Este archivo de Python será utilizado por los procesos de trabajo en este tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Modelo de formación en un solo trabajador

Intente entrenar el modelo para una pequeña cantidad de épocas y observe los resultados de un solo trabajador para asegurarse de que todo funcione correctamente. A medida que avanza el entrenamiento, la pérdida debe disminuir y la precisión debe aumentar.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-01-14 02:21:05.798076: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-14 02:21:05.798303: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Epoch 1/3
70/70 [==============================] - 1s 14ms/step - loss: 2.2710 - accuracy: 0.1937
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1970 - accuracy: 0.4402
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1147 - accuracy: 0.5864
<keras.callbacks.History at 0x7fd212d52a90>

Configuración de varios trabajadores

Ahora entremos en el mundo de la formación multitrabajador.

Un clúster con trabajos y tareas

En TensorFlow, el entrenamiento distribuido involucra: un 'cluster' con varios trabajos, y cada uno de los trabajos puede tener una o más 'task' .

Necesitará la variable de entorno de configuración TF_CONFIG para entrenar en varias máquinas, cada una de las cuales posiblemente tenga una función diferente. TF_CONFIG es una cadena JSON que se usa para especificar la configuración del clúster para cada trabajador que forma parte del clúster.

Hay dos componentes de una variable TF_CONFIG : 'cluster' y 'task' .

  • Un 'cluster' es el mismo para todos los trabajadores y brinda información sobre el cluster de capacitación, que es un dict que consta de diferentes tipos de trabajos, como 'worker' o 'chief' .

    • En la capacitación de varios trabajadores con tf.distribute.MultiWorkerMirroredStrategy , generalmente hay un 'worker' que asume responsabilidades, como guardar un punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un 'worker' normal. Dicho 'worker' se denomina trabajador jefe (con el nombre de trabajo 'chief' ).
    • Es habitual que el 'chief' tenga 'index' 0 asignado (de hecho, así es como se implementa tf.distribute.Strategy ).
  • Una 'task' proporciona información de la tarea actual y es diferente para cada trabajador. Especifica el 'type' y 'index' de ese trabajador.

A continuación se muestra una configuración de ejemplo:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí está el mismo TF_CONFIG serializado como una cadena JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Tenga en cuenta que tf_config es solo una variable local en Python. Para poder usarlo para una configuración de entrenamiento, este dictado debe serializarse como JSON y colocarse en una variable de entorno TF_CONFIG .

En el ejemplo de configuración anterior, establece la tarea 'type' en 'worker' y la tarea 'index' en 0 . Por lo tanto, esta máquina es el primer trabajador. Será designado como el trabajador 'chief' y hará más trabajo que los demás.

Con fines ilustrativos, este tutorial muestra cómo puede configurar una variable TF_CONFIG con dos trabajadores en un host localhost .

En la práctica, crearía varios trabajadores en puertos/direcciones IP externas y establecería una variable TF_CONFIG en cada trabajador en consecuencia.

En este tutorial, utilizará dos trabajadores:

  • El TF_CONFIG del primer trabajador ( 'chief' ) se muestra arriba.
  • Para el segundo trabajador, establecerá tf_config['task']['index']=1

Variables de entorno y subprocesos en cuadernos

Los subprocesos heredan variables de entorno de su padre.

Por ejemplo, puede establecer una variable de entorno en este proceso de Jupyter Notebook de la siguiente manera:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Luego, puede acceder a la variable de entorno desde un subproceso:

echo ${GREETINGS}
Hello TensorFlow!

En la siguiente sección, utilizará un método similar para pasar TF_CONFIG a los subprocesos de trabajo. En un escenario del mundo real, no lanzaría sus trabajos de esta manera, pero es suficiente en este ejemplo.

Elige la estrategia adecuada

En TensorFlow, hay dos formas principales de entrenamiento distribuido:

  • Entrenamiento sincrónico , donde los pasos del entrenamiento se sincronizan entre los trabajadores y las réplicas, y
  • Entrenamiento asíncrono , donde los pasos de entrenamiento no están estrictamente sincronizados (por ejemplo, entrenamiento del servidor de parámetros ).

Este tutorial demuestra cómo realizar un entrenamiento síncrono de varios trabajadores mediante una instancia de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo de todos los trabajadores. Utiliza CollectiveOps , una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas. La guía tf.distribute.Strategy tiene más detalles sobre esta estrategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy proporciona múltiples implementaciones a través del parámetro CommunicationOptions : 1) RING implementa colectivos basados ​​en anillos usando gRPC como la capa de comunicación entre hosts; 2) NCCL utiliza la biblioteca de comunicación colectiva de NVIDIA para implementar colectivos; y 3) AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende de la cantidad y el tipo de GPU y la interconexión de red en el clúster.

entrenar al modelo

Con la integración de tf.distribute.Strategy API en tf.keras , el único cambio que hará para distribuir la capacitación a varios trabajadores es encerrar la construcción del modelo y la llamada model.compile() dentro de la strategy.scope() . El alcance de la estrategia de distribución dicta cómo y dónde se crean las variables y, en el caso de MultiWorkerMirroredStrategy , las variables creadas son MirroredVariable y se replican en cada uno de los trabajadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Para ejecutar realmente con MultiWorkerMirroredStrategy , deberá ejecutar procesos de trabajo y pasarles un TF_CONFIG .

Al igual que el archivo mnist.py escrito anteriormente, aquí está el main.py que ejecutará cada uno de los trabajadores:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

En el fragmento de código anterior, tenga en cuenta que global_batch_size , que se pasa a Dataset.batch , se establece en per_worker_batch_size * num_workers . Esto garantiza que cada trabajador procese lotes de ejemplos per_worker_batch_size independientemente del número de trabajadores.

El directorio actual ahora contiene ambos archivos de Python:

ls *.py
main.py
mnist.py

Así que json serialice el TF_CONFIG y agréguelo a las variables de entorno:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puede iniciar un proceso de trabajo que ejecutará main.py y usará TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

  1. Utiliza %%bash , que es un cuaderno "mágico" para ejecutar algunos comandos de bash.
  2. Utiliza el indicador --bg para ejecutar el proceso bash en segundo plano, porque este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso de trabajo en segundo plano no imprimirá la salida en este cuaderno, por lo que &> redirige su salida a un archivo para que pueda inspeccionar lo que sucedió en un archivo de registro más tarde.

Entonces, espere unos segundos para que el proceso se inicie:

import time
time.sleep(10)

Ahora, inspeccione lo que se ha enviado al archivo de registro del trabajador hasta el momento:

cat job_0.log
2022-01-14 02:21:12.858444: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-14 02:21:12.858673: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

La última línea del archivo de registro debería decir: Started server with target: grpc://localhost:12345 . El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.

Así que actualice tf_config para que el proceso del segundo trabajador lo recoja:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Inicie el segundo trabajador. Esto iniciará la capacitación ya que todos los trabajadores están activos (por lo que no es necesario poner en segundo plano este proceso):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 55ms/step - loss: 2.2799 - accuracy: 0.1521
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2228 - accuracy: 0.3297
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1550 - accuracy: 0.4943
2022-01-14 02:21:22.933470: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-14 02:21:22.933684: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-14 02:21:23.870632: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-14 02:21:24.116271: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Si vuelve a verificar los registros escritos por el primer trabajador, aprenderá que participó en el entrenamiento de ese modelo:

cat job_0.log
2022-01-14 02:21:12.858444: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-14 02:21:12.858673: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-14 02:21:23.868234: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-14 02:21:24.105110: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 55ms/step - loss: 2.2799 - accuracy: 0.1521
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2228 - accuracy: 0.3297
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1550 - accuracy: 0.4943

Como era de esperar, esto funcionó más lento que la ejecución de prueba al comienzo de este tutorial.

Ejecutar varios trabajadores en una sola máquina solo agrega gastos generales.

El objetivo aquí no era mejorar el tiempo de formación, sino solo dar un ejemplo de formación de varios trabajadores.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formación multitrabajador en profundidad

Hasta ahora, ha aprendido a realizar una configuración básica de varios trabajadores.

Durante el resto del tutorial, aprenderá en detalle sobre otros factores que pueden ser útiles o importantes para casos de uso reales.

Fragmentación de conjuntos de datos

En el entrenamiento de varios trabajadores, se necesita la fragmentación del conjunto de datos para garantizar la convergencia y el rendimiento.

El ejemplo de la sección anterior se basa en la fragmentación automática predeterminada proporcionada por la API tf.distribute.Strategy . Puede controlar la fragmentación configurando tf.data.experimental.AutoShardPolicy de tf.data.experimental.DistributeOptions .

Para obtener más información sobre la fragmentación automática , consulte la guía de entrada distribuida .

Aquí hay un ejemplo rápido de cómo desactivar la fragmentación automática, para que cada réplica procese cada ejemplo ( no recomendado ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Evaluación

Si pasa los datos de validation_data a Model.fit , alternará entre entrenamiento y evaluación para cada época. La evaluación que toma los datos de validation_data se distribuye entre el mismo conjunto de trabajadores y los resultados de la evaluación se agregan y están disponibles para todos los trabajadores.

De forma similar al entrenamiento, el conjunto de datos de validación se fragmenta automáticamente a nivel de archivo. Debe establecer un tamaño de lote global en el conjunto de datos de validación y establecer la validation_steps .

También se recomienda un conjunto de datos repetido para la evaluación.

Como alternativa, también puede crear otra tarea que lea periódicamente los puntos de control y ejecute la evaluación. Esto es lo que hace Estimador. Pero esta no es una forma recomendada de realizar la evaluación y, por lo tanto, se omiten sus detalles.

Rendimiento

Ahora tiene un modelo de Keras que está configurado para ejecutarse en varios trabajadores con MultiWorkerMirroredStrategy .

Para modificar el rendimiento de la capacitación de varios trabajadores, puede probar lo siguiente:

  • tf.distribute.MultiWorkerMirroredStrategy proporciona múltiples implementaciones de comunicación colectiva :

    • RING implementa colectivos basados ​​en anillos mediante gRPC como capa de comunicación entre hosts.
    • NCCL utiliza la biblioteca de comunicación colectiva de NVIDIA para implementar colectivos.
    • AUTO difiere la elección al tiempo de ejecución.

    La mejor opción de implementación colectiva depende de la cantidad de GPU, el tipo de GPU y la interconexión de red en el clúster. Para anular la elección automática, especifique el parámetro communication_options del constructor de MultiWorkerMirroredStrategy . Por ejemplo:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Transmita las variables a tf.float si es posible:

    • El modelo oficial de ResNet incluye un ejemplo de cómo se puede hacer esto.

Tolerancia a fallos

En el entrenamiento síncrono, el clúster fallaría si uno de los trabajadores falla y no existe un mecanismo de recuperación de fallas.

El uso de Keras con tf.distribute.Strategy tiene la ventaja de la tolerancia a fallas en los casos en que los trabajadores mueren o son inestables. Puede hacer esto conservando el estado de entrenamiento en el sistema de archivos distribuido de su elección, de modo que al reiniciar la instancia que falló o se adelantó anteriormente, se recupera el estado de entrenamiento.

Cuando un trabajador deja de estar disponible, otros trabajadores fallarán (posiblemente después de un tiempo de espera). En tales casos, el trabajador no disponible debe reiniciarse, así como otros trabajadores que hayan fallado.

Devolución de llamada de ModelCheckpoint

La devolución de llamada ModelCheckpoint ya no proporciona la funcionalidad de tolerancia a fallas, utilice la devolución de llamada BackupAndRestore en su lugar.

La devolución de llamada ModelCheckpoint todavía se puede usar para guardar puntos de control. Pero con esto, si el entrenamiento se interrumpió o terminó con éxito, para continuar entrenando desde el punto de control, el usuario es responsable de cargar el modelo manualmente.

Opcionalmente, el usuario puede optar por guardar y restaurar modelos/pesos fuera de la devolución de llamada de ModelCheckpoint .

Guardar y cargar modelos

Para guardar su modelo usando model.save o tf.saved_model.save , el destino de guardado debe ser diferente para cada trabajador.

  • Para los trabajadores que no son jefes, deberá guardar el modelo en un directorio temporal.
  • Para el jefe, deberá guardar en el directorio de modelos provisto.

Los directorios temporales en el trabajador deben ser únicos para evitar errores resultantes de varios trabajadores que intentan escribir en la misma ubicación.

El modelo guardado en todos los directorios es idéntico y, por lo general, solo se debe hacer referencia al modelo guardado por el jefe para restaurarlo o servirlo.

Debe tener alguna lógica de limpieza que elimine los directorios temporales creados por los trabajadores una vez que se haya completado su capacitación.

La razón para ahorrar en el jefe y los trabajadores al mismo tiempo es que podría estar agregando variables durante el control, lo que requiere que tanto el jefe como los trabajadores participen en el protocolo de comunicación de allreduce. Por otro lado, permitir que el jefe y los trabajadores guarden en el mismo directorio de modelo generará errores debido a la contención.

Con MultiWorkerMirroredStrategy , el programa se ejecuta en cada trabajador y, para saber si el trabajador actual es el jefe, aprovecha el objeto de resolución del clúster que tiene los atributos task_type y task_id :

  • task_type le dice cuál es el trabajo actual (por ejemplo 'worker' ).
  • task_id le dice el identificador del trabajador.
  • El trabajador con task_id == 0 se designa como trabajador principal.

En el fragmento de código a continuación, la función write_filepath proporciona la ruta del archivo para escribir, que depende del task_id del trabajador:

  • Para el trabajador principal (con task_id == 0 ), escribe en la ruta del archivo original.
  • Para otros trabajadores, crea un directorio temporal, temp_dir , con task_id en la ruta del directorio para escribir:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con eso, ya está listo para guardar:

multi_worker_model.save(write_model_path)
2022-01-14 02:21:39.251087: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Como se describió anteriormente, más adelante, el modelo solo debe cargarse desde la ruta en la que se guardó el jefe, así que eliminemos los temporales que guardaron los trabajadores que no son jefes:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ahora, cuando sea el momento de cargar, usemos la conveniente API tf.keras.models.load_model y continuemos con el trabajo adicional.

Aquí, suponga que solo usa un trabajador para cargar y continuar con la capacitación, en cuyo caso no llama a tf.keras.models.load_model dentro de otra strategy.scope() (tenga en cuenta que la strategy = tf.distribute.MultiWorkerMirroredStrategy() , como se definió anteriormente ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 14ms/step - loss: 2.2892 - accuracy: 0.0031
Epoch 2/2
20/20 [==============================] - 0s 14ms/step - loss: 2.2636 - accuracy: 0.0016
<keras.callbacks.History at 0x7fd212444590>

Guardar y restaurar puntos de control

Por otro lado, la creación de puntos de control le permite guardar los pesos de su modelo y restaurarlos sin tener que guardar todo el modelo.

Aquí, creará un tf.train.Checkpoint que rastrea el modelo, que es administrado por tf.train.CheckpointManager , de modo que solo se conserve el último punto de control:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una vez que se configura el CheckpointManager , ahora está listo para guardar y eliminar los puntos de control que los trabajadores que no son jefes habían guardado:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ahora, cuando necesite restaurar el modelo, puede encontrar el último punto de control guardado usando la conveniente función tf.train.latest_checkpoint . Después de restaurar el punto de control, puede continuar con el entrenamiento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-01-14 02:21:41.227409: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-01-14 02:21:41.604068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 14ms/step - loss: 2.2893 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 14ms/step - loss: 2.2680 - accuracy: 0.0039
<keras.callbacks.History at 0x7fd211c4f150>

Devolución de llamada de copia de seguridad y restauración

La devolución de llamada tf.keras.callbacks.BackupAndRestore proporciona la funcionalidad de tolerancia a fallas al hacer una copia de seguridad del modelo y el número de época actual en un archivo de punto de control temporal bajo el argumento backup_dir para BackupAndRestore . Esto se hace al final de cada época.

Una vez que los trabajos se interrumpen y se reinician, la devolución de llamada restaura el último punto de control y el entrenamiento continúa desde el comienzo de la época interrumpida. Cualquier entrenamiento parcial ya realizado en la época inacabada antes de la interrupción se descartará, para que no afecte el estado final del modelo.

Para usarlo, proporcione una instancia de tf.keras.callbacks.BackupAndRestore en la llamada Model.fit .

Con MultiWorkerMirroredStrategy , si se interrumpe a un trabajador, todo el clúster se detiene hasta que se reinicia el trabajador interrumpido. Otros trabajadores también se reiniciarán y el trabajador interrumpido vuelve a unirse al clúster. Luego, cada trabajador lee el archivo de punto de control que se guardó previamente y recupera su estado anterior, lo que permite que el clúster vuelva a estar sincronizado. Luego, el entrenamiento continúa.

La devolución de llamada BackupAndRestore usa CheckpointManager para guardar y restaurar el estado de entrenamiento, lo que genera un archivo llamado punto de control que rastrea los puntos de control existentes junto con el último. Por esta razón, backup_dir no debe reutilizarse para almacenar otros puntos de control para evitar la colisión de nombres.

Actualmente, la devolución de llamada de BackupAndRestore admite un solo trabajador sin estrategia, MirroredStrategy y varios trabajadores con MultiWorkerMirroredStrategy. A continuación, se muestran dos ejemplos de capacitación para varios trabajadores y capacitación para un solo trabajador.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-01-14 02:21:44.822708: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 14ms/step - loss: 2.2857 - accuracy: 0.1960
Epoch 2/3
70/70 [==============================] - 1s 15ms/step - loss: 2.2336 - accuracy: 0.3212
Epoch 3/3
70/70 [==============================] - 1s 15ms/step - loss: 2.1733 - accuracy: 0.4092
<keras.callbacks.History at 0x7fd212d9b990>

Si inspecciona el directorio de backup_dir que especificó en BackupAndRestore , es posible que observe algunos archivos de puntos de control generados temporalmente. Esos archivos son necesarios para recuperar las instancias perdidas anteriormente, y la biblioteca los eliminará al final de Model.fit al salir con éxito de su entrenamiento.

Recursos adicionales

  1. La guía de capacitación distribuida en TensorFlow proporciona una descripción general de las estrategias de distribución disponibles.
  2. El tutorial de bucle de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy muestra cómo usar MultiWorkerMirroredStrategy con Keras y un bucle de entrenamiento personalizado.
  3. Consulte los modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.
  4. La guía Mejor rendimiento con tf.function brinda información sobre otras estrategias y herramientas, como TensorFlow Profiler , que puede usar para optimizar el rendimiento de sus modelos de TensorFlow.