Entrenamiento distribuido con TensorFlow

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar libreta

Descripción general

tf.distribute.Strategy es una API de TensorFlow para distribuir la capacitación en múltiples GPU, múltiples máquinas o TPU. Con esta API, puede distribuir sus modelos existentes y el código de entrenamiento con cambios mínimos en el código.

tf.distribute.Strategy ha sido diseñado con estos objetivos clave en mente:

  • Fácil de usar y compatible con múltiples segmentos de usuarios, incluidos investigadores, ingenieros de aprendizaje automático, etc.
  • Proporcione un buen rendimiento fuera de la caja.
  • Fácil cambio entre estrategias.

Puede distribuir el entrenamiento usando tf.distribute.Strategy con una API de alto nivel como Keras Model.fit , así como bucles de entrenamiento personalizados (y, en general, cualquier cálculo usando TensorFlow).

En TensorFlow 2.x, puede ejecutar sus programas con entusiasmo o en un gráfico usando tf.function . tf.distribute.Strategy pretende admitir estos dos modos de ejecución, pero funciona mejor con tf.function . El modo Eager solo se recomienda con fines de depuración y no es compatible con tf.distribute.TPUStrategy . Aunque la capacitación es el enfoque de esta guía, esta API también se puede usar para distribuir evaluaciones y predicciones en diferentes plataformas.

Puede usar tf.distribute.Strategy con muy pocos cambios en su código, porque los componentes subyacentes de TensorFlow se cambiaron para que tengan en cuenta la estrategia. Esto incluye variables, capas, modelos, optimizadores, métricas, resúmenes y puntos de control.

En esta guía, aprenderá sobre varios tipos de estrategias y cómo puede usarlas en diferentes situaciones. Para obtener información sobre cómo depurar problemas de rendimiento, consulte la guía de rendimiento de la GPU Optimize TensorFlow .

Configurar TensorFlow

import tensorflow as tf

Tipos de estrategias

tf.distribute.Strategy tiene la intención de cubrir una serie de casos de uso a lo largo de diferentes ejes. Algunas de estas combinaciones son compatibles actualmente y otras se agregarán en el futuro. Algunos de estos ejes son:

  • Entrenamiento sincrónico frente a asincrónico: estas son dos formas comunes de distribuir el entrenamiento con paralelismo de datos. En el entrenamiento sincronizado, todos los trabajadores entrenan sobre diferentes porciones de datos de entrada sincronizados y agregando gradientes en cada paso. En el entrenamiento asíncrono, todos los trabajadores se entrenan de forma independiente sobre los datos de entrada y actualizan las variables de forma asíncrona. Por lo general, el entrenamiento de sincronización se admite a través de all-reduce y asíncrono a través de la arquitectura del servidor de parámetros.
  • Plataforma de hardware: es posible que desee escalar su capacitación a varias GPU en una máquina, o varias máquinas en una red (con 0 o más GPU cada una), o en Cloud TPU.

Para admitir estos casos de uso, TensorFlow tiene MirroredStrategy , TPUStrategy , MultiWorkerMirroredStrategy , ParameterServerStrategy , CentralStorageStrategy , así como otras estrategias disponibles. La siguiente sección explica cuáles de estos son compatibles en qué escenarios en TensorFlow. Aquí hay una descripción general rápida:

API de entrenamiento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Model.fit Soportado Soportado Soportado Soporte experimental Soporte experimental
Bucle de entrenamiento personalizado Soportado Soportado Soportado Soporte experimental Soporte experimental
API del estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

Estrategia reflejada

tf.distribute.MirroredStrategy admite el entrenamiento distribuido síncrono en múltiples GPU en una máquina. Crea una réplica por dispositivo GPU. Cada variable del modelo se refleja en todas las réplicas. Juntas, estas variables forman una sola variable conceptual llamada MirroredVariable . Estas variables se mantienen sincronizadas entre sí mediante la aplicación de actualizaciones idénticas.

Se utilizan algoritmos de reducción total eficientes para comunicar las actualizaciones de variables a través de los dispositivos. All-reduce los tensores agregados en todos los dispositivos sumándolos y poniéndolos a disposición en cada dispositivo. Es un algoritmo fusionado que es muy eficiente y puede reducir significativamente la sobrecarga de sincronización. Hay muchos algoritmos e implementaciones de reducción total disponibles, según el tipo de comunicación disponible entre dispositivos. De forma predeterminada, utiliza la biblioteca de comunicación colectiva de NVIDIA ( NCCL ) como implementación de reducción total. Puede elegir entre algunas otras opciones o escribir las suyas propias.

