Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Entrenamiento distribuido con TensorFlow

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

tf.distribute.Strategy es una API de TensorFlow para distribuir el entrenamiento en múltiples GPU, múltiples máquinas o TPU. Con esta API, puede distribuir sus modelos existentes y código de entrenamiento con cambios mínimos en el código.

tf.distribute.Strategy ha sido diseñado con estos objetivos clave en mente:

  • Fácil de usar y compatible con múltiples segmentos de usuarios, incluidos investigadores, ingenieros de ML, etc.
  • Proporciona un buen rendimiento desde el primer momento.
  • Fácil cambio entre estrategias.

tf.distribute.Strategy se puede usar con una API de alto nivel como Keras , y también se puede usar para distribuir bucles de entrenamiento personalizados (y, en general, cualquier cálculo con TensorFlow).

En TensorFlow 2.x, puede ejecutar sus programas con entusiasmo o en un gráfico usando tf.function . tf.distribute.Strategy intenta admitir ambos modos de ejecución, pero funciona mejor con tf.function . El modo ansioso solo se recomienda para fines de depuración y no es compatible con TPUStrategy . Aunque discutimos el entrenamiento la mayor parte del tiempo en esta guía, esta API también se puede usar para distribuir evaluaciones y predicciones en diferentes plataformas.

Puede usar tf.distribute.Strategy con muy pocos cambios en su código, porque hemos cambiado los componentes subyacentes de TensorFlow para que sean conscientes de la estrategia. Esto incluye variables, capas, modelos, optimizadores, métricas, resúmenes y puntos de control.

En esta guía, explicamos varios tipos de estrategias y cómo puede usarlas en diferentes situaciones.

# Import TensorFlow
import tensorflow as tf

Tipos de estrategias

tf.distribute.Strategy pretende cubrir una serie de casos de uso a lo largo de diferentes ejes. Algunas de estas combinaciones son compatibles actualmente y otras se agregarán en el futuro. Algunos de estos ejes son:

  • Entrenamiento síncrono vs asincrónico: estas son dos formas comunes de distribuir el entrenamiento con paralelismo de datos. En el entrenamiento sincronizado, todos los trabajadores entrenan en diferentes segmentos de datos de entrada sincronizados y agregando gradientes en cada paso. En el entrenamiento asincrónico, todos los trabajadores se entrenan de forma independiente sobre los datos de entrada y actualizan las variables de forma asincrónica. Normalmente, el entrenamiento de sincronización se admite a través de all-reduce y async a través de la arquitectura del servidor de parámetros.
  • Plataforma de hardware: es posible que desee escalar su entrenamiento a varias GPU en una máquina, o varias máquinas en una red (con 0 o más GPU cada una), o en Cloud TPU.

Para respaldar estos casos de uso, hay seis estrategias disponibles. En la siguiente sección explicamos cuáles de estos son compatibles en qué escenarios en TF 2.2 en este momento. Aquí hay una descripción general rápida:

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API de Keras Soportado Soportado Soporte experimental Soporte experimental Publicación planificada compatible 2.3
Bucle de entrenamiento personalizado Soportado Soportado Soporte experimental Soporte experimental Publicación planificada apoyada 2.3
API de estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

EspejoEstrategia

tf.distribute.MirroredStrategy admite el entrenamiento distribuido sincrónico en varias GPU en una máquina. Crea una réplica por dispositivo GPU. Cada variable del modelo se refleja en todas las réplicas. Juntas, estas variables forman una única variable conceptual llamada MirroredVariable . Estas variables se mantienen sincronizadas entre sí mediante la aplicación de actualizaciones idénticas.

Se utilizan algoritmos eficientes de reducción total para comunicar las actualizaciones de variables entre los dispositivos. All-reduce agrega tensores en todos los dispositivos agregándolos y los pone a disposición en cada dispositivo. Es un algoritmo fusionado que es muy eficiente y puede reducir significativamente la sobrecarga de sincronización. Hay muchos algoritmos e implementaciones de reducción total disponibles, según el tipo de comunicación disponible entre dispositivos. De forma predeterminada, utiliza NVIDIA NCCL como la implementación de reducción total. Puede elegir entre algunas otras opciones que ofrecemos o escribir la suya propia.

