Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Formación de varios trabajadores con Keras

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

En este tutorial se muestra cómo realizar la formación distribuida multi-trabajador con un modelo Keras y la Model.fit API mediante el tf.distribute.Strategy -API específicamente la tf.distribute.MultiWorkerMirroredStrategy clase. Con la ayuda de esta estrategia, un modelo de Keras que fue diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en varios trabajadores con cambios mínimos en el código.

Para aquellos interesados en una comprensión más profunda de tf.distribute.Strategy API, la formación distribuida en TensorFlow guía está disponible para una visión general de las estrategias de distribución TensorFlow soportes.

Para aprender cómo utilizar el MultiWorkerMirroredStrategy con Keras y un circuito de entrenamiento personalizado, consulte bucle de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy .

Tenga en cuenta que el propósito de este tutorial es demostrar un ejemplo mínimo de varios trabajadores con dos trabajadores.

Configuración

Comience con algunas importaciones necesarias:

import json
import os
import sys

Antes de importar TensorFlow, realiza algunos cambios en el entorno:

  1. Deshabilite todas las GPU. Esto evita errores causados ​​por todos los trabajadores que intentan usar la misma GPU. En una aplicación del mundo real, cada trabajador estaría en una máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Restablecer el TF_CONFIG variable de entorno (aprenderá más sobre esto más adelante):
os.environ.pop('TF_CONFIG', None)
  1. Asegúrese de que el directorio actual está en camino de Python-esto permite que el bloc de notas para importar los archivos escritos por %%writefile más tarde:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importa TensorFlow:

import tensorflow as tf

Definición de modelo y conjunto de datos

A continuación, crear una mnist.py archivo con un modelo simple y conjunto de datos de configuración. Los procesos de trabajo utilizarán este archivo de Python en este tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Modelo de entrenamiento en un solo trabajador

Intenta entrenar el modelo para un pequeño número de épocas y observar los resultados de un solo trabajador para que todo funciona correctamente seguro. A medida que avanza el entrenamiento, la pérdida debería disminuir y la precisión debería aumentar.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Configuración de varios trabajadores

Ahora entremos en el mundo de la formación de varios trabajadores.

Un clúster con trabajos y tareas.

En TensorFlow, formación distribuida implica: un 'cluster' con varios puestos de trabajo, y cada uno de los puestos de trabajo puede tener uno o más 'task' s.

Necesitará el TF_CONFIG variable de entorno de configuración para la formación en varios equipos, cada uno de los cuales posiblemente tiene un papel diferente. TF_CONFIG es una cadena JSON usado para especificar la configuración de clúster para cada trabajador que forma parte de la agrupación.

Hay dos componentes de un TF_CONFIG variables: 'cluster' y 'task' .

  • Un 'cluster' es el mismo para todos los trabajadores y proporciona información sobre el grupo de entrenamiento, el cual es un diccionario que consiste en diferentes tipos de puestos de trabajo, tales como 'worker' o 'chief' .

    • En el entrenamiento de múltiples trabajador con tf.distribute.MultiWorkerMirroredStrategy , por lo general hay un 'worker' que adquiere responsabilidades, tales como el ahorro de un puesto de control y escribir un archivo de resumen para TensorBoard, además de lo que un habitual 'worker' hace. Tal 'worker' se refiere como el jefe de los trabajadores (con un nombre de trabajo 'chief' ).
    • Es habitual que el 'chief' para tener 'index' 0 será asignado (de hecho, esta es la forma tf.distribute.Strategy se implementa).
  • Un 'task' proporciona información de la tarea actual y es diferente para cada trabajador. Se especifica el 'type' y 'index' de este trabajador.

A continuación se muestra una configuración de ejemplo:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí es lo mismo TF_CONFIG serializado como una cadena JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Tenga en cuenta que tf_config es sólo una variable local en Python. Para ser capaz de utilizarlo para una configuración de formación, este dict necesita ser serializado como JSON y se coloca en un TF_CONFIG variable de entorno.

