Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento distribuido con TensorFlow

Ver en TensorFlow.org Ejecutar en Google Colab Ver código fuente en GitHub Descargar cuaderno

Visión general

tf.distribute.Strategy es una API de TensorFlow para distribuir capacitación en múltiples GPU, múltiples máquinas o TPU. Con esta API, puede distribuir sus modelos existentes y el código de entrenamiento con cambios mínimos de código.

tf.distribute.Strategy ha sido diseñado con estos objetivos clave en mente:

  • Fácil de usar y compatible con múltiples segmentos de usuarios, incluidos investigadores, ingenieros de ML, etc.
  • Proporcione un buen rendimiento fuera de la caja.
  • Fácil cambio entre estrategias.

tf.distribute.Strategy se puede usar con una API de alto nivel como Keras , y también se puede usar para distribuir bucles de entrenamiento personalizados (y, en general, cualquier cálculo usando TensorFlow).

En TensorFlow 2.x, puede ejecutar sus programas con entusiasmo, o en un gráfico usando tf.function . tf.distribute.Strategy pretende admitir estos dos modos de ejecución, pero funciona mejor con tf.function . El modo Eager solo se recomienda para fines de depuración y no es compatible con TPUStrategy . Aunque discutimos la capacitación la mayor parte del tiempo en esta guía, esta API también se puede utilizar para distribuir la evaluación y la predicción en diferentes plataformas.

Puede usar tf.distribute.Strategy con muy pocos cambios en su código, porque hemos cambiado los componentes subyacentes de TensorFlow para tomar conciencia de la estrategia. Esto incluye variables, capas, modelos, optimizadores, métricas, resúmenes y puntos de control.

En esta guía, explicamos varios tipos de estrategias y cómo puede usarlas en diferentes situaciones.

 # Import TensorFlow
import tensorflow as tf
 

Tipos de estrategias.

tf.distribute.Strategy tiene la intención de cubrir una serie de casos de uso a lo largo de diferentes ejes. Algunas de estas combinaciones son compatibles actualmente y otras se agregarán en el futuro. Algunos de estos ejes son:

  • Entrenamiento síncrono versus asíncrono: Estas son dos formas comunes de distribuir el entrenamiento con paralelismo de datos. En la capacitación de sincronización, todos los trabajadores se capacitan en diferentes segmentos de datos de entrada sincronizados y agregan gradientes en cada paso. En la capacitación asíncrona, todos los trabajadores se capacitan independientemente sobre los datos de entrada y actualizan las variables de forma asincrónica. Por lo general, el entrenamiento de sincronización es compatible a través de todo-reducir y asíncrono a través de la arquitectura del servidor de parámetros.
  • Plataforma de hardware: es posible que desee escalar su entrenamiento en múltiples GPU en una máquina, o en varias máquinas en una red (con 0 o más GPU cada una), o en TPU en la nube.

Para respaldar estos casos de uso, hay seis estrategias disponibles. En la siguiente sección explicamos cuáles de estos son compatibles en qué escenarios en TF 2.2 en este momento. Aquí hay un resumen rápido:

API de entrenamiento Estrategia reflejada TPUStrategy MultiWorkerMirroredStrategy Estrategia de almacenamiento central ParameterServerStrategy
API de Keras Soportado Soportado Apoyo experimental Apoyo experimental Publicación planificada admitida 2.3
Bucle de entrenamiento personalizado Soportado Soportado Apoyo experimental Apoyo experimental Publicación planificada admitida 2.3
API de estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

Estrategia reflejada

tf.distribute.MirroredStrategy admite entrenamiento distribuido sincrónico en múltiples GPU en una máquina. Crea una réplica por dispositivo GPU. Cada variable en el modelo se refleja en todas las réplicas. Juntas, estas variables forman una sola variable conceptual llamada MirroredVariable . Estas variables se mantienen sincronizadas entre sí mediante la aplicación de actualizaciones idénticas.

Se utilizan algoritmos eficientes de reducción total para comunicar las actualizaciones variables a través de los dispositivos. Reduzca todos los tensores de agregados en todos los dispositivos agregándolos y los pone a disposición en cada dispositivo. Es un algoritmo fusionado que es muy eficiente y puede reducir significativamente la sobrecarga de sincronización. Hay muchos algoritmos e implementaciones de reducción total disponibles, dependiendo del tipo de comunicación disponible entre dispositivos. Por defecto, usa NVIDIA NCCL como la implementación de reducción total. Puede elegir entre algunas otras opciones que ofrecemos, o escribir la suya.