Esta es la forma más sencilla de crear MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Esto creará una instancia de MirroredStrategy que usará todas las GPU que son visibles para TensorFlow y usará NCCL como comunicación entre dispositivos.

Si desea usar solo algunas de las GPU en su máquina, puede hacerlo así:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Si desea anular la comunicación entre dispositivos, puede hacerlo utilizando el argumento cross_device_ops proporcionando una instancia de tf.distribute.CrossDeviceOps . Actualmente, tf.distribute.HierarchicalCopyAllReduce y tf.distribute.ReductionToOneDevice son dos opciones distintas de tf.distribute.NcclAllReduce que es la predeterminada.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy te permite ejecutar tu entrenamiento de TensorFlow en unidades de procesamiento de tensor (TPU). Los TPU son ASIC especializados de Google diseñados para acelerar drásticamente las cargas de trabajo de aprendizaje automático. Están disponibles en Google Colab, TensorFlow Research Cloud y Cloud TPU .

En términos de arquitectura de entrenamiento distribuido, TPUStrategy es la misma MirroredStrategy : implementa entrenamiento distribuido sincrónico. Las TPU proporcionan su propia implementación de operaciones colectivas eficientes de reducción total y otras en varios núcleos de TPU, que se utilizan en TPUStrategy .

Así es como podría crear TPUStrategy instancia de TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

La instancia de TPUClusterResolver ayuda a localizar las TPU. En Colab, no es necesario que le especifiques ningún argumento.

Si desea usar esto para Cloud TPU:

  • Debes especificar el nombre de tu recurso de TPU en el argumento tpu .
  • Debe inicializar el sistema tpu explícitamente al inicio del programa. Esto es necesario antes de que las TPU se puedan utilizar para el cálculo. La inicialización del sistema tpu también borra la memoria de la TPU, por lo que es importante completar este paso primero para evitar perder el estado.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy es muy similar a MirroredStrategy . Implementa entrenamiento distribuido sincrónico entre múltiples trabajadores, cada uno con potencialmente múltiples GPU. Similar a MirroredStrategy , crea copias de todas las variables en el modelo en cada dispositivo en todos los trabajadores.

Utiliza CollectiveOps como el método de comunicación de reducción total de múltiples trabajadores que se utiliza para mantener las variables sincronizadas. Una operación colectiva es una operación única en el gráfico de TensorFlow que puede elegir automáticamente un algoritmo de reducción total en el tiempo de ejecución de TensorFlow de acuerdo con el hardware, la topología de red y los tamaños de tensor.

También implementa optimizaciones de rendimiento adicionales. Por ejemplo, incluye una optimización estática que convierte múltiples reducciones totales en tensores pequeños en menos reducciones totales en tensores más grandes. Además, lo estamos diseñando para que tenga una arquitectura de complementos, de modo que en el futuro, podrá agregar algoritmos que estén mejor adaptados a su hardware. Tenga en cuenta que las operaciones colectivas también implementan otras operaciones colectivas como la transmisión y la recopilación total.

Esta es la forma más sencilla de crear MultiWorkerMirroredStrategy :

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy actualmente le permite elegir entre dos implementaciones diferentes de operaciones colectivas. CollectiveCommunication.RING implementa colectivos basados ​​en anillos utilizando gRPC como capa de comunicación. CollectiveCommunication.NCCL utiliza NCCL de Nvidia para implementar colectivos. CollectiveCommunication.AUTO difiere la elección al tiempo de ejecución. La mejor opción de implementación colectiva depende del número y tipo de GPU y de la interconexión de red en el clúster. Puede especificarlos de la siguiente manera:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Una de las diferencias clave para poner en marcha la formación de varios trabajadores, en comparación con la formación de varias GPU, es la configuración de varios trabajadores. La variable de entorno TF_CONFIG es la forma estándar en TensorFlow de especificar la configuración del clúster para cada trabajador que forma parte del clúster. Obtenga más información sobre cómo configurar TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy realiza entrenamiento sincrónico. Las variables no se reflejan, sino que se colocan en la CPU y las operaciones se replican en todas las GPU locales. Si solo hay una GPU, todas las variables y operaciones se colocarán en esa GPU.

