¡Google I / O regresa del 18 al 20 de mayo! Reserva espacio y crea tu horario Regístrate ahora

Formación de varios trabajadores con Keras

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Descripción general

Este tutorial demuestra el entrenamiento distribuido de múltiples trabajadores con el modelo Keras usando la API tf.distribute.Strategy , específicamente tf.distribute.MultiWorkerMirroredStrategy . Con la ayuda de esta estrategia, un modelo de Keras que fue diseñado para ejecutarse en un solo trabajador puede trabajar sin problemas en varios trabajadores con un cambio mínimo de código.

La guía Distributed Training in TensorFlow está disponible para obtener una descripción general de las estrategias de distribución que admite TensorFlow para aquellos interesados ​​en una comprensión más profunda de tf.distribute.Strategy API de tf.distribute.Strategy .

Configuración

Primero, algunas importaciones necesarias.

import json
import os
import sys

Antes de importar TensorFlow, realice algunos cambios en el entorno.

Desactive todas las GPU. Esto evita errores causados ​​por todos los trabajadores que intentan usar la misma GPU. Para una aplicación real, cada trabajador estaría en una máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Restablezca la variable de entorno TF_CONFIG , verá más sobre esto más adelante.

os.environ.pop('TF_CONFIG', None)

Asegúrese de que el directorio actual esté en la ruta de Python. Esto permite que el cuaderno importe los archivos escritos por %%writefile más tarde.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importa TensorFlow.

import tensorflow as tf

Definición de modelo y conjunto de datos

A continuación, cree un archivo mnist.py con una configuración simple de modelo y conjunto de datos. Este archivo de Python será utilizado por los procesos de trabajo en este tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Intente entrenar el modelo para una pequeña cantidad de épocas y observe los resultados de un solo trabajador para asegurarse de que todo funcione correctamente. A medida que avanza el entrenamiento, la pérdida debería disminuir y la precisión debería aumentar.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236
<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

Configuración de varios trabajadores

Ahora entremos en el mundo de la formación de varios trabajadores. En TensorFlow, la variable de entorno TF_CONFIG es necesaria para el entrenamiento en varias máquinas, cada una de las cuales posiblemente tenga una función diferente. TF_CONFIG es una cadena JSON que se utiliza para especificar la configuración del clúster en cada trabajador que forma parte del clúster.

Aquí hay una configuración de ejemplo:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí está el mismo TF_CONFIG serializado como una cadena JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Hay dos componentes de TF_CONFIG : cluster y task .

  • cluster es el mismo para todos los trabajadores y proporciona información sobre el cluster de capacitación, que es un diccionario que consta de diferentes tipos de trabajos, como worker . En la capacitación de varios trabajadores con MultiWorkerMirroredStrategy , generalmente hay un worker que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un worker normal. A dicho trabajador se le conoce como trabajador chief , y es habitual que el worker con index 0 sea designado como worker principal (de hecho, así es como se implementa tf.distribute.Strategy ).

  • task proporciona información de la tarea actual y es diferente en cada trabajador. Especifica el type y el index de ese trabajador.

En este ejemplo, estableces el type tarea en "worker" y el index la tarea en 0 . Esta máquina es el primer trabajador y será designado como el trabajador principal y hará más trabajo que los demás. Tenga en cuenta que otras máquinas también necesitarán tener TF_CONFIG la variable de entorno TF_CONFIG , y debe tener el mismo dict de cluster , pero diferente type tarea o index tarea dependiendo de los roles de esas máquinas.

Con fines ilustrativos, este tutorial muestra cómo se puede configurar un TF_CONFIG con 2 trabajadores en localhost . En la práctica, los usuarios crearían varios trabajadores en direcciones / puertos IP externos y establecerían TF_CONFIG en cada trabajador de forma adecuada.

En este ejemplo, usará 2 trabajadores, el TF_CONFIG del primer trabajador se muestra arriba. Para el segundo trabajador, establecería tf_config['task']['index']=1

Arriba, tf_config es solo una variable local en Python. Para usarlo realmente para configurar el entrenamiento, este diccionario debe serializarse como JSON y colocarse en la variable de entorno TF_CONFIG .

Variables de entorno y subprocesos en cuadernos

Los subprocesos heredan las variables de entorno de su padre. Entonces, si establece una variable de entorno en este proceso de jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Puede acceder a la variable de entorno desde un subproceso:

echo ${GREETINGS}
Hello TensorFlow!

En la siguiente sección, usará esto para pasar TF_CONFIG a los subprocesos de trabajo. Realmente nunca lanzaría sus trabajos de esta manera, pero es suficiente para los propósitos de este tutorial: Para demostrar un ejemplo mínimo de varios trabajadores.

Elige la estrategia adecuada

En TensorFlow hay dos formas principales de entrenamiento distribuido:

  • Entrenamiento sincrónico, donde los pasos del entrenamiento se sincronizan entre los trabajadores y las réplicas, y
  • Entrenamiento asincrónico, donde los pasos de entrenamiento no están estrictamente sincronizados.

MultiWorkerMirroredStrategy , que es la estrategia recomendada para la formación sincrónica de varios trabajadores, se demostrará en esta guía. Para entrenar el modelo, use una instancia de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo en todos los trabajadores. Utiliza CollectiveOps , una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas. La guía tf.distribute.Strategy tiene más detalles sobre esta estrategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy proporciona múltiples implementaciones a través del parámetro CommunicationOptions . RING implementa colectivos basados ​​en anillos que utilizan gRPC como capa de comunicación entre hosts. NCCL utiliza NCCL Nvidia para implementar colectivos. AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y de la interconexión de red en el clúster.

Entrena el modelo

Con la integración de la API tf.distribute.Strategy en tf.keras , el único cambio que hará para distribuir la capacitación a varios trabajadores es incluir la construcción del modelo y la llamada model.compile() dentro de strategy.scope() . El alcance de la estrategia de distribución dicta cómo y dónde se crean las variables, y en el caso de MultiWorkerMirroredStrategy , las variables creadas son MirroredVariable y se replican en cada uno de los trabajadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Para ejecutar realmente MultiWorkerMirroredStrategy , deberá ejecutar procesos de trabajo y pasarles un TF_CONFIG .

Al igual que el archivo mnist.py escrito anteriormente, aquí está el main.py que main.py cada uno de los trabajadores:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

En el fragmento de código anterior, observe que global_batch_size , que se pasa a Dataset.batch , se establece en per_worker_batch_size * num_workers . Esto asegura que cada trabajador procese lotes de ejemplos per_worker_batch_size independientemente del número de trabajadores.

El directorio actual ahora contiene ambos archivos Python:

ls *.py
main.py
mnist.py

Entonces json- TF_CONFIG y agréguelo a las variables de entorno:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puede iniciar un proceso de trabajo que ejecutará main.py y usará TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

  1. Utiliza %%bash que es una "magia" de cuaderno para ejecutar algunos comandos de bash.
  2. Utiliza la bandera --bg para ejecutar el proceso bash en segundo plano, porque este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso de trabajo en segundo plano no imprimirá la salida en este cuaderno, por lo que &> redirige su salida a un archivo, para que pueda ver lo que sucedió.

Entonces, espere unos segundos para que se inicie el proceso:

import time
time.sleep(10)

Ahora mire lo que se ha enviado al archivo de registro del trabajador hasta ahora:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

La última línea del archivo de registro debería decir: Started server with target: grpc://localhost:12345 . El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.

Así que actualice tf_config para que el proceso del segundo trabajador lo recoja:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora inicie el segundo trabajador. Esto iniciará la capacitación, ya que todos los trabajadores están activos (por lo que no es necesario que este proceso en segundo plano):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511
2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

Ahora, si vuelve a verificar los registros escritos por el primer trabajador, verá que participó en la capacitación de ese modelo:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

Como era de esperar, esto funcionó más lento que la ejecución de prueba al comienzo de este tutorial. Ejecutar varios trabajadores en una sola máquina solo agrega gastos generales. El objetivo aquí no era mejorar el tiempo de formación, sino solo dar un ejemplo de formación de varios trabajadores.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formación de varios trabajadores en profundidad

Hasta ahora, este tutorial ha demostrado una configuración básica para varios trabajadores. El resto de este documento analiza en detalle otros factores que pueden ser útiles o importantes para casos de uso reales.

Fragmentación de conjuntos de datos

En la formación de varios trabajadores, se necesita la fragmentación del conjunto de datos para garantizar la convergencia y el rendimiento.

El ejemplo de la sección anterior se basa en la fragmentación automática predeterminada proporcionada por la API tf.distribute.Strategy . Puede controlar la fragmentación configurando tf.data.experimental.AutoShardPolicy de tf.data.experimental.DistributeOptions . Para obtener más información sobre la fragmentación automática, consulte la guía de entrada distribuida .

