Entrenamiento del servidor de parámetros con ParameterServerStrategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

La formación del servidor parámetro es un método de datos en paralelo común de ampliar la formación modelo en varias máquinas.

Un grupo de entrenamiento servidor de parámetro consta de los trabajadores y servidores de parámetros. Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. De forma predeterminada, los trabajadores leen y actualizan estas variables de forma independiente sin sincronizarse entre sí. Es por esto que a veces parámetro de entrenamiento de tipo servidor se llama la formación asíncrona.

En TensorFlow 2, la formación del servidor parámetro es alimentado por el tf.distribute.experimental.ParameterServerStrategy clase, que distribuye los pasos de formación de un grupo que escala hasta miles de trabajadores (acompañado de servidores de parámetros).

Métodos de formación compatibles

Hay dos métodos principales de formación admitidos:

Un clúster con trabajos y tareas.

Independientemente de la API de elección ( Model.fit o un bucle de entrenamiento personalizado), la formación distribuidos en 2 TensorFlow implica: un 'cluster' con varios 'jobs' , y cada uno de los puestos de trabajo puede tener uno o más 'tasks' .

Al utilizar el entrenamiento del servidor de parámetros, se recomienda tener:

  • Un puesto de coordinador (que tiene el nombre de trabajo chief )
  • Puestos de trabajo de los trabajadores múltiples (nombre de trabajo worker ); y
  • Trabajos de servidor de múltiples parámetros (nombre de trabajo ps )

Mientras que el coordinador crea recursos, despachos de formación tareas, escribe los puestos de control, y se ocupa de los fracasos de trabajo, los trabajadores y los servidores de parámetros corren tf.distribute.Server que escucha las peticiones de la coordinadora.

La formación del servidor de parámetros con Model.fit API

La formación del servidor parámetro con el Model.fit API requiere el coordinador utilizar un tf.distribute.experimental.ParameterServerStrategy objeto, y una tf.keras.utils.experimental.DatasetCreator que la entrada. Al igual que en Model.fit uso sin una estrategia, o con otras estrategias, el flujo de trabajo consiste en crear y compilar el modelo, la preparación de las devoluciones de llamada, seguido de un Model.fit llamada.

Entrenamiento del servidor de parámetros con un ciclo de entrenamiento personalizado

Con bucles de formación personalizados, el tf.distribute.experimental.coordinator.ClusterCoordinator clase es el componente clave que se utiliza para el coordinador.

La API más importante proporcionada por el ClusterCoordinator objeto es schedule :

  • El schedule API encola un tf.function y devuelve un futuro similar RemoteValue inmediatamente.
  • Las funciones en cola serán enviados a los trabajadores a distancia en las discusiones de fondo y su RemoteValue s serán llenados de forma asíncrona.
  • Desde schedule no requiere la asignación de los trabajadores, la tf.function aprobada en se puede ejecutar en cualquier trabajador disponible.
  • Si el trabajador en el que se ejecuta deja de estar disponible antes de su finalización, la función se reintentará en otro trabajador disponible.
  • Debido a este hecho y al hecho de que la ejecución de la función no es atómica, una función puede ejecutarse más de una vez.

Además de despachar las funciones remotas, el ClusterCoordinator también ayuda a crear conjuntos de datos sobre todos los trabajadores y reconstruir estos conjuntos de datos cuando el trabajador se recupera de fracaso.

Configuración del tutorial

El tutorial se ramifican en Model.fit y caminos de bucle de entrenamiento personalizados, y se puede elegir el que se adapte a sus necesidades. Las secciones distintas de "Entrenamiento con X" son aplicables a ambos caminos.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Configuración de clúster