Aquí está la forma más simple de crear MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Esto creará una instancia de MirroredStrategy que usará todas las GPU que son visibles para TensorFlow, y usará NCCL como la comunicación entre dispositivos.

Si desea utilizar solo algunas de las GPU en su máquina, puede hacerlo así:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Si desea anular la comunicación entre dispositivos, puede hacerlo utilizando el argumento cross_device_ops proporcionando una instancia de tf.distribute.CrossDeviceOps . Actualmente, tf.distribute.HierarchicalCopyAllReduce y tf.distribute.ReductionToOneDevice son dos opciones distintas de tf.distribute.NcclAllReduce que es el valor predeterminado.

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Estrategia TPUS

tf.distribute.TPUStrategy permite ejecutar su entrenamiento TensorFlow en unidades de procesamiento de tensor (TPU). Los TPU son los ASIC especializados de Google diseñados para acelerar drásticamente las cargas de trabajo de aprendizaje automático. Están disponibles en Google Colab, TensorFlow Research Cloud y Cloud TPU .

En términos de arquitectura de entrenamiento distribuido, TPUStrategy es el mismo MirroredStrategy : implementa entrenamiento distribuido sincrónico. Las TPU proporcionan su propia implementación de operaciones eficaces de reducción total y otras operaciones colectivas en múltiples núcleos de TPU, que se utilizan en la TPUStrategy .

Así es como TPUStrategy instancia de TPUStrategy :

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

La instancia TPUClusterResolver ayuda a localizar las TPU. En Colab, no necesita especificar ningún argumento.

Si desea usar esto para TPU en la nube:

  • Debe especificar el nombre de su recurso TPU en el argumento tpu .
  • Debe inicializar el sistema tpu explícitamente al inicio del programa. Esto es necesario antes de que las TPU se puedan usar para el cálculo. La inicialización del sistema tpu también borra la memoria TPU, por lo que es importante completar este paso primero para evitar perder el estado.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy es muy similar a MirroredStrategy . Implementa capacitación distribuida sincrónica a través de múltiples trabajadores, cada uno con GPU potencialmente múltiples. Similar a MirroredStrategy , crea copias de todas las variables en el modelo en cada dispositivo en todos los trabajadores.

Utiliza CollectiveOps como el método de comunicación multi-trabajador de reducción total utilizado para mantener las variables sincronizadas. Una operación colectiva es una operación única en el gráfico TensorFlow que puede elegir automáticamente un algoritmo de reducción total en el tiempo de ejecución de TensorFlow de acuerdo con el hardware, la topología de red y los tamaños de tensor.

También implementa optimizaciones de rendimiento adicionales. Por ejemplo, incluye una optimización estática que convierte múltiples reducciones totales en tensores pequeños en menos reducciones totales en tensores más grandes. Además, lo estamos diseñando para que tenga una arquitectura de complementos, de modo que en el futuro, pueda completar algoritmos que estén mejor ajustados para su hardware. Tenga en cuenta que las operaciones colectivas también implementan otras operaciones colectivas, como la transmisión y la recopilación total.

Aquí está la forma más simple de crear MultiWorkerMirroredStrategy :

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy actualmente le permite elegir entre dos implementaciones diferentes de operaciones colectivas. CollectiveCommunication.RING implementa colectivos basados ​​en anillos utilizando gRPC como capa de comunicación. CollectiveCommunication.NCCL utiliza NCCL de Nvidia para implementar colectivos. CollectiveCommunication.AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y la interconexión de red en el clúster. Puede especificarlos de la siguiente manera:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Una de las diferencias clave para iniciar la capacitación de múltiples trabajadores, en comparación con la capacitación de múltiples GPU, es la configuración de múltiples trabajadores. La variable de entorno TF_CONFIG es la forma estándar en TensorFlow para especificar la configuración del clúster para cada trabajador que forma parte del clúster. Obtenga más información sobre cómo configurar TF_CONFIG .

Estrategia de almacenamiento central

tf.distribute.experimental.CentralStorageStrategy realiza entrenamiento sincrónico. Las variables no se reflejan, sino que se colocan en la CPU y las operaciones se replican en todas las GPU locales. Si solo hay una GPU, todas las variables y operaciones se colocarán en esa GPU.