En el ejemplo de configuración anterior, se establece la tarea 'type' a 'worker' y la tarea 'index' a 0 . Por lo tanto, esta máquina es el primer trabajador. Que será designado como el 'chief' de los trabajadores y hacer más trabajo que los otros.

Con fines ilustrativos, Este tutorial muestra cómo se puede configurar un TF_CONFIG variable con dos trabajadores en un localhost .

En la práctica, debe crear varios trabajadores de las direcciones IP / puertos externos y establecer un TF_CONFIG variable sobre cada trabajador en consecuencia.

En este tutorial, usará dos trabajadores:

  • El primero ( 'chief' ) del trabajador TF_CONFIG se muestra arriba.
  • Para el segundo trabajador, establecerá tf_config['task']['index']=1

Variables de entorno y subprocesos en cuadernos

Los subprocesos heredan las variables de entorno de su padre.

Por ejemplo, puede establecer una variable de entorno en este proceso de Jupyter Notebook de la siguiente manera:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Luego, puede acceder a la variable de entorno desde un subproceso:

echo ${GREETINGS}
Hello TensorFlow!

En la siguiente sección, vamos a usar un método similar para pasar el TF_CONFIG a los subprocesos de los trabajadores. En un escenario del mundo real, no lanzaría sus trabajos de esta manera, pero es suficiente en este ejemplo.

Elige la estrategia adecuada

En TensorFlow, hay dos formas principales de entrenamiento distribuido:

  • La formación sincrónica, donde los pasos de la formación se sincronizan a través de los trabajadores y réplicas, y
  • Formación asíncrona, donde los pasos de formación no están sincronizados en sentido estricto (por ejemplo, la formación del servidor de parámetros ).

En este tutorial se muestra cómo realizar la formación de múltiples trabajador sincrónica utilizando una instancia de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo a través de todos los trabajadores. Utiliza CollectiveOps , un op TensorFlow para la comunicación colectiva, a los gradientes de agregados y mantener las variables en sincronía. El tf.distribute.Strategy guía tiene más detalles sobre esta estrategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy proporciona múltiples implementaciones a través de la CommunicationOptions parámetro: 1) RING implementos colectivos anillo basado utilizando GRPC como la capa de comunicación cruzada huésped; 2) NCCL utiliza la biblioteca de comunicación colectiva NVIDIA para implementar colectivos; y 3) AUTO aplaza la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y de la interconexión de red en el clúster.

Entrena el modelo

Con la integración de tf.distribute.Strategy API en tf.keras , el único cambio que hará para distribuir la formación a-varios trabajadores se encierra la construcción de modelos y model.compile() llamada dentro strategy.scope() . Dictados alcance de la estrategia de distribución y la forma en que se crean las variables, y en el caso de MultiWorkerMirroredStrategy , las variables son creadas MirroredVariable s, y que se replican en cada uno de los trabajadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Para ejecutar realmente con MultiWorkerMirroredStrategy que necesita para ejecutar procesos de trabajo y pasar un TF_CONFIG a ellos.

Al igual que el mnist.py archivo escrito anteriormente, aquí está el main.py que cada uno de los trabajadores se ejecutará:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

En el fragmento de código anterior nota de que el global_batch_size , que se pasa a Dataset.batch , se establece en per_worker_batch_size * num_workers . Esto asegura que cada trabajador procesa lotes de per_worker_batch_size ejemplos, independientemente del número de trabajadores.

El directorio actual ahora contiene ambos archivos Python:

ls *.py
main.py
mnist.py