Esta es la forma más sencilla de crear MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Esto creará una instancia de MirroredStrategy , que usará todas las GPU visibles para TensorFlow y NCCL, como comunicación entre dispositivos.

Si desea usar solo algunas de las GPU en su máquina, puede hacerlo así:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Si desea anular la comunicación entre dispositivos, puede hacerlo usando el argumento cross_device_ops proporcionando una instancia de tf.distribute.CrossDeviceOps . Actualmente, tf.distribute.HierarchicalCopyAllReduce y tf.distribute.ReductionToOneDevice son dos opciones además de tf.distribute.NcclAllReduce , que es la predeterminada.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUEstrategia

tf.distribute.TPUStrategy le permite ejecutar su capacitación de TensorFlow en unidades de procesamiento de tensor (TPU) . Las TPU son los ASIC especializados de Google diseñados para acelerar drásticamente las cargas de trabajo de aprendizaje automático. Están disponibles en Google Colab , TPU Research Cloud y Cloud TPU .

En términos de arquitectura de entrenamiento distribuido, TPUStrategy es el mismo MirroredStrategy : implementa entrenamiento distribuido síncrono. Las TPU proporcionan su propia implementación de operaciones eficientes de reducción total y otras operaciones colectivas en múltiples núcleos de TPU, que se utilizan en TPUStrategy .

Así es como crearía una instancia TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

La instancia TPUClusterResolver ayuda a localizar las TPU. En Colab, no necesita especificarle ningún argumento.

Si quieres usar esto para Cloud TPU:

  • Debe especificar el nombre de su recurso de TPU en el argumento tpu .
  • Debe inicializar el sistema de TPU explícitamente al inicio del programa. Esto es necesario antes de que las TPU se puedan usar para el cálculo. La inicialización del sistema de TPU también borra la memoria de TPU, por lo que es importante completar este paso primero para evitar perder el estado.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy es muy similar a MirroredStrategy . Implementa capacitación distribuida sincrónica entre múltiples trabajadores, cada uno con potencialmente múltiples GPU. Similar a tf.distribute.MirroredStrategy , crea copias de todas las variables en el modelo en cada dispositivo para todos los trabajadores.

Esta es la forma más sencilla de crear MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy tiene dos implementaciones para comunicaciones entre dispositivos. CommunicationImplementation.RING está basado en RPC y es compatible con CPU y GPU. CommunicationImplementation.NCCL usa NCCL y proporciona un rendimiento de última generación en las GPU, pero no es compatible con las CPU. CollectiveCommunication.AUTO difiere la elección a Tensorflow. Puede especificarlos de la siguiente manera:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Una de las diferencias clave para poner en marcha la capacitación de varios trabajadores, en comparación con la capacitación de múltiples GPU, es la configuración de varios trabajadores. La variable de entorno 'TF_CONFIG' es la forma estándar en TensorFlow de especificar la configuración del clúster para cada trabajador que forma parte del clúster. Obtenga más información en la sección de configuración de TF_CONFIG de este documento.

Para obtener más detalles sobre MultiWorkerMirroredStrategy , considere los siguientes tutoriales:

ParámetroServidorEstrategia

El entrenamiento del servidor de parámetros es un método paralelo de datos común para escalar el entrenamiento del modelo en varias máquinas. Un clúster de entrenamiento de servidor de parámetros consta de trabajadores y servidores de parámetros. Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. Consulte el tutorial de capacitación del servidor de parámetros para obtener más información.

En TensorFlow 2, el entrenamiento del servidor de parámetros utiliza una arquitectura basada en un coordinador central a través de la clase tf.distribute.experimental.coordinator.ClusterCoordinator .

En esta implementación, las tareas del parameter server y del worker ejecutan tf.distribute.Server s que escuchan las tareas del coordinador. El coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de fallas en las tareas.

En la programación que se ejecuta en el coordinador, utilizará un objeto ParameterServerStrategy para definir un paso de capacitación y utilizará un ClusterCoordinator para enviar los pasos de capacitación a los trabajadores remotos. Esta es la forma más sencilla de crearlos:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Para obtener más información sobre ParameterServerStrategy , consulte el entrenamiento del servidor de parámetros con Keras Model.fit y un tutorial de bucle de entrenamiento personalizado .

En TensorFlow 1, ParameterServerStrategy solo está disponible con un Estimator a través del símbolo tf.compat.v1.distribute.experimental.ParameterServerStrategy .