A continuación, se muestra un ejemplo rápido de cómo desactivar la fragmentación automática, para que cada réplica procese todos los ejemplos (no recomendado):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Evaluación

Si pasa validation_data a model.fit , se alternará entre entrenamiento y evaluación para cada época. La evaluación que toma validation_data se distribuye entre el mismo conjunto de trabajadores y los resultados de la evaluación se agregan y están disponibles para todos los trabajadores. De manera similar al entrenamiento, el conjunto de datos de validación se fragmenta automáticamente a nivel de archivo. Debe establecer un tamaño de lote global en el conjunto de datos de validación y establecer validation_steps . También se recomienda un conjunto de datos repetido para la evaluación.

Alternativamente, también puede crear otra tarea que lea periódicamente los puntos de control y ejecute la evaluación. Esto es lo que hace Estimator. Pero esta no es una forma recomendada de realizar la evaluación y, por lo tanto, se omiten sus detalles.

Actuación

Ahora tiene un modelo de Keras que está configurado para ejecutarse en varios trabajadores con MultiWorkerMirroredStrategy . Puede probar las siguientes técnicas para ajustar el rendimiento de la formación de varios trabajadores con MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy proporciona múltiples implementaciones de comunicación colectiva . RING implementa colectivos basados ​​en anillos que utilizan gRPC como capa de comunicación entre hosts. NCCL utiliza NCCL Nvidia para implementar colectivos. AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y de la interconexión de red en el clúster. Para anular la elección automática, especifique el parámetro communication_options del constructor de MultiWorkerMirroredStrategy , por ejemplo, communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • Transmita las variables a tf.float si es posible. El modelo oficial de ResNet incluye un ejemplo de cómo se puede hacer esto.

Tolerancia a fallos

En el entrenamiento sincrónico, el clúster fallaría si uno de los trabajadores falla y no existe ningún mecanismo de recuperación de fallas. El uso de Keras con tf.distribute.Strategy tiene la ventaja de la tolerancia a fallas en los casos en que los trabajadores mueren o son inestables. Para ello, conserva el estado de entrenamiento en el sistema de archivos distribuido de su elección, de modo que al reiniciar la instancia que falló o se adelantó anteriormente, se recupere el estado de entrenamiento.

Cuando un trabajador deja de estar disponible, otros trabajadores fallarán (posiblemente después de un tiempo de espera). En tales casos, es necesario reiniciar el trabajador no disponible, así como otros trabajadores que han fallado.

Devolución de llamada de ModelCheckpoint

ModelCheckpoint devolución de llamada de ModelCheckpoint ya no proporciona la funcionalidad de tolerancia a fallas, utilice la BackupAndRestore llamada de BackupAndRestore lugar.

La ModelCheckpoint llamada ModelCheckpoint todavía se puede usar para guardar puntos de control. Pero con esto, si el entrenamiento se interrumpió o finalizó con éxito, para continuar entrenando desde el punto de control, el usuario es responsable de cargar el modelo manualmente.

Opcionalmente, el usuario puede optar por guardar y restaurar modelos / pesos fuera de la ModelCheckpoint llamada de ModelCheckpoint .

Guardar y cargar modelo

Para guardar su modelo con model.save o tf.saved_model.save , el destino para guardar debe ser diferente para cada trabajador. En los trabajadores que no son jefes, deberá guardar el modelo en un directorio temporal, y en el jefe, deberá guardar en el directorio de modelos proporcionado. Los directorios temporales del trabajador deben ser únicos para evitar errores resultantes de que varios trabajadores intenten escribir en la misma ubicación. El modelo guardado en todos los directorios es idéntico y, por lo general, solo se debe hacer referencia al modelo guardado por el jefe para restaurarlo o servirlo. Debería tener alguna lógica de limpieza que elimine los directorios temporales creados por los trabajadores una vez que se haya completado su capacitación.

La razón por la que necesita ahorrar en el jefe y los trabajadores al mismo tiempo es porque es posible que esté agregando variables durante los puntos de control, lo que requiere que tanto el jefe como los trabajadores participen en el protocolo de comunicación allreduce. Por otro lado, permitir que el jefe y los trabajadores guarden en el mismo directorio modelo dará lugar a errores debido a la contención.