Como se mencionó anteriormente, un grupo de entrenamiento servidor parámetro requiere una tarea que se ejecuta coordinador de su programa de entrenamiento, uno o varios trabajadores y las tareas del servidor de parámetros que ejecutar TensorFlow Servidores- tf.distribute.Server -y, posiblemente, una tarea de evaluación adicional que se ejecuta la evaluación sidecar (consulte la sección de evaluación del sidecar a continuación). Los requisitos para configurarlos son:

  • La tarea del coordinador debe conocer las direcciones y los puertos de todos los demás servidores de TensorFlow, excepto el evaluador.
  • Los trabajadores y los servidores de parámetros necesitan saber qué puerto necesitan escuchar. En aras de la simplicidad, generalmente puede pasar la información completa del clúster al crear servidores TensorFlow en estas tareas.
  • La tarea del evaluador no tiene que conocer la configuración del grupo de capacitación. Si es así, no debería intentar conectarse al clúster de entrenamiento.
  • Los trabajadores y los servidores de los parámetros deben tener tipos de tareas como "worker" y "ps" , respectivamente. El coordinador debe utilizar "chief" como tipo de tarea por razones de compatibilidad.

En este tutorial, creará un clúster en proceso para que todo el entrenamiento del servidor de parámetros se pueda ejecutar en Colab. Usted aprenderá cómo configurar grupos reales en una sección posterior.

Clúster en proceso

Comenzará creando varios servidores de TensorFlow con anticipación y se conectará a ellos más tarde. Tenga en cuenta que esto es sólo para el propósito de la manifestación de este tutorial, y en la formación real de los servidores se inició el "worker" y "ps" máquinas.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

El proceso de configuración del clúster-in se utiliza con frecuencia en las pruebas unitarias, como aquí .

Otra opción para las pruebas locales es poner en marcha los procesos en el local de la máquina-la salida de entrenamiento del Multi-trabajador con Keras para un ejemplo de este enfoque.

Crear una instancia de ParameterServerStrategy

Antes de sumergirse en el código de entrenamiento, vamos a crear instancias de un ParameterServerStrategy objeto. Tenga en cuenta que esto es necesario independientemente de que esté procediendo con Model.fit o un bucle de entrenamiento personalizado. El variable_partitioner argumento se explicará en la sección de sharding variable .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

Para usar GPU para la capacitación, asigne GPU visibles para cada trabajador. ParameterServerStrategy utilizará todas las GPU disponibles en cada trabajador, con la restricción de que todos los trabajadores deben tener el mismo número de GPU disponibles.

Fragmentación variable

Sharding variable se refiere a la división de una variable en múltiples variables más pequeñas, que se denominan fragmentos. La fragmentación variable puede resultar útil para distribuir la carga de la red al acceder a estas fragmentos. También es útil distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros.

Para habilitar sharding variable, puede pasar en un variable_partitioner cuando se construye un ParameterServerStrategy objeto. El variable_partitioner será invocado cada vez que se crea una variable y se espera que regrese el número de fragmentos a lo largo de cada dimensión de la variable. Algunos fuera de la caja de variable_partitioner s se proporcionan como tf.distribute.experimental.partitioners.MinSizePartitioner . Se recomienda el uso de particionadores basada en tamaño como tf.distribute.experimental.partitioners.MinSizePartitioner para evitar la partición de pequeñas variables, que podrían tener un impacto negativo sobre la velocidad de entrenamiento del modelo.

Cuando un variable_partitioner se pasa y si crea una variable directamente bajo strategy.scope() , se convertirá en un tipo de recipiente con una variables propiedad, que proporciona acceso a la lista de los fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor concatenando todos los fragmentos. Como resultado, se puede utilizar como una variable normal. Por otro lado, algunos métodos TensorFlow tales como tf.nn.embedding_lookup proporcionan implementación eficiente para este tipo de contenedor y en estos métodos se evitará concatenación automática.

Por favor, vea los documentos de la API de tf.distribute.experimental.ParameterServerStrategy para más detalles.

El entrenamiento con Model.fit

Keras proporciona una API de formación de fácil uso a través de Model.fit que se encarga de la formación de bucles bajo el capó, con la flexibilidad de Overridable train_step y devoluciones de llamada, que proporcionan funcionalidades tales como el ahorro puesto de control o resumen ahorrar para TensorBoard. Con Model.fit , el mismo código de la formación se puede utilizar para otras estrategias con un simple canje de la estrategia del objeto.