Estrategia de almacenamiento central

tf.distribute.experimental.CentralStorageStrategy realiza entrenamiento síncrono. Las variables no se reflejan, sino que se colocan en la CPU y las operaciones se replican en todas las GPU locales. Si solo hay una GPU, todas las variables y operaciones se colocarán en esa GPU.

Cree una instancia de CentralStorageStrategy mediante:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Esto creará una instancia de CentralStorageStrategy que usará todas las GPU y CPU visibles. La actualización de las variables en las réplicas se agregará antes de aplicarse a las variables.

Otras estrategias

Además de las estrategias anteriores, hay otras dos estrategias que pueden ser útiles para la creación de prototipos y la depuración cuando se usan las API de tf.distribute .

Estrategia predeterminada

La estrategia predeterminada es una estrategia de distribución que está presente cuando no hay una estrategia de distribución explícita dentro del alcance. Implementa la interfaz tf.distribute.Strategy pero es una transferencia y no proporciona una distribución real. Por ejemplo, Strategy.run(fn) simplemente llamará a fn . El código escrito con esta estrategia debe comportarse exactamente como el código escrito sin ninguna estrategia. Puede pensar en ello como una estrategia "sin operaciones".

La estrategia predeterminada es única, y no se pueden crear más instancias de ella. Se puede obtener usando tf.distribute.get_strategy fuera del alcance de cualquier estrategia explícita (la misma API que se puede usar para obtener la estrategia actual dentro del alcance de una estrategia explícita).

default_strategy = tf.distribute.get_strategy()

Esta estrategia tiene dos propósitos principales:

  • Permite escribir incondicionalmente código de biblioteca consciente de la distribución. Por ejemplo, en tf.optimizer s puede usar tf.distribute.get_strategy y usar esa estrategia para reducir gradientes; siempre devolverá un objeto de estrategia en el que puede llamar a la API Strategy.reduce .
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Similar al código de biblioteca, se puede utilizar para escribir programas de usuarios finales para trabajar con y sin estrategia de distribución, sin necesidad de lógica condicional. Aquí hay un fragmento de código de muestra que ilustra esto:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy es una estrategia para colocar todas las variables y el cálculo en un solo dispositivo específico.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Esta estrategia es distinta de la estrategia predeterminada en varios aspectos. En la estrategia predeterminada, la lógica de ubicación de variables permanece sin cambios en comparación con la ejecución de TensorFlow sin ninguna estrategia de distribución. Pero cuando se usa OneDeviceStrategy , todas las variables creadas en su ámbito se colocan explícitamente en el dispositivo especificado. Además, cualquier función llamada a través OneDeviceStrategy.run también se colocará en el dispositivo especificado.

La entrada distribuida a través de esta estrategia se precargará en el dispositivo especificado. En la estrategia predeterminada, no hay distribución de entrada.

Similar a la estrategia predeterminada, esta estrategia también podría usarse para probar su código antes de cambiar a otras estrategias que realmente distribuyen a múltiples dispositivos/máquinas. Esto ejercitará la maquinaria de estrategia de distribución un poco más que la estrategia predeterminada, pero no en la medida total de usar, por ejemplo, MirroredStrategy o TPUStrategy . Si desea un código que se comporte como si no hubiera una estrategia, utilice la estrategia predeterminada.

Hasta ahora ha aprendido acerca de diferentes estrategias y cómo puede instanciarlas. Las siguientes secciones muestran las diferentes formas en que puede usarlos para distribuir su capacitación.

Utilice tf.distribute.Strategy con Keras Model.fit

tf.distribute.Strategy está integrado en tf.keras , que es la implementación de TensorFlow de la especificación Keras API . tf.keras es una API de alto nivel para construir y entrenar modelos. Al integrarse en el backend de tf.keras , es fácil para usted distribuir su capacitación escrita en el marco de capacitación de Keras usando Model.fit .

Esto es lo que necesita cambiar en su código:

  1. Cree una instancia del tf.distribute.Strategy apropiado.
  2. Mueva la creación del modelo, el optimizador y las métricas de Keras dentro de la strategy.scope .

Las estrategias de distribución de TensorFlow admiten todos los tipos de modelos de Keras: secuencial , funcional y subclase .

