Entrenamiento distribuido con TensorFlow

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Descripción general

tf.distribute.Strategy es una API TensorFlow para distribuir la capacitación a través de múltiples GPU, múltiples máquinas o TPU. Con esta API, puede distribuir sus modelos existentes y el código de entrenamiento con cambios mínimos en el código.

tf.distribute.Strategy ha sido diseñado con estos objetivos clave en mente:

  • Fácil de usar y compatible con múltiples segmentos de usuarios, incluidos investigadores, ingenieros de ML, etc.
  • Proporciona un buen rendimiento desde el primer momento.
  • Fácil cambio entre estrategias.

tf.distribute.Strategy se puede utilizar con un API de alto nivel como Keras , y también puede ser utilizado para distribuir los bucles de formación a medida (y, en general, cualquier cálculo utilizando TensorFlow).

En TensorFlow 2.x, puede ejecutar sus programas con impaciencia, o en un gráfico usando tf.function . tf.distribute.Strategy tiene la intención de apoyar a estos dos modos de ejecución, pero funciona mejor con tf.function . Modo ansiosos sólo se recomienda para fines de depuración y no se admite para TPUStrategy . Aunque la capacitación es el tema central de esta guía, esta API también se puede utilizar para distribuir evaluaciones y predicciones en diferentes plataformas.

Puede utilizar tf.distribute.Strategy con muy pocos cambios en el código, ya que los componentes subyacentes de TensorFlow se han cambiado para convertirse en la estrategia consciente. Esto incluye variables, capas, modelos, optimizadores, métricas, resúmenes y puntos de control.

En esta guía, aprenderá sobre varios tipos de estrategias y cómo puede usarlas en diferentes situaciones. Para aprender cómo depurar problemas de rendimiento, ver la Optimizar el rendimiento de la GPU TensorFlow guía.

# Import TensorFlow
import tensorflow as tf
2021-07-14 01:23:18.251555: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0

Tipos de estrategias

tf.distribute.Strategy tiene la intención de cubrir una serie de casos de uso a lo largo de diferentes ejes. Algunas de estas combinaciones son compatibles actualmente y otras se agregarán en el futuro. Algunos de estos ejes son:

  • Síncrona contra el entrenamiento asíncrono: Estas son dos formas comunes de la distribución de la formación con paralelismo de datos. En el entrenamiento sincronizado, todos los trabajadores entrenan sobre diferentes segmentos de datos de entrada sincronizados y agregando gradientes en cada paso. En el entrenamiento asincrónico, todos los trabajadores se entrenan de forma independiente sobre los datos de entrada y actualizan las variables de forma asincrónica. Normalmente, el entrenamiento de sincronización se admite a través de all-reduce y async a través de la arquitectura del servidor de parámetros.
  • Plataforma de Hardware: Es posible que desee escalar su formación en varias GPU en una máquina o varias máquinas en una red (con 0 o más GPU cada uno), o en la nube TPU.

Para respaldar estos casos de uso, hay 6 estrategias disponibles. La siguiente sección explica cuáles de estos son compatibles en qué escenarios en TensorFlow. Aquí hay una descripción general rápida:

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API de Keras Soportado Soportado Soportado Soporte experimental Publicación planificada compatible 2.4
Bucle de entrenamiento personalizado Soportado Soportado Soportado Soporte experimental Soporte experimental
API de estimador Soporte limitado No soportado Soporte limitado Soporte limitado Soporte limitado

EspejoEstrategia

tf.distribute.MirroredStrategy soportes distribuidos síncronos de formación en múltiples GPUs en una sola máquina. Crea una réplica por dispositivo GPU. Cada variable del modelo se refleja en todas las réplicas. En conjunto, estas variables forman una sola variable conceptual llamado MirroredVariable . Estas variables se mantienen sincronizadas entre sí mediante la aplicación de actualizaciones idénticas.

Se utilizan algoritmos eficientes de reducción total para comunicar las actualizaciones de variables a través de los dispositivos. All-reduce agrega tensores en todos los dispositivos agregándolos y los pone a disposición en cada dispositivo. Es un algoritmo fusionado que es muy eficiente y puede reducir significativamente la sobrecarga de sincronización. Hay muchos algoritmos e implementaciones de reducción total disponibles, según el tipo de comunicación disponible entre dispositivos. Por defecto, se utiliza la biblioteca de la comunicación colectiva NVIDIA ( NCCL ) como la implementación de todos los reducirá. Puede elegir entre algunas otras opciones o escribir la suya propia.