Los datos de entrada

Model.fit con el entrenamiento servidor parámetro requiere que se proporcionen los datos de entrada en un invocable que toma un solo argumento de tipo tf.distribute.InputContext , y devuelve un tf.data.Dataset . A continuación, crear una tf.keras.utils.experimental.DatasetCreator objeto que lo lleve callable , y un opcional de tf.distribute.InputOptions objeto a través input_options argumento.

Tenga en cuenta que se recomienda para mezclar y repetir los datos con la formación servidor de parámetros, y especificar steps_per_epoch de fit llamada para la biblioteca conoce los límites de épocas.

Por favor, vea la entrada distribuida tutorial para obtener más información acerca de la InputContext argumento.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

El código en dataset_fn será invocado en el dispositivo de entrada, que es generalmente la CPU, en cada una de las máquinas de los trabajadores.

Construcción y compilación de modelos

Ahora, va a crear un tf.keras.Model trivial -a tf.keras.models.Sequential modelo para fines de demostración-seguido por un Model.compile llamada a incorporar componentes, tales como un optimizador, métricas, o parámetros tales como steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Devolución de llamada y formación

Antes de llamar model.fit para el entrenamiento real, vamos a preparar las devoluciones de llamada necesarias para las tareas comunes, tales como:

  • ModelCheckpoint : para guardar los pesos modelo.
  • BackupAndRestore : para asegurarse de que el progreso en el entrenamiento se copian automáticamente, y se recupera si el racimo experiencias indisponibilidad (como abortar o de preferencia); o
  • TensorBoard : para guardar los informes de progreso en archivos de resumen, que obtienen visualizan en la herramienta de TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

El uso directo con ClusterCoordinator (opcional)

Incluso si se elige la Model.fit itinerario de formación, se puede crear una instancia opcionalmente un tf.distribute.experimental.coordinator.ClusterCoordinator objeto de programar otras funciones que le gustaría ser ejecutadas en los trabajadores. Ver el entrenamiento con un circuito de formación personalizada sección para más detalles y ejemplos.

Entrenamiento con un ciclo de entrenamiento personalizado

El uso de bucles de formación a medida con tf.distribute.Strategy proporciona una gran flexibilidad para definir bucles de formación. Con la ParameterServerStrategy definido anteriormente (como strategy ), que va a utilizar un tf.distribute.experimental.coordinator.ClusterCoordinator a despachar la ejecución de los pasos de formación a los trabajadores a distancia.

A continuación, va a crear un modelo, definir un conjunto de datos y una función escalonada, como lo han hecho en el circuito de entrenamiento con otra tf.distribute.Strategy s. Puede encontrar más detalles en la formación de encargo con tf.distribute.Strategy tutorial.

Para garantizar la obtención previa de datos eficiente, utilice el conjunto de datos distribuidos recomendada API de creación mencionados en los pasos de formación de despacho a los trabajadores a distancia sección de abajo. Además, asegúrese de llamar Strategy.run dentro worker_fn a sacar el máximo provecho de las GPU asignados a los trabajadores. El resto de los pasos son los mismos para entrenar con o sin GPU.

Creemos estos componentes en los siguientes pasos:

Configurar los datos

En primer lugar, escribir una función que crea un conjunto de datos que incluye el procesamiento previo lógica implementada por capas de pre-procesamiento Keras .

Va a crear estas capas exteriores del dataset_fn sino aplicar la transformación dentro de la dataset_fn , ya que lo envuelva el dataset_fn en un tf.function , que no permite a las variables que se crean dentro de ella.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Genere ejemplos de juguetes en un conjunto de datos:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

A continuación, cree el conjunto de datos de entrenamiento envuelto en una dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construye el modelo

A continuación, cree el modelo y otros objetos. Asegúrese de crear todas las variables bajo strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Vamos a confirmar que el uso de FixedShardsPartitioner divide todas las variables en dos trozos y cada trozo se asigna a diferentes servidores de parámetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Definir el paso de entrenamiento