Aquí hay un fragmento de código para hacer esto para un modelo Keras muy simple con una capa Dense :

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Este ejemplo usa MirroredStrategy , por lo que puede ejecutarlo en una máquina con varias GPU. strategy.scope() le indica a Keras qué estrategia usar para distribuir el entrenamiento. La creación de modelos/optimizadores/métricas dentro de este ámbito le permite crear variables distribuidas en lugar de variables regulares. Una vez que esto esté configurado, puede ajustar su modelo como lo haría normalmente. MirroredStrategy se encarga de replicar el entrenamiento del modelo en las GPU disponibles, agregar gradientes y más.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

Aquí, un tf.data.Dataset proporciona la entrada de entrenamiento y evaluación. También puede usar matrices NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>

En ambos casos, con Dataset o NumPy, cada lote de la entrada dada se divide en partes iguales entre las múltiples réplicas. Por ejemplo, si usa MirroredStrategy con 2 GPU, cada lote de tamaño 10 se dividirá entre las 2 GPU, y cada uno recibirá 5 ejemplos de entrada en cada paso. Cada época se entrenará más rápido a medida que agregue más GPU. Por lo general, le gustaría aumentar el tamaño de su lote a medida que agrega más aceleradores, para hacer un uso efectivo de la potencia informática adicional. También deberá reajustar su tasa de aprendizaje, según el modelo. Puede usar la strategy.num_replicas_in_sync para obtener la cantidad de réplicas.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

¿Qué es compatible ahora?

API de entrenamiento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Model.fit Soportado Soportado Soportado Soporte experimental Soporte experimental

Ejemplos y tutoriales

Aquí hay una lista de tutoriales y ejemplos que ilustran la integración anterior de principio a fin con Keras Model.fit :

  1. Tutorial : Entrenamiento con Model.fit y MirroredStrategy .
  2. Tutorial : Entrenamiento con Model.fit y MultiWorkerMirroredStrategy .
  3. Guía : contiene un ejemplo del uso de Model.fit y TPUStrategy .
  4. Tutorial : Entrenamiento del servidor de parámetros con Model.fit y ParameterServerStrategy .
  5. Tutorial : Ajuste de BERT para muchas tareas desde el banco de pruebas GLUE con Model.fit y TPUStrategy .
  6. Repositorio TensorFlow Model Garden que contiene colecciones de modelos de última generación implementados mediante diversas estrategias.

Use tf.distribute.Strategy con bucles de entrenamiento personalizados

Como se demostró anteriormente, usar tf.distribute.Strategy con Keras Model.fit requiere cambiar solo un par de líneas de su código. Con un poco más de esfuerzo, también puede usar tf.distribute.Strategy con bucles de entrenamiento personalizados .

Si necesita más flexibilidad y control sobre sus bucles de entrenamiento de lo que es posible con Estimator o Keras, puede escribir bucles de entrenamiento personalizados. Por ejemplo, al usar una GAN, es posible que desee tomar una cantidad diferente de pasos de generador o discriminador en cada ronda. Del mismo modo, los marcos de trabajo de alto nivel no son muy adecuados para el entrenamiento de aprendizaje por refuerzo.

Las clases tf.distribute.Strategy proporcionan un conjunto básico de métodos para admitir bucles de entrenamiento personalizados. El uso de estos puede requerir una pequeña reestructuración del código inicialmente, pero una vez hecho esto, debería poder cambiar entre GPU, TPU y varias máquinas simplemente cambiando la instancia de la estrategia.

A continuación, se muestra un breve fragmento que ilustra este caso de uso para un ejemplo de capacitación simple que usa el mismo modelo de Keras que antes.

Primero, cree el modelo y el optimizador dentro del alcance de la estrategia. Esto garantiza que todas las variables creadas con el modelo y el optimizador sean variables reflejadas.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

A continuación, cree el conjunto de datos de entrada y llame a tf.distribute.Strategy.experimental_distribute_dataset para distribuir el conjunto de datos según la estrategia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Luego, defina un paso del entrenamiento. Use tf.GradientTape para calcular gradientes y el optimizador para aplicar esos gradientes para actualizar las variables de su modelo. Para distribuir este paso de entrenamiento, póngalo en una función train_step y páselo a tf.distribute.Strategy.run junto con las entradas del conjunto de datos que obtuvo del dist_dataset creado antes:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Algunas otras cosas a tener en cuenta en el código anterior:

  1. Usó tf.nn.compute_average_loss para calcular la pérdida. tf.nn.compute_average_loss suma la pérdida por ejemplo y divide la suma por global_batch_size . Esto es importante porque más tarde, después de calcular los gradientes en cada réplica, se agregan a través de las réplicas sumándolas .
  2. También usó la API tf.distribute.Strategy.reduce para agregar los resultados devueltos por tf.distribute.Strategy.run . tf.distribute.Strategy.run devuelve resultados de cada réplica local en la estrategia, y hay varias formas de consumir este resultado. Puede reduce para obtener un valor agregado. También puede hacer tf.distribute.Strategy.experimental_local_results para obtener la lista de valores contenidos en el resultado, uno por réplica local.
  3. Cuando llama a apply_gradients dentro del alcance de una estrategia de distribución, se modifica su comportamiento. Específicamente, antes de aplicar gradientes en cada instancia paralela durante el entrenamiento síncrono, realiza una suma sobre todas las réplicas de los gradientes.