Esta es la forma más sencilla de crear MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
2021-07-14 01:23:19.530023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-07-14 01:23:20.213426: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.214078: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:00:05.0 name: NVIDIA Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-07-14 01:23:20.214108: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-07-14 01:23:20.217733: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-07-14 01:23:20.217820: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11
2021-07-14 01:23:20.218871: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcufft.so.10
2021-07-14 01:23:20.219205: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcurand.so.10
2021-07-14 01:23:20.220324: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusolver.so.11
2021-07-14 01:23:20.221308: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusparse.so.11
2021-07-14 01:23:20.221482: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudnn.so.8
2021-07-14 01:23:20.221579: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.222279: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.222966: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding visible gpu devices: 0
2021-07-14 01:23:20.224084: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-14 01:23:20.224666: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.225322: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:00:05.0 name: NVIDIA Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-07-14 01:23:20.225396: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.226008: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.226671: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding visible gpu devices: 0
2021-07-14 01:23:20.226715: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-07-14 01:23:20.843669: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1258] Device interconnect StreamExecutor with strength 1 edge matrix:
2021-07-14 01:23:20.843707: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1264]      0 
2021-07-14 01:23:20.843716: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1277] 0:   N 
2021-07-14 01:23:20.843925: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.844616: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.845280: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-07-14 01:23:20.845985: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1418] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 14646 MB memory) -> physical GPU (device: 0, name: NVIDIA Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Esto creará un MirroredStrategy ejemplo, que utilizar todas las GPU que son visibles para TensorFlow, y NCCL-como la comunicación entre dispositivos.

Si desea utilizar solo algunas de las GPU en su máquina, puede hacerlo así:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Si desea anular el dispositivo de comunicación cruzada, puede hacerlo utilizando el cross_device_ops argumento mediante el suministro de una instancia de tf.distribute.CrossDeviceOps . Actualmente, tf.distribute.HierarchicalCopyAllReduce y tf.distribute.ReductionToOneDevice dos opciones distintas de tf.distribute.NcclAllReduce , que es el valor predeterminado.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy le permite ejecutar su entrenamiento TensorFlow en unidades de procesamiento Tensor (TPU). Los TPU son ASIC especializados de Google diseñados para acelerar drásticamente las cargas de trabajo de aprendizaje automático. Están disponibles en Google Colab , la nube de Investigación de TPU , y la nube de TPU .

En cuanto a la arquitectura distribuida de formación, TPUStrategy es el mismo MirroredStrategy -es implementos síncronos formación distribuida. TPU proporcionar su propia implementación de las operaciones colectivas eficaces todo reducir y otros a través de múltiples núcleos de TPU, que se utilizan en TPUStrategy .

Así es como se haría una instancia TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

El TPUClusterResolver instancia ayuda a localizar los TPU. En Colab, no es necesario que le especifiques ningún argumento.

Si desea usar esto para Cloud TPU:

  • Debe especificar el nombre de su recurso de TPU en el tpu argumento.
  • Debe inicializar el sistema TPU explícitamente al comienzo del programa. Esto es necesario antes de que las TPU se puedan utilizar para el cálculo. La inicialización del sistema tpu también borra la memoria de la TPU, por lo que es importante completar este paso primero para evitar perder el estado.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy es muy similar a MirroredStrategy . Implementa entrenamiento distribuido sincrónico entre múltiples trabajadores, cada uno con potencialmente múltiples GPU. Al igual que en tf.distribute.MirroredStrategy , crea copias de todas las variables en el modelo en cada dispositivo a través de todos los trabajadores.

Esta es la forma más sencilla de crear MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy tiene dos implementaciones para las comunicaciones en varios dispositivos. CommunicationImplementation.RING es RPC basados en y apoya las dos CPU y GPU. CommunicationImplementation.NCCL utiliza NCCL y proporciona un rendimiento estado de arte en las GPU pero no soporta CPUs. CollectiveCommunication.AUTO aplaza la decisión de Tensorflow. Puede especificarlos de la siguiente manera:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Una de las diferencias clave para poner en marcha la formación de varios trabajadores, en comparación con la formación de varias GPU, es la configuración de varios trabajadores. El TF_CONFIG variable de entorno es la forma estándar en TensorFlow para especificar la configuración de clúster a cada trabajador que es parte de la agrupación. Más información sobre la creación de TF_CONFIG .

ParameterServerStrategy

El entrenamiento del servidor de parámetros es un método común de datos paralelos para escalar el entrenamiento de modelos en varias máquinas. Un clúster de entrenamiento del servidor de parámetros consta de trabajadores y servidores de parámetros. Las variables se crean en servidores de parámetros y los trabajadores las leen y actualizan en cada paso. Echa un vistazo a la formación del servidor Parámetro tutorial para obtener más detalles.

En TensorFlow 2, la formación del servidor parámetro utiliza una arquitectura central basado en el coordinador a través de la tf.distribute.experimental.coordinator.ClusterCoordinator clase.

En esta implementación, los worker y el parameter server tareas se ejecutan tf.distribute.Server s que escuchan para las tareas del coordinador. El coordinador crea recursos, distribuye tareas de capacitación, escribe puntos de control y se ocupa de las fallas de tareas.

En la programación se ejecuta en el coordinador, que va a utilizar un ParameterServerStrategy objeto de definir un paso de formación y utilizar un ClusterCoordinator a los pasos de formación de despacho a los trabajadores a distancia. Esta es la forma más sencilla de crearlos:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

En TensorFlow 1, ParameterServerStrategy sólo está disponible con un estimador a través tf.compat.v1.distribute.experimental.ParameterServerStrategy símbolo.

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy hace la formación sincrónica también. Las variables no se reflejan, sino que se colocan en la CPU y las operaciones se replican en todas las GPU locales. Si solo hay una GPU, todas las variables y operaciones se colocarán en esa GPU.

Crear una instancia de CentralStorageStrategy por:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Esto creará un CentralStorageStrategy ejemplo que utilizará todas las GPU y CPU visibles. La actualización de las variables en las réplicas se agregará antes de aplicarse a las variables.

Otras estrategias

Además de las estrategias anteriores, hay otras dos estrategias que podrían ser útiles para la creación de prototipos y depuración utilizando tf.distribute APIs.

Estrategia predeterminada

La estrategia predeterminada es una estrategia de distribución que está presente cuando no se incluye una estrategia de distribución explícita. Implementa el tf.distribute.Strategy interfaz, pero es un paso a través y no proporciona ninguna distribución real. Por ejemplo, strategy.run(fn) simplemente llamar a fn . El código escrito con esta estrategia debe comportarse exactamente como el código escrito sin ninguna estrategia. Puede pensar en ello como una estrategia "no operativa".

La estrategia predeterminada es un singleton, y no se pueden crear más instancias de ella. Se puede obtener usando tf.distribute.get_strategy fuera del alcance de cualquier estrategia explícita (la misma API que se puede utilizar para obtener la estrategia actual dentro del alcance de una estrategia explícita).

default_strategy = tf.distribute.get_strategy()

Esta estrategia tiene dos propósitos principales:

  • Permite escribir código de biblioteca compatible con la distribución de forma incondicional. Por ejemplo, en tf.optimizer s uso puede tf.distribute.get_strategy y el uso que la estrategia para la reducción de los gradientes-que siempre devolverá un objeto de la estrategia en la que se puede llamar al reducir la API.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Similar al código de la biblioteca, se puede utilizar para escribir programas de usuarios finales para que funcionen con y sin estrategia de distribución, sin requerir lógica condicional. Aquí hay un fragmento de código de muestra que ilustra esto:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy es una estrategia para colocar todas las variables y cómputo en un solo dispositivo especificado.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Esta estrategia es distinta de la estrategia predeterminada en varias formas. En la estrategia predeterminada, la lógica de ubicación de las variables permanece sin cambios en comparación con la ejecución de TensorFlow sin ninguna estrategia de distribución. Pero cuando se utiliza OneDeviceStrategy , todas las variables creadas en su ámbito de aplicación se colocan de forma explícita en el dispositivo especificado. Además, cualesquiera funciones llamadas a través de OneDeviceStrategy.run también serán colocados en el dispositivo especificado.

La entrada distribuida a través de esta estrategia se precargará en el dispositivo especificado. En la estrategia predeterminada, no hay distribución de entrada.

De manera similar a la estrategia predeterminada, esta estrategia también podría usarse para probar su código antes de cambiar a otras estrategias que realmente se distribuyen a múltiples dispositivos / máquinas. Esto ejercerá la maquinaria estrategia de distribución algo más de estrategia por defecto, pero no con todo el rigor como el uso de MirroredStrategy o TPUStrategy etc Si desea código que se comporta como si hay una estrategia, a continuación, utilizar la estrategia por defecto.

Hasta ahora, ha visto las diferentes estrategias disponibles y cómo puede instanciarlas. Las siguientes secciones muestran las diferentes formas en las que puede utilizarlas para distribuir su formación.

Usando tf.distribute.Strategy con tf.keras.Model.fit

tf.distribute.Strategy está integrado en tf.keras , que es la aplicación de la TensorFlow de especificación API Keras . tf.keras es una API de alto nivel para los modelos de construcción y de tren. Mediante la integración en tf.keras back-end, es transparente para que usted pueda distribuir su escrito de capacitación en el marco de formación Keras usando Model.fit .

Esto es lo que necesita cambiar en su código:

  1. Crear una instancia de la adecuada tf.distribute.Strategy .
  2. Mover la creación de Keras modelo, optimizador y métricas dentro strategy.scope .

Las estrategias de distribución de TensorFlow admiten todos los tipos de modelos de Keras: secuenciales, funcionales y subclases.

Aquí hay un fragmento de código para hacer esto para un modelo Keras muy simple con una capa densa:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

En este ejemplo se utiliza MirroredStrategy , para que pueda ejecutar esto en una máquina con múltiples GPU. strategy.scope() indica que Keras la estrategia a utilizar para distribuir el entrenamiento. La creación de modelos / optimizadores / métricas dentro de este ámbito le permite crear variables distribuidas en lugar de variables regulares. Una vez que esto esté configurado, puede ajustar su modelo como lo haría normalmente. MirroredStrategy se encarga de replicar la formación del modelo sobre las GPU disponibles, la agregación de los gradientes, y mucho más.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-07-14 01:23:21.641884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-07-14 01:23:21.671942: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-07-14 01:23:21.672424: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000175000 Hz
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-07-14 01:23:24.013985: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
10/10 [==============================] - 3s 2ms/step - loss: 0.0014
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 6.3687e-04
2021-07-14 01:23:24.434928: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11
2021-07-14 01:23:24.528027: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 3.9551e-04
0.00039551049121655524

Aquí un tf.data.Dataset proporciona la formación y la entrada de eval. También puede usar matrices NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-07-14 01:23:25.775102: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_1006"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 2.8150e-04
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 1.2442e-04
<tensorflow.python.keras.callbacks.History at 0x7fe091028f50>

En ambos casos-con Dataset o Numpy-cada lote de la entrada dada se divide por igual entre las múltiples réplicas. Por ejemplo, si está utilizando la MirroredStrategy con 2 GPU, cada lote de tamaño 10 conseguirá dividido entre los 2 GPU, con cada recepción 5 ejemplos de entrada en cada paso. Cada época se entrenará más rápido a medida que agregue más GPU. Por lo general, querrá aumentar el tamaño de su lote a medida que agrega más aceleradores, para hacer un uso efectivo de la potencia de cálculo adicional. También deberá volver a ajustar su tasa de aprendizaje, según el modelo. Puede utilizar strategy.num_replicas_in_sync para obtener el número de réplicas.

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

¿Qué es compatible ahora?

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
API de Keras Soportado Soportado Soporte experimental Soporte experimental Soporte experimental

Ejemplos y tutoriales

Aquí hay una lista de tutoriales y ejemplos que ilustran la integración anterior de principio a fin con Keras:

  1. Tutorial para entrenar con MNIST MirroredStrategy .
  2. Tutorial para entrenar MNIST usando MultiWorkerMirroredStrategy .
  3. Guía de entrenamiento usando MNIST TPUStrategy .
  4. Tutorial para la formación del servidor parámetro en TensorFlow 2 con ParameterServerStrategy .
  5. TensorFlow Modelo Jardín repository que contiene colecciones de modelos de estado-of-the-art implementados utilizando diversas estrategias.

Usando tf.distribute.Strategy con bucles de formación personalizada

Como hemos visto, el uso de tf.distribute.Strategy con Keras model.fit requiere cambiar sólo un par de líneas de código. Con un pequeño esfuerzo más, también se puede utilizar tf.distribute.Strategy con bucles de formación personalizado.

Si necesita más flexibilidad y control sobre sus ciclos de entrenamiento de lo que es posible con Estimator o Keras, puede escribir ciclos de entrenamiento personalizados. Por ejemplo, al usar un GAN, es posible que desee realizar un número diferente de pasos de generador o discriminador en cada ronda. Del mismo modo, los marcos de alto nivel no son muy adecuados para la formación de aprendizaje por refuerzo.

Los tf.distribute.Strategy clases proporcionan un conjunto básico de los métodos a través de los bucles de formación a medida de apoyo. El uso de estos puede requerir una reestructuración menor del código inicialmente, pero una vez hecho esto, debería poder cambiar entre GPU, TPU y varias máquinas simplemente cambiando la instancia de estrategia.

Aquí, verá un breve fragmento que ilustra este caso de uso para un ejemplo de entrenamiento simple usando el mismo modelo de Keras que antes.

Primero, cree el modelo y el optimizador dentro del alcance de la estrategia. Esto asegura que las variables creadas con el modelo y el optimizador sean variables reflejadas.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

A continuación, cree el conjunto de datos de entrada y llamar tf.distribute.Strategy.experimental_distribute_dataset para distribuir el conjunto de datos basado en la estrategia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-07-14 01:23:27.005233: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Luego, defina un paso del entrenamiento. Uso tf.GradientTape a los gradientes de cómputo y optimizador para aplicar esos gradientes para actualizar las variables de su modelo. Para distribuir este paso de formación, lo puso en una función train_step y pasarlo a tf.distrbute.Strategy.run junto con las entradas del conjunto de datos que obtuvo de la dist_dataset creado antes:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Algunas otras cosas a tener en cuenta en el código anterior:

  1. Que utilizó tf.nn.compute_average_loss para calcular la pérdida. tf.nn.compute_average_loss resume la pérdida por ejemplo, y dividir la suma por el global_batch_size. Esto es importante porque más tarde después de los gradientes se calculan en cada réplica, que están agregados a través de las réplicas sumando ellos.
  2. También utilizó el tf.distribute.Strategy.reduce API para agregar los resultados devueltos por tf.distribute.Strategy.run . tf.distribute.Strategy.run devuelve los resultados de cada réplica local en la estrategia, y hay varias maneras de consumir este resultado. Puede reduce ellos para obtener un valor agregado. También se puede hacer tf.distribute.Strategy.experimental_local_results para obtener la lista de los valores contenidos en el resultado, uno por cada réplica local.
  3. Cuando se llama a apply_gradients dentro de un ámbito estrategia de distribución, su comportamiento se modifica. Específicamente, antes de aplicar gradientes en cada instancia paralela durante el entrenamiento sincrónico, realiza una suma total de réplicas de los gradientes.

Por último, una vez que haya definido el paso de la formación, se puede iterar sobre dist_dataset y ejecutar la formación en un bucle:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.36346385, shape=(), dtype=float32)
tf.Tensor(0.3616105, shape=(), dtype=float32)
tf.Tensor(0.3597728, shape=(), dtype=float32)
tf.Tensor(0.35795057, shape=(), dtype=float32)
tf.Tensor(0.35614368, shape=(), dtype=float32)
tf.Tensor(0.35435185, shape=(), dtype=float32)
tf.Tensor(0.3525751, shape=(), dtype=float32)
tf.Tensor(0.35081312, shape=(), dtype=float32)
tf.Tensor(0.3490658, shape=(), dtype=float32)
tf.Tensor(0.34733298, shape=(), dtype=float32)
tf.Tensor(0.34561458, shape=(), dtype=float32)
tf.Tensor(0.3439103, shape=(), dtype=float32)
tf.Tensor(0.3422201, shape=(), dtype=float32)
tf.Tensor(0.34054378, shape=(), dtype=float32)
tf.Tensor(0.33888122, shape=(), dtype=float32)
tf.Tensor(0.33723223, shape=(), dtype=float32)
tf.Tensor(0.3355967, shape=(), dtype=float32)
tf.Tensor(0.3339745, shape=(), dtype=float32)
tf.Tensor(0.33236548, shape=(), dtype=float32)
tf.Tensor(0.33076945, shape=(), dtype=float32)