Así JSON-serializar el TF_CONFIG y añadirlo a las variables de entorno:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puede iniciar un proceso de trabajo que ejecutará el main.py y utilizar el TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

  1. Utiliza la %%bash que es una "mágica" portátil para ejecutar algunos comandos bash.
  2. Utiliza el --bg bandera para ejecutar el bash proceso en segundo plano, ya que este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso de trabajo en segundo plano no se imprimirá la salida a este portátil, por lo que el &> vuelve a dirigir su salida a un archivo para que pueda inspeccionar lo que ocurrió en un archivo de registro más tarde.

Entonces, espere unos segundos para que se inicie el proceso:

import time
time.sleep(10)

Ahora, inspeccione lo que se ha enviado al archivo de registro del trabajador hasta ahora:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

La última línea del archivo de registro debe decir: Started server with target: grpc://localhost:12345 . El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.

Así actualizar el tf_config para el proceso del segundo trabajador para recoger:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lanza el segundo trabajador. Esto iniciará la capacitación, ya que todos los trabajadores están activos (por lo que no es necesario realizar un trasfondo de este proceso):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

Si vuelve a verificar los registros escritos por el primer trabajador, sabrá que participó en la capacitación de ese modelo:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Como era de esperar, este corrió más lento que el funcionamiento de prueba al comienzo de este tutorial.

Ejecutar varios trabajadores en una sola máquina solo agrega gastos generales.

El objetivo aquí no era mejorar el tiempo de formación, sino solo dar un ejemplo de formación de varios trabajadores.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formación de varios trabajadores en profundidad

Hasta ahora, ha aprendido a realizar una configuración básica para varios trabajadores.

Durante el resto del tutorial, aprenderá en detalle sobre otros factores, que pueden ser útiles o importantes para casos de uso reales.

Fragmentación de conjuntos de datos

En el entrenamiento de varios de trabajo, un conjunto de datos es necesaria para garantizar la convergencia sharding y rendimiento.

El ejemplo de la sección anterior se basa en la autosharding predeterminada proporcionada por la tf.distribute.Strategy API. Se puede controlar la sharding estableciendo el tf.data.experimental.AutoShardPolicy de los tf.data.experimental.DistributeOptions .

Para aprender más acerca de auto-sharding, consulte la guía de entrada distribuida .

Aquí está un ejemplo rápido de cómo activar la apertura automática sharding fuera, de modo que cada réplica procesa todos los ejemplos (no recomendado):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Evaluación

Si pasa el validation_data en Model.fit , alternará entre la formación y la evaluación de cada época. La evaluación de tomar la validation_data se distribuye a través del mismo conjunto de los trabajadores y de los resultados de la evaluación se agregan y disponible para todos los trabajadores.

Al igual que en el entrenamiento, el conjunto de datos de validación se fragmenta automáticamente a nivel de archivo. Es necesario establecer un tamaño de lote global en el conjunto de datos de validación y establecer los validation_steps .

También se recomienda un conjunto de datos repetido para la evaluación.

Alternativamente, también puede crear otra tarea que lea periódicamente los puntos de control y ejecute la evaluación. Esto es lo que hace Estimator. Pero esta no es una forma recomendada de realizar la evaluación y, por lo tanto, se omiten sus detalles.

Rendimiento

Ahora tiene un modelo Keras que está todo listo para funcionar en múltiples trabajadores con la MultiWorkerMirroredStrategy .

Para modificar el rendimiento de la formación de varios trabajadores, puede intentar lo siguiente:

  • tf.distribute.MultiWorkerMirroredStrategy proporciona múltiples implementaciones de comunicación colectiva :

    • RING colectivos basados en anillo implementos utilizando GRPC como la capa de comunicación cruzada huésped.
    • NCCL utiliza la biblioteca de comunicación colectiva NVIDIA para implementar colectivos.
    • AUTO aplaza la elección al tiempo de ejecución.

    La mejor opción de implementación colectiva depende de la cantidad de GPU, el tipo de GPU y la interconexión de red en el clúster. Para anular la elección automática, especifique la communication_options parámetro de MultiWorkerMirroredStrategy constructor 's. Por ejemplo:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Reparto de las variables a tf.float si es posible:

    • El modelo oficial ResNet incluye un ejemplo de cómo se puede hacer esto.