En tercer lugar, crear el paso de formación envuelto en una tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

En la función de paso por encima de la formación, llamando Strategy.run y Strategy.reduce en el step_fn puede soportar múltiples GPU por trabajador. Si los trabajadores han asignado las GPU, Strategy.run distribuirá los conjuntos de datos en múltiples réplicas.

Envío de pasos de formación a trabajadores remotos

Después de que todos los cálculos se definen por ParameterServerStrategy , que va a utilizar el tf.distribute.experimental.coordinator.ClusterCoordinator clase para crear recursos y distribuir los pasos de formación a los trabajadores a distancia.

Primero vamos a crear un ClusterCoordinator objeto y pase el objeto de la estrategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Luego, cree un conjunto de datos por trabajador y un iterador. En los per_worker_dataset_fn abajo, envolviendo el dataset_fn en strategy.distribute_datasets_from_function se recomienda para permitir la obtención previa eficiente para las GPU sin problemas.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

El paso final es la de distribuir el cálculo a los trabajadores remotos utilizando ClusterCoordinator.schedule :

  • El schedule método encola un tf.function y devuelve un futuro similar RemoteValue inmediatamente. Las funciones en cola serán enviados a los trabajadores a distancia en las discusiones de fondo y el RemoteValue se llenarán de forma asíncrona.
  • La join método ( ClusterCoordinator.join ) se puede utilizar para esperar hasta que se ejecutan todas las funciones programadas.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Así es como se puede recuperar el resultado de una RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternativamente, puede iniciar todos los pasos y hacer algo mientras espera su finalización:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para la formación completa y flujo de trabajo que sirve para este ejemplo en particular, por favor echa un vistazo a esta prueba .

Más sobre la creación de conjuntos de datos

El conjunto de datos en el código anterior se ha creado usando el ClusterCoordinator.create_per_worker_dataset API). Crea un conjunto de datos por trabajador y devuelve un objeto contenedor. Puede llamar al iter método en él para crear un iterador por trabajador. El per-trabajador iterador contiene un iterador por trabajador y la rebanada correspondiente de un trabajador será sustituido en el argumento de entrada de la función pasa a la ClusterCoordinator.schedule método antes de que se ejecute la función de un trabajador particular.

Actualmente, la ClusterCoordinator.schedule método asume trabajadores son equivalentes y por lo tanto asume los conjuntos de datos sobre diferentes trabajadores son iguales, excepto que pueden ser mezcladas de manera diferente si contienen un Dataset.shuffle operación. Debido a esto, también se recomienda que los proveedores de datos para ser repetidos indefinidamente y se programa un número finito de pasos en lugar de depender de la OutOfRangeError de un conjunto de datos.

Otra nota importante es que tf.data conjuntos de datos no son compatibles con la serialización y deserialización implícita a través de límites de tareas. Por eso es importante para crear el conjunto de datos dentro de la función pasó a ClusterCoordinator.create_per_worker_dataset .

Evaluación

Hay más de una forma de definir y ejecutar un ciclo de evaluación en el entrenamiento distribuido. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene una preferencia.

Evaluación en línea

En este método, los suplentes coordinador entre la formación y evaluación y por lo que se llamó la evaluación en línea.

Hay varios beneficios de la evaluación en línea. Por ejemplo:

  • Puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener.
  • Los resultados de la evaluación se pueden utilizar para tomar decisiones para entrenar la próxima época.

Hay dos formas de implementar la evaluación en línea: evaluación directa y evaluación distribuida.

  • La evaluación directa: Para los pequeños modelos y evaluación de datos, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación del coordinador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Evaluación distribuido: Para grandes modelos o conjuntos de datos que son inviables para ejecutarse directamente en el coordinador, coordinador de la tarea puede distribuir las tareas de evaluación a los trabajadores a través de los ClusterCoordinator.schedule / ClusterCoordinator.join métodos:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Evaluación de sidecar