En el ejemplo anterior, se repiten a lo largo del dist_dataset para proporcionar información a su entrenamiento. También se proporcionan con el tf.distribute.Strategy.make_experimental_numpy_dataset para apoyar entradas NumPy. Se puede utilizar esta API para crear un conjunto de datos antes de llamar tf.distribute.Strategy.experimental_distribute_dataset .

Otra forma de iterar sobre sus datos es usar iteradores explícitamente. Es posible que desee hacer esto cuando desee ejecutar una determinada cantidad de pasos en lugar de iterar sobre todo el conjunto de datos. Lo anterior iteración ahora se modificaría para crear primero un iterador y luego llamar explícitamente next en él para obtener los datos de entrada.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.3291864, shape=(), dtype=float32)
tf.Tensor(0.32761604, shape=(), dtype=float32)
tf.Tensor(0.32605836, shape=(), dtype=float32)
tf.Tensor(0.3245131, shape=(), dtype=float32)
tf.Tensor(0.32298028, shape=(), dtype=float32)
tf.Tensor(0.32145968, shape=(), dtype=float32)
tf.Tensor(0.3199512, shape=(), dtype=float32)
tf.Tensor(0.31845465, shape=(), dtype=float32)
tf.Tensor(0.31697, shape=(), dtype=float32)
tf.Tensor(0.31549713, shape=(), dtype=float32)