Tolerancia a fallos

En el entrenamiento sincrónico, el clúster fallaría si uno de los trabajadores falla y no existe ningún mecanismo de recuperación de fallas.

Usando Keras con tf.distribute.Strategy viene con la ventaja de la tolerancia a fallos en los casos en que los trabajadores mueren o son de otro modo inestable. Puede hacer esto conservando el estado de entrenamiento en el sistema de archivos distribuido de su elección, de modo que al reiniciar la instancia que falló o se adelantó anteriormente, se recupere el estado de entrenamiento.

Cuando un trabajador deja de estar disponible, otros trabajadores fallarán (posiblemente después de un tiempo de espera). En tales casos, el trabajador no disponible debe reiniciarse, así como otros trabajadores que han fallado.

Devolución de llamada de ModelCheckpoint

ModelCheckpoint devolución de llamada ya no proporciona la funcionalidad de tolerancia a fallos, por favor utilice BackupAndRestore llamada de retorno.

El ModelCheckpoint devolución de llamada se puede seguir utilizando para salvar los puestos de control. Pero con esto, si el entrenamiento fue interrumpido o finalizado exitosamente, para continuar entrenando desde el punto de control, el usuario es responsable de cargar el modelo manualmente.

Opcionalmente, el usuario puede optar por guardar y restaurar modelo / pesos fuera ModelCheckpoint de devolución de llamada.

Guardar y cargar modelo

Para guardar su modelo usando model.save o tf.saved_model.save , las necesidades de ahorro de destino sean diferentes para cada trabajador.

  • Para los trabajadores que no son jefes, deberá guardar el modelo en un directorio temporal.
  • Para el jefe, deberá guardar en el directorio de modelos proporcionado.

Los directorios temporales del trabajador deben ser únicos para evitar errores resultantes de que varios trabajadores intenten escribir en la misma ubicación.

El modelo guardado en todos los directorios es idéntico y, por lo general, solo se debe hacer referencia al modelo guardado por el jefe para restaurarlo o servirlo.

Debería tener alguna lógica de limpieza que elimine los directorios temporales creados por los trabajadores una vez que se haya completado su capacitación.

La razón para ahorrar en el jefe y los trabajadores al mismo tiempo es porque es posible que esté agregando variables durante los puntos de control, lo que requiere que tanto el jefe como los trabajadores participen en el protocolo de comunicación allreduce. Por otro lado, permitir que el jefe y los trabajadores guarden en el mismo directorio modelo dará lugar a errores debido a la contención.

Utilizando el MultiWorkerMirroredStrategy , el programa se ejecuta en cada trabajador, y con el fin de saber si el trabajador actual es el jefe, que se aprovecha del objeto de resolución de racimo que tiene atributos task_type y task_id :

  • task_type le dice lo que es el trabajo actual (por ejemplo, 'worker' ).
  • task_id le indica el identificador del trabajador.
  • El trabajador con task_id == 0 se designa como el jefe de los trabajadores.

En el fragmento de código siguiente, el write_filepath función proporciona la ruta del archivo de escritura, que depende de la del trabajador task_id :

  • Para el jefe del trabajador (con task_id == 0 ), se escribe en la ruta del archivo original.
  • Para los demás trabajadores, se crea un DIRECTORIO temporal temp_dir -con la task_id en la ruta de directorio para escribir en:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con eso, ahora está listo para ahorrar:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Como se describió anteriormente, más adelante, el modelo solo debe cargarse desde la ruta en la que se guardó el jefe, así que eliminemos los temporales que guardaron los trabajadores que no son jefes:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ahora, cuando llega el momento de la carga, vamos a usar conveniente tf.keras.models.load_model API, y continuar con el trabajo posterior.