Finalmente, una vez que haya definido el paso de entrenamiento, puede iterar sobre dist_dataset y ejecutar el entrenamiento en un bucle:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

En el ejemplo anterior, iteró sobre dist_dataset para proporcionar información a su entrenamiento. También se le proporciona el tf.distribute.Strategy.make_experimental_numpy_dataset para admitir entradas NumPy. Puede usar esta API para crear un conjunto de datos antes de llamar a tf.distribute.Strategy.experimental_distribute_dataset .

Otra forma de iterar sobre sus datos es usar iteradores explícitamente. Es posible que desee hacer esto cuando desee ejecutar una cantidad determinada de pasos en lugar de iterar sobre todo el conjunto de datos. La iteración anterior ahora se modificaría para crear primero un iterador y luego llamarlo explícitamente a next para obtener los datos de entrada.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)

Esto cubre el caso más simple de usar tf.distribute.Strategy API para distribuir bucles de entrenamiento personalizados.

¿Qué es compatible ahora?

API de entrenamiento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Bucle de entrenamiento personalizado Soportado Soportado Soportado Soporte experimental Soporte experimental

Ejemplos y tutoriales

Estos son algunos ejemplos del uso de estrategias de distribución con bucles de entrenamiento personalizados:

  1. Tutorial : Entrenamiento con un bucle de entrenamiento personalizado y MirroredStrategy .
  2. Tutorial : Entrenamiento con un bucle de entrenamiento personalizado y MultiWorkerMirroredStrategy .
  3. Guía : contiene un ejemplo de un ciclo de entrenamiento personalizado con TPUStrategy .
  4. Tutorial : Entrenamiento del servidor de parámetros con un ciclo de entrenamiento personalizado y ParameterServerStrategy .
  5. Repositorio TensorFlow Model Garden que contiene colecciones de modelos de última generación implementados mediante diversas estrategias.

Otros temas

Esta sección cubre algunos temas que son relevantes para múltiples casos de uso.

Configuración de la variable de entorno TF_CONFIG

Para la capacitación de varios trabajadores, como se mencionó anteriormente, debe configurar la variable de entorno 'TF_CONFIG' para cada binario que se ejecuta en su clúster. La variable de entorno 'TF_CONFIG' es una cadena JSON que especifica qué tareas constituyen un clúster, sus direcciones y el rol de cada tarea en el clúster. El repositorio de tensorflow/ecosystem proporciona una plantilla de Kubernetes, que configura 'TF_CONFIG' para sus tareas de capacitación.

Hay dos componentes de 'TF_CONFIG' : un clúster y una tarea.

  • Un clúster proporciona información sobre el clúster de capacitación, que es un dictado que consta de diferentes tipos de trabajos, como trabajadores. En la capacitación de varios trabajadores, generalmente hay un trabajador que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un trabajador normal. Dicho trabajador se denomina trabajador "jefe", y es habitual que el trabajador con índice 0 sea ​​designado como trabajador jefe (de hecho, así es como se implementa tf.distribute.Strategy ).
  • Una tarea, por otro lado, proporciona información sobre la tarea actual. El clúster del primer componente es el mismo para todos los trabajadores, y la tarea del segundo componente es diferente en cada trabajador y especifica el tipo y el índice de ese trabajador.

Un ejemplo de 'TF_CONFIG' es:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Este 'TF_CONFIG' especifica que hay tres trabajadores y dos tareas "ps" en el "cluster" junto con sus hosts y puertos. La parte de "task" especifica el rol de la tarea actual en el "cluster" : trabajador 1 (el segundo trabajador). Los roles válidos en un clúster son "chief" , "worker" , "ps" y "evaluator" . No debe haber ningún trabajo "ps" , excepto cuando se usa tf.distribute.experimental.ParameterServerStrategy .

¿Que sigue?

tf.distribute.Strategy está activamente en desarrollo. Pruébelo y proporcione sus comentarios utilizando problemas de GitHub .