Esto cubre el caso más sencillo de utilizar tf.distribute.Strategy API para distribuir los bucles de formación personalizados.

¿Qué es compatible ahora?

API de entrenamiento EspejoEstrategia TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Bucle de entrenamiento personalizado Soportado Soportado Soporte experimental Soporte experimental Soporte experimental

Ejemplos y tutoriales

A continuación, se muestran algunos ejemplos del uso de la estrategia de distribución con ciclos de entrenamiento personalizados:

  1. Tutorial para entrenar MNIST usando MirroredStrategy .
  2. Guía de entrenamiento usando MNIST TPUStrategy .
  3. TensorFlow Modelo Jardín repository que contiene colecciones de modelos de estado-of-the-art implementados utilizando diversas estrategias.

Otros temas

Esta sección cubre algunos temas que son relevantes para múltiples casos de uso.

Configuración de la variable de entorno TF_CONFIG

Para el entrenamiento de varios trabajadores, como se mencionó antes, es necesario configurar el TF_CONFIG variable de entorno para cada binario que se ejecuta en el clúster. El TF_CONFIG variable de entorno es una cadena JSON que especifica las tareas que constituyen un clúster, sus direcciones y el papel de cada tarea en el clúster. El tensorflow/ecosystem de recompra proporciona una plantilla Kubernetes, que establece TF_CONFIG para sus tareas de formación.