Aquí, sólo se presuponen el uso de un solo trabajador de carga y continuar la formación, en cuyo caso no se llama a tf.keras.models.load_model dentro de otro strategy.scope() (nota que strategy = tf.distribute.MultiWorkerMirroredStrategy() , como se define anteriormente ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Guardar y restaurar puntos de control

Por otro lado, los puntos de control le permiten guardar los pesos de su modelo y restaurarlos sin tener que guardar todo el modelo.

A continuación, vamos a crear una tf.train.Checkpoint que rastrea el modelo, que es administrado por el tf.train.CheckpointManager , de modo que sólo el último punto de control se conserva:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una vez que el CheckpointManager está configurado, usted está listo para guardar y eliminar los puntos de control de los trabajadores no habían guardado principales:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ahora, cuando se necesita para restaurar el modelo, se puede encontrar las últimas punto de control guardado con el cómodo tf.train.latest_checkpoint función. Después de restaurar el punto de control, puede continuar con el entrenamiento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

Devolución de llamada de BackupAndRestore

El tf.keras.callbacks.experimental.BackupAndRestore devolución de llamada proporciona la funcionalidad de tolerancia a fallos mediante la copia del modelo y el número época actual en un archivo de controles temporal bajo backup_dir argumento para BackupAndRestore . Esto se hace al final de cada época.

Una vez que los trabajos se interrumpen y se reinician, la devolución de llamada restaura el último punto de control y el entrenamiento continúa desde el comienzo de la época interrumpida. Cualquier entrenamiento parcial que ya se haya realizado en la época inacabada antes de la interrupción se desechará, de modo que no afecte el estado final del modelo.

Para usarlo, proporcionar una instancia de tf.keras.callbacks.experimental.BackupAndRestore en el Model.fit llamada.

Con MultiWorkerMirroredStrategy , si un trabajador se interrumpe, todo el conjunto se detiene hasta que se reinicie el trabajador interrumpe. Otros trabajadores también se reiniciarán y el trabajador interrumpido volverá a unirse al clúster. Luego, cada trabajador lee el archivo de punto de control que se guardó previamente y recupera su estado anterior, lo que permite que el clúster vuelva a sincronizarse. Luego, el entrenamiento continúa.

El BackupAndRestore devolución de llamada utiliza el CheckpointManager para guardar y restaurar el estado de entrenamiento, lo que genera un archivo llamado puesto de control que las pistas puntos de control existentes, junto con la más reciente. Por esta razón, backup_dir no debe ser re-utilizado para almacenar otros puestos de control con el fin de evitar la colisión nombre.

Actualmente, la BackupAndRestore devolución de llamada soporta solo trabajador sin una estrategia, MirroredStrategy, y multi-trabajador con MultiWorkerMirroredStrategy. A continuación, se muestran dos ejemplos de capacitación para trabajadores múltiples y capacitación para un solo trabajador.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Si inspecciona el directorio de backup_dir especificó en BackupAndRestore , es posible que observe algunos archivos de control generadas de forma temporal. Se necesitan esos archivos para recuperar las instancias previamente perdidos, y serán retirados por la biblioteca al final de Model.fit al salir con éxito de su formación.

Recursos adicionales

  1. La formación distribuida en TensorFlow guía proporciona una visión general de las estrategias de distribución disponibles.
  2. El circuito de entrenamiento personalizado con Keras y MultiWorkerMirroredStrategy espectáculos tutorial de cómo utilizar el MultiWorkerMirroredStrategy con Keras y un circuito de entrenamiento personalizado.
  3. Echa un vistazo a los modelos oficiales , muchos de los cuales pueden ser configurados para ejecutar varias estrategias de distribución.
  4. El rendimiento mejor con tf.function guía proporciona información sobre otras estrategias y herramientas, tales como la TensorFlow Profiler se puede utilizar para optimizar el rendimiento de sus modelos TensorFlow.