Otro método se llama Evaluación sidecar, donde se crea una tarea evaluador dedicado que lee repetidamente puestos de control y se ejecuta la evaluación en un puesto de control más reciente. Permite que su programa de entrenamiento finalice antes si no necesita cambiar su ciclo de entrenamiento en función de los resultados de la evaluación. Sin embargo, requiere una tarea de evaluador adicional y puntos de control periódicos para desencadenar la evaluación. A continuación se muestra un posible bucle de evaluación del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clústeres en el mundo real

En un entorno de producción real, ejecutará todas las tareas en diferentes procesos en diferentes máquinas. La forma más sencilla de configurar la información de clúster en cada tarea es establecer "TF_CONFIG" variables de entorno y utilizar un tf.distribute.cluster_resolver.TFConfigClusterResolver para analizar "TF_CONFIG" .

Para una descripción general sobre "TF_CONFIG" variables de entorno, consulte la formación distribuida guía.

Si usted comienza sus tareas de formación utilizando Kubernetes u otras plantillas de configuración, es muy probable que estas plantillas ya han establecido “TF_CONFIG" para usted.

Ajuste el "TF_CONFIG" variable de entorno

Suponga que tiene 3 trabajadores y 2 servidores de parámetros, el "TF_CONFIG" del trabajador puede ser 1:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

El "TF_CONFIG" de la evaluador puede ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

El "cluster" parte en el anterior "TF_CONFIG" cadena para el evaluador es opcional.

Si usa el mismo binario para todas las tareas

Si prefiere ejecutar todas estas tareas utilizando un solo binario, deberá permitir que su programa se ramifique en diferentes roles desde el principio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

El siguiente código inicia un servidor de TensorFlow y espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Manejo de fallas en la tarea

Fracaso del trabajador

tf.distribute.experimental.coordinator.ClusterCoordinator o Model.fit proporcionan incorporados en la tolerancia a fallos para el fracaso de los trabajadores. Tras la recuperación del trabajador, la función proporcionada anteriormente conjunto de datos (ya sea a ClusterCoordinator.create_per_worker_dataset para un bucle de entrenamiento personalizado o tf.keras.utils.experimental.DatasetCreator para Model.fit ) se invocará a los trabajadores a volver a crear los conjuntos de datos.

Error del coordinador o del servidor de parámetros

Sin embargo, cuando el coordinador ve un error del servidor parámetro, se levantará un UnavailableError o AbortedError inmediatamente. Puede reiniciar el coordinador en este caso. El propio coordinador también puede dejar de estar disponible. Por lo tanto, se recomiendan ciertas herramientas para no perder el progreso del entrenamiento:

  • Para Model.fit , se debe utilizar un BackupAndRestore de devolución de llamada, que maneja el guardado el progreso y la restauración automática. Ver las devoluciones de llamada y la formación apartado anterior para un ejemplo.

  • Para un ciclo de entrenamiento personalizado, debe verificar periódicamente las variables del modelo y cargar las variables del modelo desde un punto de control, si lo hubiera, antes de que comience el entrenamiento. El progreso en el entrenamiento se puede deducir aproximadamente de optimizer.iterations si es un punto de control un optimizador:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Recogida de una RemoteValue

Recogida de una RemoteValue está garantizado para tener éxito si una función es ejecutada con éxito. Esto se debe a que actualmente el valor de retorno se copia inmediatamente al coordinador después de que se ejecuta una función. Si hay algún error de trabajador durante la copia, la función se reintentará en otro trabajador disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin un valor de retorno.

Error al reportar

Una vez que el coordinador ve un error como UnavailableError desde los servidores de parámetros u otros errores de aplicación tales como InvalidArgument de tf.debugging.check_numerics , se cancelará todas las funciones pendientes y en cola antes de levantar el error. Ir a buscar sus correspondientes RemoteValue s levantará un CancelledError .

Después de que se genera un error, el coordinador no generará el mismo error ni ningún error de las funciones canceladas.