Cree una instancia de CentralStorageStrategy de la siguiente manera:

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Esto creará una instancia de CentralStorageStrategy que usará todas las GPU y CPU visibles. La actualización de las variables en las réplicas se agregará antes de aplicarse a las variables.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy admite la formación de servidores de parámetros en varias máquinas. En esta configuración, algunas máquinas se designan como trabajadores y otras como servidores de parámetros. Cada variable del modelo se coloca en un servidor de parámetros. La computación se replica en todas las GPU de todos los trabajadores.

En términos de código, se parece a otras estrategias:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

Para la capacitación de múltiples trabajadores, TF_CONFIG necesita especificar la configuración de los servidores de parámetros y los trabajadores en su clúster, sobre lo cual puede leer más en TF_CONFIG a continuación .

Otras estrategias

Además de las estrategias anteriores, existen otras dos estrategias que pueden ser útiles para la creación de prototipos y la depuración cuando se utilizan tf.distribute API tf.distribute .

Estrategia predeterminada

La estrategia predeterminada es una estrategia de distribución que está presente cuando no hay una estrategia de distribución explícita dentro del alcance. Implementa la interfaz tf.distribute.Strategy pero es un paso y no proporciona una distribución real. Por ejemplo, strategy.run(fn) simplemente llamará a fn . El código escrito usando esta estrategia debe comportarse exactamente como el código escrito sin ninguna estrategia. Puedes considerarlo como una estrategia "no operativa".

La estrategia predeterminada es un singleton, y no se pueden crear más instancias. Se puede obtener usando tf.distribute.get_strategy() fuera del alcance de cualquier estrategia explícita (la misma API que se puede usar para obtener la estrategia actual dentro del alcance de una estrategia explícita).

 default_strategy = tf.distribute.get_strategy()
 

Esta estrategia tiene dos propósitos principales:

  • Permite escribir código de biblioteca consciente de distribución incondicionalmente. Por ejemplo, en el optimizador, podemos hacer tf.distribute.get_strategy() y usar esa estrategia para reducir gradientes; siempre devolverá un objeto de estrategia en el que podemos llamar reducir API.
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • Similar al código de la biblioteca, se puede usar para escribir programas de usuarios finales para trabajar con y sin estrategia de distribución, sin requerir lógica condicional. Un fragmento de código de muestra que ilustra esto:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy es una estrategia para colocar todas las variables y el cálculo en un solo dispositivo especificado.

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

Esta estrategia es distinta de la estrategia predeterminada en varias formas. En la estrategia predeterminada, la lógica de ubicación variable permanece sin cambios en comparación con la ejecución de TensorFlow sin ninguna estrategia de distribución. Pero cuando se usa OneDeviceStrategy , todas las variables creadas en su alcance se colocan explícitamente en el dispositivo especificado. Además, cualquier función llamada a través de OneDeviceStrategy.run también se colocará en el dispositivo especificado.

La entrada distribuida a través de esta estrategia se capturará previamente en el dispositivo especificado. En la estrategia predeterminada, no hay distribución de entrada.

Similar a la estrategia predeterminada, esta estrategia también podría usarse para probar su código antes de cambiar a otras estrategias que realmente se distribuyen a múltiples dispositivos / máquinas. Esto ejercitará la maquinaria de la estrategia de distribución algo más que la estrategia predeterminada, pero no en toda su extensión, ya que utiliza MirroredStrategy o TPUStrategy etc. Si desea un código que se comporte como si no hubiera una estrategia, utilice la estrategia predeterminada.

Hasta ahora hemos hablado sobre cuáles son las diferentes estrategias disponibles y cómo puede crearlas. En las próximas secciones, hablaremos sobre las diferentes formas en que puede usarlas para distribuir su capacitación. Mostraremos fragmentos cortos de código en esta guía y enlazaremos con tutoriales completos que puede ejecutar de extremo a extremo.

Usando tf.distribute.Strategy con tf.keras.Model.fit

Hemos integrado tf.distribute.Strategy en tf.keras que es la implementación de TensorFlow de la especificación API de Keras . tf.keras es una API de alto nivel para construir y entrenar modelos. Al integrarnos en el backend de tf.keras , hemos hecho que puedas distribuir tu entrenamiento escrito en el marco de entrenamiento de Keras usando model.fit .

Esto es lo que necesita cambiar en su código:

  1. Cree una instancia del tf.distribute.Strategy apropiado.
  2. Mueva la creación del modelo, optimizador y métricas de Keras dentro de strategy.scope .

Admitimos todo tipo de modelos Keras: secuenciales, funcionales y subclasificados.

Aquí hay un fragmento de código para hacer esto para un modelo Keras muy simple con una capa densa:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