Cree una instancia de CentralStorageStrategy mediante:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Esto creará una instancia de CentralStorageStrategy que utilizará todas las GPU y CPU visibles. La actualización de las variables en las réplicas se agregará antes de aplicarse a las variables.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy admite el entrenamiento de servidores de parámetros en varias máquinas. En esta configuración, algunas máquinas se designan como trabajadores y otras como servidores de parámetros. Cada variable del modelo se coloca en un servidor de parámetros. La computación se replica en todas las GPU de todos los trabajadores.

En términos de código, se parece a otras estrategias:

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

Para la capacitación de varios trabajadores, TF_CONFIG debe especificar la configuración de los servidores de parámetros y los trabajadores en su clúster, sobre lo cual puede leer más en TF_CONFIG a continuación .

Otras estrategias

Además de las estrategias anteriores, existen otras dos estrategias que pueden resultar útiles para la creación de prototipos y la depuración cuando se utilizan tf.distribute API de tf.distribute .

Estrategia predeterminada

La estrategia predeterminada es una estrategia de distribución que está presente cuando no se incluye una estrategia de distribución explícita. Implementa la interfaz tf.distribute.Strategy pero es un paso a través y no proporciona una distribución real. Por ejemplo, strategy.run(fn) simplemente llamará a fn . El código escrito con esta estrategia debe comportarse exactamente como el código escrito sin ninguna estrategia. Puede pensar en ello como una estrategia "no operativa".

La estrategia predeterminada es un singleton, y no se pueden crear más instancias de él. Se puede obtener usando tf.distribute.get_strategy() fuera del alcance de cualquier estrategia explícita (la misma API que se puede usar para obtener la estrategia actual dentro del alcance de una estrategia explícita).

default_strategy = tf.distribute.get_strategy()

Esta estrategia tiene dos propósitos principales:

  • Permite escribir código de biblioteca compatible con la distribución de forma incondicional. Por ejemplo, en el optimizador, podemos hacer tf.distribute.get_strategy() y usar esa estrategia para reducir los gradientes; siempre devolverá un objeto de estrategia en el que podemos llamar a la API de reducción.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Al igual que el código de biblioteca, se puede utilizar para escribir programas de usuarios finales para trabajar con y sin estrategia de distribución, sin requerir lógica condicional. Un fragmento de código de muestra que ilustra esto:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy es una estrategia para colocar todas las variables y el cálculo en un solo dispositivo especificado.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Esta estrategia es distinta de la estrategia predeterminada en varias formas. En la estrategia predeterminada, la lógica de ubicación de las variables no cambia en comparación con la ejecución de TensorFlow sin ninguna estrategia de distribución. Pero cuando se usa OneDeviceStrategy , todas las variables creadas en su alcance se colocan explícitamente en el dispositivo especificado. Además, cualquier función llamada a través de OneDeviceStrategy.run también se colocará en el dispositivo especificado.

La entrada distribuida a través de esta estrategia se precargará en el dispositivo especificado. En la estrategia predeterminada, no hay distribución de entrada.

De manera similar a la estrategia predeterminada, esta estrategia también podría usarse para probar su código antes de cambiar a otras estrategias que realmente distribuyen a múltiples dispositivos / máquinas. Esto ejercitará la maquinaria de la estrategia de distribución un poco más que la estrategia predeterminada, pero no en toda su extensión como cuando se usa MirroredStrategy o TPUStrategy etc. Si desea un código que se comporte como si no tuviera una estrategia, utilice la estrategia predeterminada.

Hasta ahora hemos hablado sobre cuáles son las diferentes estrategias disponibles y cómo puede instanciarlas. En las próximas secciones, hablaremos sobre las diferentes formas en las que puede utilizarlos para distribuir su formación. Mostraremos fragmentos de código cortos en esta guía y enlazaremos a tutoriales completos que puede ejecutar de un extremo a otro.