Hay dos componentes de TF_CONFIG : un clúster y una tarea.

  • Un grupo proporciona información sobre el grupo de capacitación, que es un diccionario que consta de diferentes tipos de trabajos, como trabajador. En la capacitación de varios trabajadores, generalmente hay un trabajador que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace un trabajador normal. Dicho trabajador se le conoce como el trabajador "jefe", y es costumbre que el trabajador con el índice 0 es designado como el principal trabajador (de hecho esta es la forma tf.distribute.Strategy se implementa).
  • Por otro lado, una tarea proporciona información sobre la tarea actual. El primer grupo de componentes es el mismo para todos los trabajadores, y la tarea del segundo componente es diferente en cada trabajador y especifica el tipo y el índice de ese trabajador.

Un ejemplo de TF_CONFIG es:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Este TF_CONFIG especifica que hay tres trabajadores y dos "ps" tareas en el "cluster" , junto con sus anfitriones y los puertos. Las "task" parte especifica que el papel de la tarea actual en el "cluster" -worker 1 (el segundo trabajador). Papeles válidos en un racimo son "chief" , "worker" , "ps" y "evaluator" . No debería haber ninguna "ps" trabajo excepto cuando se usa tf.distribute.experimental.ParameterServerStrategy .

¿Que sigue?

tf.distribute.Strategy es activa en fase de desarrollo. Probarlo y proporcionar y sus comentarios usando temas de GitHub .