Mejora del rendimiento

Hay varias razones posibles si ha observado que los problemas de rendimiento cuando se entrena con ParameterServerStrategy y ClusterResolver .

Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad. También puede haber varias causas fundamentales. Algunos métodos simples para mitigar este problema son:

  1. Fragmentar sus grandes variables de modelo a través de la especificación de un variable_partitioner cuando se construye un ParameterServerStrategy .
  2. Evite crear una variable de punto de acceso que sea requerida por todos los servidores de parámetros en un solo paso si es posible. Por ejemplo, utilizar una tasa de aprendizaje constante o subclase tf.keras.optimizers.schedules.LearningRateSchedule en optimizadores ya que el comportamiento por defecto es que la tasa de aprendizaje se convertirá en una variable colocado en un servidor parámetro particular y solicitado por todos los demás servidores de parámetros en cada paso .
  3. Mezcle sus vocabularios extensos antes de pasarlos a las capas de preprocesamiento de Keras.

Otra posible razón de problemas de desempeño es el coordinador. Su primera implementación de schedule / join está basado en Python y por lo tanto puede haber enhebrado por encima. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso,

  • Para Model.fit , se puede establecer steps_per_execution argumento proporcionado en Model.compile a un valor mayor que 1.

  • Para un bucle de entrenamiento personalizado, puede empacar múltiples pasos en una sola tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

A medida que la biblioteca se optimice aún más, es de esperar que la mayoría de los usuarios no tengan que empaquetar manualmente los pasos en el futuro.

Además, un pequeño truco para mejorar el rendimiento es programar funciones sin un valor de retorno, como se explica en la sección de fallas de tareas de manejo anterior.

Limitaciones conocidas

La mayoría de las limitaciones conocidas ya se tratan en las secciones anteriores. Esta sección proporciona un resumen.

ParameterServerStrategy generales

  • os.environment["grpc_fail_fast"]="use_caller" se necesita en cada tarea, incluyendo el coordinador, para hacer el trabajo falla la tolerancia correctamente.
  • No se admite el entrenamiento del servidor de parámetros síncronos.
  • Por lo general, es necesario agrupar varios pasos en una sola función para lograr un rendimiento óptimo.
  • No es compatible para cargar un saved_model través tf.saved_model.load que contiene variables fragmentados. Tenga en cuenta que se espera que la carga de un modelo guardado con TensorFlow Serving funcione.
  • No se admite cargar un punto de control que contenga variables de ranura del optimizador fragmentadas en un número diferente de fragmentos.
  • No se admite la recuperación de una falla del servidor de parámetros sin reiniciar la tarea del coordinador.
  • El uso de tf.lookup.StaticHashTable (que comúnmente se emplea por algunos tf.keras.layers.experimental.preprocessing capas, tales como IntegerLookup , StringLookup , y TextVectorization ) resulta en recursos colocados en el coordinador en este momento con el entrenamiento servidor parámetro. Esto tiene implicaciones de rendimiento para las RPC de búsqueda de los trabajadores al coordinador. Esta es una alta prioridad actual para abordar.

Model.fit detalles

  • steps_per_epoch se requiere argumento en Model.fit . Puede seleccionar un valor que proporcione intervalos apropiados en una época.
  • ParameterServerStrategy no tiene soporte para las devoluciones de llamada personalizados que tienen llamadas a nivel de lote por razones de rendimiento. Debe convertir esas llamadas en llamadas a nivel época con convenientemente recogido steps_per_epoch , por lo que están llamados todos los steps_per_epoch número de pasos. Las devoluciones de llamada integradas no se ven afectadas: sus llamadas a nivel de lote se han modificado para que sean eficaces. Apoyando las llamadas a nivel de lote para ParameterServerStrategy se está planificando.
  • Por la misma razón, a diferencia de otras estrategias, la barra de progreso y las métricas se registran solo en los límites de época.
  • run_eagerly no es compatible.

Detalles específicos del ciclo de entrenamiento personalizado