Usando tf.distribute.Strategy con tf.keras.Model.fit

Hemos integrado tf.distribute.Strategy en tf.keras que es la implementación de TensorFlow de la especificación de la API de Keras . tf.keras es una API de alto nivel para construir y entrenar modelos. Al integrarnos en el backend de tf.keras , hemos facilitado la distribución de la formación escrita en el marco de formación de Keras mediante model.fit .

Esto es lo que necesita cambiar en su código:

  1. Cree una instancia del tf.distribute.Strategy apropiado.
  2. Mueva la creación del modelo, el optimizador y las métricas de Keras dentro de strategy.scope .

Admitimos todo tipo de modelos de Keras: secuenciales, funcionales y subclasificados.

Aquí hay un fragmento de código para hacer esto para un modelo Keras muy simple con una capa densa:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

En este ejemplo usamos MirroredStrategy para poder ejecutar esto en una máquina con múltiples GPU. strategy.scope() indica a Keras qué estrategia utilizar para distribuir la formación. La creación de modelos / optimizadores / métricas dentro de este ámbito nos permite crear variables distribuidas en lugar de variables regulares. Una vez que esto esté configurado, puede ajustar su modelo como lo haría normalmente. MirroredStrategy se encarga de replicar el entrenamiento del modelo en las GPU disponibles, agregar gradientes y más.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 3.4059
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 1.5054
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 0.9349

0.9348920583724976

Aquí usamos un tf.data.Dataset para proporcionar la entrada de entrenamiento y evaluación. También puede usar matrices numpy:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.6654
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.2941

<tensorflow.python.keras.callbacks.History at 0x7f8dd86cbcc0>

En ambos casos (conjunto de datos o cantidad), cada lote de la entrada dada se divide por igual entre las múltiples réplicas. Por ejemplo, si usa MirroredStrategy con 2 GPU, cada lote de tamaño 10 se dividirá entre las 2 GPU, y cada una recibirá 5 ejemplos de entrada en cada paso. Cada época se entrenará más rápido a medida que agregue más GPU. Normalmente, querrá aumentar el tamaño de su lote a medida que agrega más aceleradores para hacer un uso efectivo de la potencia de cálculo adicional. También deberá volver a ajustar su tasa de aprendizaje, según el modelo. Puede usar strategy.num_replicas_in_sync para obtener el número de réplicas.

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

¿Qué es compatible ahora?

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API de Keras Soportado Soportado Soporte experimental Soporte experimental Soporte planificado puesto 2.3

Ejemplos y tutoriales

Aquí hay una lista de tutoriales y ejemplos que ilustran la integración anterior de principio a fin con Keras:

  1. Tutorial para entrenar MNIST con MirroredStrategy .
  2. Tutorial para entrenar MNIST usando MultiWorkerMirroredStrategy .
  3. Guía para entrenar MNIST usando TPUStrategy .
  4. Repositorio de TensorFlow Model Garden que contiene colecciones de modelos de última generación implementados mediante diversas estrategias.

Usando tf.distribute.Strategy con ciclos de entrenamiento personalizados

Como ha visto, usar tf.distribute.Strategy con Keras model.fit requiere cambiar solo un par de líneas de su código. Con un poco más de esfuerzo, también puede usar tf.distribute.Strategy con ciclos de entrenamiento personalizados.

Si necesita más flexibilidad y control sobre sus ciclos de entrenamiento de lo que es posible con Estimator o Keras, puede escribir ciclos de entrenamiento personalizados. Por ejemplo, al usar un GAN, es posible que desee realizar un número diferente de pasos de generador o discriminador en cada ronda. Del mismo modo, los marcos de alto nivel no son muy adecuados para la formación de aprendizaje por refuerzo.

Para admitir ciclos de entrenamiento personalizados, proporcionamos un conjunto básico de métodos a través de las clases tf.distribute.Strategy . El uso de estos puede requerir una reestructuración menor del código inicialmente, pero una vez hecho esto, debería poder cambiar entre GPU, TPU y varias máquinas simplemente cambiando la instancia de estrategia.