En este ejemplo, usamos MirroredStrategy para poder ejecutar esto en una máquina con múltiples GPU. strategy.scope() indica a Keras qué estrategia utilizar para distribuir la capacitación. Crear modelos / optimizadores / métricas dentro de este alcance nos permite crear variables distribuidas en lugar de variables regulares. Una vez que esté configurado, puede ajustar su modelo como lo haría normalmente. MirroredStrategy se encarga de replicar la capacitación del modelo en las GPU disponibles, agregar gradientes y más.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

Aquí usamos un tf.data.Dataset de datos tf.data.Dataset para proporcionar la entrada de capacitación y evaluación. También puede usar matrices numpy:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

En ambos casos (conjunto de datos o numpy), cada lote de la entrada dada se divide por igual entre las múltiples réplicas. Por ejemplo, si usa MirroredStrategy con 2 GPU, cada lote de tamaño 10 se dividirá entre las 2 GPU, y cada una recibirá 5 ejemplos de entrada en cada paso. Cada época se entrenará más rápido a medida que agregue más GPU. Por lo general, querrá aumentar el tamaño de su lote a medida que agrega más aceleradores para hacer un uso efectivo de la potencia informática adicional. También deberá volver a ajustar su tasa de aprendizaje, según el modelo. Puede usar strategy.num_replicas_in_sync para obtener el número de réplicas.

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

¿Qué se admite ahora?

API de entrenamiento Estrategia reflejada TPUStrategy MultiWorkerMirroredStrategy Estrategia de almacenamiento central ParameterServerStrategy
API de Keras Soportado Soportado Apoyo experimental Apoyo experimental Soporte planificado post 2.3

Ejemplos y tutoriales

Aquí hay una lista de tutoriales y ejemplos que ilustran la integración anterior de principio a fin con Keras:

  1. Tutorial para entrenar MNIST con MirroredStrategy .
  2. Tutorial para entrenar MNIST usando MultiWorkerMirroredStrategy .
  3. Guía sobre la formación de MNIST utilizando TPUStrategy .
  4. El repositorio de TensorFlow Model Garden que contiene colecciones de modelos de vanguardia implementados utilizando diversas estrategias.

Usando tf.distribute.Strategy con bucles de entrenamiento personalizados

Como has visto, usar tf.distribute.Strategy con Keras model.fit requiere cambiar solo un par de líneas de tu código. Con un poco más de esfuerzo, también puede usar tf.distribute.Strategy con bucles de entrenamiento personalizados.

Si necesita más flexibilidad y control sobre sus ciclos de entrenamiento de lo que es posible con Estimator o Keras, puede escribir ciclos de entrenamiento personalizados. Por ejemplo, al usar una GAN, es posible que desee tomar un número diferente de pasos de generador o discriminador en cada ronda. Del mismo modo, los marcos de alto nivel no son muy adecuados para la formación del aprendizaje por refuerzo.

Para admitir bucles de entrenamiento personalizados, proporcionamos un conjunto básico de métodos a través de las clases tf.distribute.Strategy . El uso de estos puede requerir una reestructuración menor del código inicialmente, pero una vez hecho esto, debería poder cambiar entre GPU, TPU y múltiples máquinas simplemente cambiando la instancia de estrategia.

Aquí mostraremos un breve fragmento que ilustra este caso de uso para un ejemplo de entrenamiento simple usando el mismo modelo Keras que antes.

Primero, creamos el modelo y el optimizador dentro del alcance de la estrategia. Esto garantiza que cualquier variable creada con el modelo y el optimizador sean variables reflejadas.

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

A continuación, creamos el conjunto de datos de entrada y llamamos a tf.distribute.Strategy.experimental_distribute_dataset para distribuir el conjunto de datos en función de la estrategia.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

Luego, definimos un paso de la capacitación. Usaremos tf.GradientTape para calcular gradientes y el optimizador para aplicar esos gradientes para actualizar las variables de nuestro modelo. Para distribuir este paso de entrenamiento, ponemos una función train_step y la pasamos a tf.distrbute.Strategy.run junto con las entradas del conjunto de datos que obtenemos de dist_dataset creado antes:

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