Con MultiWorkerMirroredStrategy , el programa se ejecuta en cada trabajador y, para saber si el trabajador actual es el jefe, aprovecha el objeto de resolución de clúster que tiene los atributos task_type y task_id . task_type le dice cuál es el trabajo actual (por ejemplo, 'trabajador'), y task_id le dice el identificador del trabajador. El trabajador con id 0 se designa como trabajador principal.

En el fragmento de código a continuación, write_filepath proporciona la ruta del archivo para escribir, que depende de la identificación del trabajador. En el caso de jefe (trabajador con id 0), escribe en la ruta del archivo original; para otros, crea un directorio temporal (con id en la ruta del directorio) para escribir:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con eso, ahora está listo para ahorrar:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Como se describió anteriormente, más adelante, el modelo solo debe cargarse desde la ruta en la que se guardó el jefe, así que eliminemos los temporales que guardaron los trabajadores que no son jefes:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ahora, cuando tf.keras.models.load_model el momento de cargar, usemos la conveniente API tf.keras.models.load_model y continuemos con el trabajo. Aquí, suponga que solo usa un solo trabajador para cargar y continuar el entrenamiento, en cuyo caso no llama a tf.keras.models.load_model dentro de otra strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023
<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

Guardar y restaurar puntos de control

Por otro lado, los puntos de control le permiten guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo. Aquí, creará un tf.train.Checkpoint que rastrea el modelo, que es administrado por un tf.train.CheckpointManager para que solo se conserve el último punto de control.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una vez que el CheckpointManager está configurado, ahora está listo para guardar y eliminar los puntos de control guardados por los trabajadores que no son jefes.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ahora, cuando necesite restaurar, puede encontrar el último punto de control guardado utilizando la conveniente función tf.train.latest_checkpoint . Después de restaurar el punto de control, puede continuar con el entrenamiento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896
<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

Devolución de llamada de BackupAndRestore

BackupAndRestore devolución de llamada proporciona la funcionalidad de tolerancia a fallos, mediante la copia del modelo y el número época actual en un archivo de controles temporal bajo backup_dir argumento para BackupAndRestore . Esto se hace al final de cada época.

Una vez que los trabajos se interrumpen y se reinician, la devolución de llamada restaura el último punto de control y el entrenamiento continúa desde el comienzo de la época interrumpida. Cualquier entrenamiento parcial que ya se haya realizado en la época inacabada antes de la interrupción se desechará, de modo que no afecte el estado final del modelo.

Para usarlo, proporcione una instancia de tf.keras.callbacks.experimental.BackupAndRestore en la llamada tf.keras.Model.fit() .

Con MultiWorkerMirroredStrategy, si un trabajador es interrumpido, todo el clúster se detiene hasta que se reinicia el trabajador interrumpido. Otros trabajadores también se reiniciarán y el trabajador interrumpido volverá a unirse al clúster. Luego, cada trabajador lee el archivo de punto de control que se guardó previamente y recupera su estado anterior, lo que permite que el clúster vuelva a sincronizarse. Entonces continúa el entrenamiento.

BackupAndRestore devolución de llamada de BackupAndRestore usa CheckpointManager para guardar y restaurar el estado de entrenamiento, lo que genera un archivo llamado punto de control que rastrea los puntos de control existentes junto con el último. Por esta razón, backup_dir no debe reutilizarse para almacenar otros puntos de control a fin de evitar la colisión de nombres.

Actualmente, la BackupAndRestore llamada de BackupAndRestore admite un solo trabajador sin estrategia, MirroredStrategy y varios trabajadores con MultiWorkerMirroredStrategy. A continuación, se muestran dos ejemplos de capacitación para trabajadores múltiples y capacitación para un solo trabajador.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645
<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

Si inspecciona el directorio de backup_dir que especificó en BackupAndRestore , es posible que observe algunos archivos de puntos de control generados temporalmente. Esos archivos son necesarios para recuperar las instancias perdidas anteriormente, y la biblioteca los eliminará al final de tf.keras.Model.fit() vez que tf.keras.Model.fit() con éxito su entrenamiento.

Ver también

  1. La guía Distributed Training in TensorFlow proporciona una descripción general de las estrategias de distribución disponibles.
  2. Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.
  3. La sección Rendimiento de la guía proporciona información sobre otras estrategias y herramientas que puede utilizar para optimizar el rendimiento de sus modelos de TensorFlow.