Aquí mostraremos un breve fragmento que ilustra este caso de uso para un ejemplo de entrenamiento simple usando el mismo modelo de Keras que antes.

Primero, creamos el modelo y el optimizador dentro del alcance de la estrategia. Esto asegura que cualquier variable creada con el modelo y el optimizador sean variables reflejadas.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

A continuación, creamos el conjunto de datos de entrada y llamamos a tf.distribute.Strategy.experimental_distribute_dataset para distribuir el conjunto de datos en función de la estrategia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

Luego, definimos un paso del entrenamiento. Usaremos tf.GradientTape para calcular gradientes y optimizador para aplicar esos gradientes para actualizar las variables de nuestro modelo. Para distribuir este paso de entrenamiento, colocamos una función train_step y la pasamos a tf.distrbute.Strategy.run junto con las entradas del conjunto de datos que obtenemos de dist_dataset creado antes:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Algunas otras cosas a tener en cuenta en el código anterior:

  1. Usamos tf.nn.compute_average_loss para calcular la pérdida. tf.nn.compute_average_loss suma la pérdida por ejemplo y divide la suma por el global_batch_size. Esto es importante porque más tarde, después de que se calculan los gradientes en cada réplica, se agregan a través de las réplicas sumándolas .
  2. Usamos la API tf.distribute.Strategy.reduce para agregar los resultados devueltos por tf.distribute.Strategy.run . tf.distribute.Strategy.run devuelve resultados de cada réplica local en la estrategia y hay varias formas de consumir este resultado. Puede reduce para obtener un valor agregado. También puede hacer tf.distribute.Strategy.experimental_local_results para obtener la lista de valores contenidos en el resultado, uno por réplica local.
  3. Cuando se llama a apply_gradients dentro del alcance de una estrategia de distribución, se modifica su comportamiento. Específicamente, antes de aplicar gradientes en cada instancia paralela durante el entrenamiento sincrónico, realiza una suma total de réplicas de los gradientes.

Finalmente, una vez que hayamos definido el paso de entrenamiento, podemos iterar sobre dist_dataset y ejecutar el entrenamiento en un ciclo:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.3262986, shape=(), dtype=float32)
tf.Tensor(0.32475147, shape=(), dtype=float32)
tf.Tensor(0.3232167, shape=(), dtype=float32)
tf.Tensor(0.32169423, shape=(), dtype=float32)
tf.Tensor(0.32018384, shape=(), dtype=float32)
tf.Tensor(0.3186855, shape=(), dtype=float32)
tf.Tensor(0.317199, shape=(), dtype=float32)
tf.Tensor(0.31572425, shape=(), dtype=float32)
tf.Tensor(0.31426117, shape=(), dtype=float32)
tf.Tensor(0.31280956, shape=(), dtype=float32)
tf.Tensor(0.3113694, shape=(), dtype=float32)
tf.Tensor(0.30994043, shape=(), dtype=float32)
tf.Tensor(0.30852267, shape=(), dtype=float32)
tf.Tensor(0.30711594, shape=(), dtype=float32)
tf.Tensor(0.30572012, shape=(), dtype=float32)
tf.Tensor(0.30433518, shape=(), dtype=float32)
tf.Tensor(0.3029609, shape=(), dtype=float32)
tf.Tensor(0.30159724, shape=(), dtype=float32)
tf.Tensor(0.30024406, shape=(), dtype=float32)
tf.Tensor(0.29890123, shape=(), dtype=float32)

En el ejemplo anterior, dist_dataset sobre dist_dataset para proporcionar información para su entrenamiento. También proporcionamos tf.distribute.Strategy.make_experimental_numpy_dataset para admitir numerosas entradas. Puede utilizar esta API para crear un conjunto de datos antes de llamar a tf.distribute.Strategy.experimental_distribute_dataset .