Algunas otras cosas a tener en cuenta en el código anterior:

  1. Utilizamos tf.nn.compute_average_loss para calcular la pérdida. tf.nn.compute_average_loss suma la pérdida por ejemplo y divide la suma entre global_batch_size. Esto es importante porque más tarde después de los gradientes se calculan en cada réplica, que están agregados a través de las réplicas sumando ellos.
  2. Utilizamos la API tf.distribute.Strategy.reduce para agregar los resultados devueltos por tf.distribute.Strategy.run . tf.distribute.Strategy.run devuelve resultados de cada réplica local en la estrategia, y hay varias formas de consumir este resultado. Puede reduce para obtener un valor agregado. También puede hacer tf.distribute.Strategy.experimental_local_results para obtener la lista de valores contenidos en el resultado, uno por réplica local.
  3. Cuando se llama a apply_gradients dentro de un alcance de estrategia de distribución, su comportamiento se modifica. Específicamente, antes de aplicar gradientes en cada instancia paralela durante el entrenamiento sincrónico, realiza una suma total de las réplicas de los gradientes.

Finalmente, una vez que hemos definido el paso de entrenamiento, podemos iterar sobre dist_dataset y ejecutar el entrenamiento en un bucle:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

En el ejemplo anterior, dist_dataset sobre dist_dataset para proporcionar información a su entrenamiento. También proporcionamos tf.distribute.Strategy.make_experimental_numpy_dataset para admitir entradas numpy. Puede usar esta API para crear un conjunto de datos antes de llamar a tf.distribute.Strategy.experimental_distribute_dataset .

Otra forma de iterar sobre sus datos es usar explícitamente iteradores. Es posible que desee hacer esto cuando desee ejecutar un número determinado de pasos en lugar de iterar sobre todo el conjunto de datos. La iteración anterior ahora se modificaría para crear primero un iterador y luego llamar explícitamente a next para obtener los datos de entrada.

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

Esto cubre el caso más simple de usar tf.distribute.Strategy API para distribuir bucles de entrenamiento personalizados. Estamos en el proceso de mejorar estas API. Dado que este caso de uso requiere más trabajo para adaptar su código, publicaremos una guía detallada por separado en el futuro.

¿Qué se admite ahora?

API de entrenamiento Estrategia reflejada TPUStrategy MultiWorkerMirroredStrategy Estrategia de almacenamiento central ParameterServerStrategy
Bucle de entrenamiento personalizado Soportado Soportado Apoyo experimental Apoyo experimental Soporte planificado post 2.3

Ejemplos y tutoriales

Aquí hay algunos ejemplos para usar la estrategia de distribución con bucles de entrenamiento personalizados:

  1. Tutorial para entrenar MNIST usando MirroredStrategy .
  2. Guía sobre la formación de MNIST utilizando TPUStrategy .
  3. El repositorio de TensorFlow Model Garden que contiene colecciones de modelos de vanguardia implementados utilizando diversas estrategias.

Uso de tf.distribute.Strategy con Estimator (soporte limitado)

tf.estimator es una API distribuida de capacitación de TensorFlow que originalmente admitía el enfoque del servidor de parámetros asíncronos. Al igual que con Keras, hemos integrado tf.distribute.Strategy en tf.Estimator . Si está utilizando Estimator para su entrenamiento, puede cambiar fácilmente a un entrenamiento distribuido con muy pocos cambios en su código. Con esto, los usuarios de Estimator ahora pueden realizar capacitación distribuida sincrónica en múltiples GPU y múltiples trabajadores, así como también usar TPU. Este soporte en Estimator es, sin embargo, limitado. Consulte la sección Qué se admite ahora a continuación para obtener más detalles.

El uso de tf.distribute.Strategy con Estimator es ligeramente diferente al caso de Keras. En lugar de usar strategy.scope , ahora pasamos el objeto de estrategia al RunConfig para el Estimador.

Aquí hay un fragmento de código que muestra esto con un estimador LinearRegressor y MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Aquí utilizamos un Estimador prefabricado, pero el mismo código también funciona con un Estimador personalizado. train_distribute determina cómo se distribuirá el entrenamiento, y eval_distribute determina cómo se distribuirá la evaluación. Esta es otra diferencia de Keras, donde usamos la misma estrategia tanto para el entrenamiento como para la evaluación.

Ahora podemos entrenar y evaluar este Estimador con una función de entrada:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Otra diferencia a destacar aquí entre Estimator y Keras es el manejo de entrada. En Keras, mencionamos que cada lote del conjunto de datos se divide automáticamente entre las múltiples réplicas. En Estimator, sin embargo, no hacemos una división automática de lotes, ni compartimos automáticamente los datos entre diferentes trabajadores. Usted tiene control total sobre cómo desea que se distribuyan sus datos entre los trabajadores y los dispositivos, y debe proporcionar un input_fn para especificar cómo distribuir sus datos.