Otra forma de iterar sobre sus datos es usar iteradores explícitamente. Es posible que desee hacer esto cuando desee ejecutar una determinada cantidad de pasos en lugar de iterar sobre todo el conjunto de datos. La iteración anterior ahora se modificaría para crear primero un iterador y luego llamar explícitamente a next para obtener los datos de entrada.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.2975687, shape=(), dtype=float32)
tf.Tensor(0.2962464, shape=(), dtype=float32)
tf.Tensor(0.29493415, shape=(), dtype=float32)
tf.Tensor(0.29363185, shape=(), dtype=float32)
tf.Tensor(0.2923394, shape=(), dtype=float32)
tf.Tensor(0.29105672, shape=(), dtype=float32)
tf.Tensor(0.28978375, shape=(), dtype=float32)
tf.Tensor(0.28852034, shape=(), dtype=float32)
tf.Tensor(0.2872664, shape=(), dtype=float32)
tf.Tensor(0.28602186, shape=(), dtype=float32)

Esto cubre el caso más simple de usar la API tf.distribute.Strategy para distribuir ciclos de entrenamiento personalizados. Estamos en el proceso de mejorar estas API. Dado que este caso de uso requiere más trabajo para adaptar su código, publicaremos una guía detallada por separado en el futuro.

¿Qué es compatible ahora?

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Bucle de entrenamiento personalizado Soportado Soportado Soporte experimental Soporte experimental Soporte planificado puesto 2.3

Ejemplos y tutoriales

A continuación, se muestran algunos ejemplos para usar la estrategia de distribución con ciclos de entrenamiento personalizados:

  1. Tutorial para entrenar MNIST usando MirroredStrategy .
  2. Guía para entrenar MNIST usando TPUStrategy .
  3. Repositorio de TensorFlow Model Garden que contiene colecciones de modelos de última generación implementados mediante varias estrategias.

Usando tf.distribute.Strategy con Estimator (soporte limitado)

tf.estimator es una API de TensorFlow de entrenamiento distribuido que originalmente admitía el enfoque de servidor de parámetros asíncronos. Al igual que con Keras, hemos integrado tf.distribute.Strategy en tf.Estimator . Si está utilizando Estimator para su capacitación, puede cambiar fácilmente a capacitación distribuida con muy pocos cambios en su código. Con esto, los usuarios de Estimator ahora pueden realizar entrenamiento distribuido sincrónico en múltiples GPU y múltiples trabajadores, así como también usar TPU. Sin embargo, este soporte en Estimator es limitado. Consulte la sección Qué se admite ahora a continuación para obtener más detalles.

El uso de tf.distribute.Strategy con Estimator es ligeramente diferente al caso de Keras. En lugar de utilizar strategy.scope , ahora pasamos el objeto de estrategia a RunConfig para el Estimator.

Aquí hay un fragmento de código que muestra esto con un Estimator LinearRegressor y MirroredStrategy LinearRegressor :

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpbnyibny6
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpbnyibny6', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Aquí usamos un Estimador prediseñado, pero el mismo código también funciona con un Estimador personalizado. train_distribute determina cómo se distribuirá el entrenamiento y eval_distribute determina cómo se distribuirá la evaluación. Esta es otra diferencia con Keras, donde usamos la misma estrategia tanto para el entrenamiento como para la evaluación.

Ahora podemos entrenar y evaluar este Estimador con una función de entrada:

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-09-10T01:28:07Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpbnyibny6/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.24097s
INFO:tensorflow:Finished evaluation at 2020-09-10-01:28:07
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpbnyibny6/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Otra diferencia a destacar aquí entre Estimator y Keras es el manejo de entrada. En Keras, mencionamos que cada lote del conjunto de datos se divide automáticamente en las múltiples réplicas. En Estimator, sin embargo, no dividimos automáticamente el lote ni dividimos automáticamente los datos entre diferentes trabajadores. Tiene control total sobre cómo desea que se distribuyan sus datos entre los trabajadores y los dispositivos, y debe proporcionar un input_fn para especificar cómo distribuir sus datos.

Su input_fn se llama una vez por trabajador, lo que proporciona un conjunto de datos por trabajador. Luego, un lote de ese conjunto de datos se alimenta a una réplica en ese trabajador, consumiendo así N lotes para N réplicas en 1 trabajador. En otras palabras, el conjunto de datos devuelto por input_fn debe proporcionar lotes de tamaño PER_REPLICA_BATCH_SIZE . Y el tamaño de lote global para un paso se puede obtener como PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Al realizar la capacitación de varios trabajadores, debe dividir sus datos entre los trabajadores o mezclarlos con una semilla aleatoria en cada uno. Puede ver un ejemplo de cómo hacer esto en la Capacitación para varios trabajadores con Estimator .

Y de manera similar, también puede usar estrategias de servidor de parámetros y de trabajadores múltiples. El código sigue siendo el mismo, pero debe usar tf.estimator.train_and_evaluate y configurar las variables de entorno TF_CONFIG para cada binario que se ejecuta en su clúster.

¿Qué es compatible ahora?

Hay soporte limitado para entrenar con Estimator usando todas las estrategias excepto TPUStrategy . La capacitación y la evaluación básicas deberían funcionar, pero una serie de funciones avanzadas, como el andamio, aún no funcionan. También puede haber una serie de errores en esta integración. En este momento, no planeamos mejorar activamente este soporte, sino que nos centramos en Keras y el soporte de bucle de entrenamiento personalizado. Si es posible, debería preferir utilizar tf.distribute con esas API.

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API de estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

Ejemplos y tutoriales

A continuación, se muestran algunos ejemplos que muestran el uso de un extremo a otro de varias estrategias con Estimator:

  1. Capacitación de varios trabajadores con Estimator para capacitar a MNIST con varios trabajadores utilizando MultiWorkerMirroredStrategy .
  2. Ejemplo de extremo a extremo para la capacitación de varios trabajadores en tensorflow / ecosistema utilizando plantillas de Kubernetes. Este ejemplo comienza con un modelo de Keras y lo convierte en un Estimator usando la API tf.keras.estimator.model_to_estimator .
  3. Modelo oficial de ResNet50 , que se puede entrenar utilizando MirroredStrategy o MultiWorkerMirroredStrategy .

Otros temas

En esta sección, cubriremos algunos temas que son relevantes para múltiples casos de uso.

Configuración de la variable de entorno TF_CONFIG

Para la capacitación de varios trabajadores, como se mencionó anteriormente, debe configurar la variable de entorno TF_CONFIG para cada binario que se ejecuta en su clúster. La variable de entorno TF_CONFIG es una cadena JSON que especifica qué tareas constituyen un clúster, sus direcciones y la función de cada tarea en el clúster. Proporcionamos una plantilla de Kubernetes en el repositorio de tensorflow / ecosistema que configura TF_CONFIG para sus tareas de entrenamiento.

Hay dos componentes de TF_CONFIG: clúster y tarea. cluster proporciona información sobre el cluster de formación, que es un diccionario que consta de diferentes tipos de trabajos, como trabajador. En la capacitación de varios trabajadores, generalmente hay un trabajador que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un trabajador normal. A dicho trabajador se le denomina trabajador 'jefe', y es habitual que el trabajador con índice 0 sea designado como trabajador principal (de hecho, así es como se implementa tf.distribute.Strategy). tarea, por otro lado, proporciona información de la tarea actual. El primer grupo de componentes es el mismo para todos los trabajadores, y la tarea del segundo componente es diferente en cada trabajador y especifica el tipo y el índice de ese trabajador.

Un ejemplo de TF_CONFIG es:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Este TF_CONFIG especifica que hay tres trabajadores y dos tareas ps en el clúster junto con sus hosts y puertos. La parte "tarea" especifica que el rol de la tarea actual en el clúster, trabajador 1 (el segundo trabajador). Los roles válidos en un clúster son "jefe", "trabajador", "ps" y "evaluador". No debería haber ningún trabajo "ps" excepto cuando se usa tf.distribute.experimental.ParameterServerStrategy .

¿Que sigue?

tf.distribute.Strategy está activamente en desarrollo. Le invitamos a que lo pruebe y proporcione sus comentarios sobre los problemas de GitHub .