Su input_fn se llama una vez por trabajador, dando así un conjunto de datos por trabajador. Luego, un lote de ese conjunto de datos se alimenta a una réplica en ese trabajador, consumiendo así N lotes para N réplicas en 1 trabajador. En otras palabras, el conjunto de datos devuelto por input_fn debe proporcionar lotes de tamaño PER_REPLICA_BATCH_SIZE . Y el tamaño de lote global para un paso se puede obtener como PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Cuando realice una capacitación para múltiples trabajadores, debe dividir sus datos entre los trabajadores o mezclarlos con una semilla aleatoria en cada uno. Puede ver un ejemplo de cómo hacerlo en la Capacitación de múltiples trabajadores con Estimator .

Y de manera similar, también puede usar estrategias de servidores de parámetros y de trabajadores múltiples. El código sigue siendo el mismo, pero debe usar tf.estimator.train_and_evaluate y establecer las variables de entorno TF_CONFIG para cada binario que se ejecuta en su clúster.

¿Qué se admite ahora?

Existe un soporte limitado para la capacitación con Estimator utilizando todas las estrategias excepto TPUStrategy . La capacitación y la evaluación básicas deberían funcionar, pero una serie de características avanzadas como el andamio aún no funcionan. También puede haber una serie de errores en esta integración. En este momento, no planeamos mejorar activamente este soporte, y en cambio estamos enfocados en Keras y el soporte de bucle de entrenamiento personalizado. Si es posible, debería preferir usar tf.distribute con esas API en su lugar.

API de entrenamiento Estrategia reflejada Estrategia TPUS MultiWorkerMirroredStrategy Estrategia de almacenamiento central ParameterServerStrategy
API de estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

Ejemplos y tutoriales

Estos son algunos ejemplos que muestran el uso de varias estrategias de extremo a extremo con Estimator:

  1. Capacitación de múltiples trabajadores con estimador para capacitar a MNIST con múltiples trabajadores utilizando MultiWorkerMirroredStrategy .
  2. Ejemplo de extremo a extremo para capacitación de múltiples trabajadores en tensorflow / ecosistema usando plantillas de Kubernetes. Este ejemplo comienza con un modelo de Keras y lo convierte en un Estimador utilizando la API tf.keras.estimator.model_to_estimator .
  3. Modelo oficial de ResNet50 , que se puede entrenar utilizando MirroredStrategy o MultiWorkerMirroredStrategy .

Otros temas

En esta sección, cubriremos algunos temas que son relevantes para casos de uso múltiple.

Configuración de la variable de entorno TF_CONFIG

Para la capacitación de varios trabajadores, como se mencionó anteriormente, debe establecer la variable de entorno TF_CONFIG para cada binario que se ejecuta en su clúster. La variable de entorno TF_CONFIG es una cadena JSON que especifica qué tareas constituyen un clúster, sus direcciones y el rol de cada tarea en el clúster. Proporcionamos una plantilla de Kubernetes en el repositorio de tensorflow / ecosistema que establece TF_CONFIG para sus tareas de capacitación.

Hay dos componentes de TF_CONFIG: clúster y tarea. El clúster proporciona información sobre el clúster de formación, que es un dict que consta de diferentes tipos de trabajos, como el de trabajador. En la capacitación de múltiples trabajadores, generalmente hay un trabajador que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un trabajador regular. Dicho trabajador se conoce como el trabajador 'principal', y es costumbre que el trabajador con índice 0 sea designado como el trabajador principal (de hecho, así es como se implementa tf.distribute.Strategy). Por otro lado, la tarea proporciona información de la tarea actual. El primer grupo de componentes es el mismo para todos los trabajadores, y la tarea del segundo componente es diferente en cada trabajador y especifica el tipo y el índice de ese trabajador.

Un ejemplo de TF_CONFIG es:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

Este TF_CONFIG especifica que hay tres trabajadores y dos tareas ps en el clúster junto con sus hosts y puertos. La parte de "tarea" especifica que el rol de la tarea actual en el clúster, trabajador 1 (el segundo trabajador). Los roles válidos en un clúster son "jefe", "trabajador", "ps" y "evaluador". No debe haber trabajo "ps" excepto cuando se usa tf.distribute.experimental.ParameterServerStrategy .

¿Que sigue?

tf.distribute.Strategy está activamente en desarrollo. Lo invitamos a probarlo y proporcionar sus comentarios utilizando los problemas de GitHub .