![]() | ![]() | ![]() | ![]() |
Las API de tf.distribute proporcionan una manera fácil para que los usuarios escalen su entrenamiento de una sola máquina a varias máquinas. Al escalar su modelo, los usuarios también deben distribuir su entrada en varios dispositivos. tf.distribute
proporciona API con las que puede distribuir automáticamente su entrada entre dispositivos.
Esta guía le mostrará las diferentes formas en que puede crear iteradores y conjuntos de datos distribuidos utilizando tf.distribute
API tf.distribute
. Además, se cubrirán los siguientes temas:
- Opciones de uso, fragmentación y procesamiento por lotes al usar
tf.distribute.Strategy.experimental_distribute_dataset
ytf.distribute.Strategy.distribute_datasets_from_function
. - Diferentes formas en las que puede iterar sobre el conjunto de datos distribuidos.
- Las diferencias entre las API de
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
ytf.data
API detf.data
, así como las limitaciones que los usuarios puedan encontrar en su uso.
Esta guía no cubre el uso de entradas distribuidas con las API de Keras.
Conjuntos de datos distribuidos
Para usar tf.distribute
API de tf.distribute
para escalar, se recomienda que los usuarios usentf.data.Dataset
para representar su entrada. tf.distribute
ha hecho quetf.data.Dataset
funcione de manera eficiente contf.data.Dataset
(por ejemplo, precarga automática de datos en cada dispositivo acelerador) con optimizaciones de rendimiento que se incorporan regularmente en la implementación. Si tiene un caso de uso para usar algo distinto atf.data.Dataset
, consulte una sección posterior de esta guía. En un ciclo de entrenamiento no distribuido, los usuarios primero crean una instanciatf.data.Dataset
y luego iteran sobre los elementos. Por ejemplo:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Para permitir a los usuarios usar la estrategia tf.distribute
con cambios mínimos en el código existente del usuario, se introdujeron dos API que distribuirían una instanciatf.data.Dataset
y devolverían un objeto de conjunto de datos distribuido. Luego, un usuario podría iterar sobre esta instancia de conjunto de datos distribuidos y entrenar su modelo como antes. Veamos ahora las dos API: tf.distribute.Strategy.experimental_distribute_dataset
y tf.distribute.Strategy.distribute_datasets_from_function
con más detalle:
tf.distribute.Strategy.experimental_distribute_dataset
Uso
Esta API toma una instanciatf.data.Dataset
como entrada y devuelve una instancia tf.distribute.DistributedDataset
. Debe agrupar el conjunto de datos de entrada con un valor que sea igual al tamaño del lote global. Este tamaño de lote global es la cantidad de muestras que desea procesar en todos los dispositivos en 1 paso. Puede iterar sobre este conjunto de datos distribuido en forma Pythonic o crear un iterador usando iter
. El objeto devuelto no es una instancia detf.data.Dataset
y no admite ninguna otra API que transforme o inspeccione el conjunto de datos de ninguna manera. Esta es la API recomendada si no tiene formas específicas en las que desea dividir su entrada en diferentes réplicas.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
Propiedades
Por lotes
tf.distribute
vuelve atf.data.Dataset
instancia de entradatf.data.Dataset
con un nuevo tamaño de lote que es igual al tamaño de lote global dividido por el número de réplicas sincronizadas. El número de réplicas sincronizadas es igual al número de dispositivos que participan en la reducción de gradiente durante el entrenamiento. Cuando un usuario llama a next
en el iterador distribuido, se devuelve un tamaño de lote de datos por réplica en cada réplica. La cardinalidad del conjunto de datos reagrupados siempre será un múltiplo del número de réplicas. Aquí hay un par de ejemplos:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Sin distribución:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5]
Con distribución sobre 2 réplicas. El último lote ([4, 5]) se divide en 2 réplicas.
Lote 1:
- Réplica 1: [0, 1]
- Réplica 2: [2, 3]
Lote 2:
- Réplica 2: [4]
- Réplica 2: [5]
tf.data.Dataset.range(4).batch(4)
- Sin distribución:
- Lote 1: [[0], [1], [2], [3]]
- Con distribución sobre 5 réplicas:
- Lote 1:
- Réplica 1: [0]
- Réplica 2: [1]
- Réplica 3: [2]
- Réplica 4: [3]
- Réplica 5: []
tf.data.Dataset.range(8).batch(4)
- Sin distribución:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5, 6, 7]
- Con distribución en 3 réplicas:
- Lote 1:
- Réplica 1: [0, 1]
- Réplica 2: [2, 3]
- Réplica 3: []
- Lote 2:
- Réplica 1: [4, 5]
- Réplica 2: [6, 7]
- Réplica 3: []
Volver a agrupar el conjunto de datos tiene una complejidad de espacio que aumenta linealmente con la cantidad de réplicas. Esto significa que para el caso de uso de capacitación de varios trabajadores, la canalización de entrada puede generar errores OOM.
Fragmentación
tf.distribute
también comparte automáticamente el conjunto de datos de entrada en el entrenamiento de varios trabajadores con MultiWorkerMirroredStrategy
y TPUStrategy
. Cada conjunto de datos se crea en el dispositivo de CPU del trabajador. La fragmentación automática de un conjunto de datos sobre un conjunto de trabajadores significa que a cada trabajador se le asigna un subconjunto de todo el conjunto de datos (si se establece la tf.data.experimental.AutoShardPolicy
correcta). Esto es para garantizar que en cada paso, cada trabajador procese un tamaño de lote global de elementos del conjunto de datos que no se superponen. Autosharding tiene un par de opciones diferentes que se pueden especificar usando tf.data.experimental.DistributeOptions
. Tenga en cuenta que no existe la fragmentación automática en el entrenamiento de varios trabajadores con ParameterServerStrategy
, y se puede encontrar más información sobre la creación de conjuntos de datos con esta estrategia en el tutorial Estrategia del servidor de parámetros .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Hay tres opciones diferentes que puede configurar para tf.data.experimental.AutoShardPolicy
:
- AUTO: Esta es la opción predeterminada, lo que significa que FILE hará un intento de fragmentación. El intento de fragmentar mediante FILE falla si no se detecta un conjunto de datos basado en archivos.
tf.distribute
luego recurrirá a la fragmentación por DATA. Tenga en cuenta que si el conjunto de datos de entrada está basado en archivos, pero la cantidad de archivos es menor que la cantidad de trabajadores, seInvalidArgumentError
unInvalidArgumentError
. Si esto sucede, establezca explícitamente la política enAutoShardPolicy.DATA
o divida su fuente de entrada en archivos más pequeños de modo que la cantidad de archivos sea mayor que la cantidad de trabajadores. ARCHIVO: esta es la opción si desea fragmentar los archivos de entrada entre todos los trabajadores. Debería utilizar esta opción si la cantidad de archivos de entrada es mucho mayor que la cantidad de trabajadores y los datos de los archivos se distribuyen de manera uniforme. La desventaja de esta opción es tener trabajadores inactivos si los datos de los archivos no se distribuyen de manera uniforme. Si el número de archivos es menor que el número de trabajadores, se
InvalidArgumentError
unInvalidArgumentError
. Si esto sucede, establezca explícitamente la política enAutoShardPolicy.DATA
. Por ejemplo, distribuyamos 2 archivos entre 2 trabajadores con 1 réplica cada uno. El archivo 1 contiene [0, 1, 2, 3, 4, 5] y el archivo 2 contiene [6, 7, 8, 9, 10, 11]. Sea 2 el número total de réplicas sincronizadas y 4 el tamaño del lote global.- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4]
- Lote 4 = Réplica 1: [5]
- Trabajador 1:
- Lote 1 = Réplica 2: [6, 7]
- Lote 2 = Réplica 2: [8, 9]
- Lote 3 = Réplica 2: [10]
- Lote 4 = Réplica 2: [11]
DATOS: Esto automáticamente endurecerá los elementos en todos los trabajadores. Cada uno de los trabajadores leerá el conjunto de datos completo y solo procesará el fragmento asignado. Todos los demás fragmentos se descartarán. Esto se usa generalmente si la cantidad de archivos de entrada es menor que la cantidad de trabajadores y desea una mejor fragmentación de los datos en todos los trabajadores. La desventaja es que todo el conjunto de datos se leerá en cada trabajador. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Sea 2 el número total de réplicas sincronizadas.
- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [4, 5]
- Lote 3 = Réplica 1: [8, 9]
- Trabajador 1:
- Lote 1 = Réplica 2: [2, 3]
- Lote 2 = Réplica 2: [6, 7]
- Lote 3 = Réplica 2: [10, 11]
DESACTIVADO: si desactiva la función de fragmentación automática, cada trabajador procesará todos los datos. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Deje que el número total de réplicas sincronizadas sea 2. Luego, cada trabajador verá la siguiente distribución:
- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4, 5]
- Lote 4 = Réplica 1: [6, 7]
- Lote 5 = Réplica 1: [8, 9]
Lote 6 = Réplica 1: [10, 11]
Trabajador 1:
Lote 1 = Réplica 2: [0, 1]
Lote 2 = Réplica 2: [2, 3]
Lote 3 = Réplica 2: [4, 5]
Lote 4 = Réplica 2: [6, 7]
Lote 5 = Réplica 2: [8, 9]
Lote 6 = Réplica 2: [10, 11]
Precarga
De forma predeterminada, tf.distribute
agrega una transformación de tf.distribute
al final de la instancia detf.data.Dataset
proporcionada por el usuario. El argumento de la transformación de buffer_size
que es buffer_size
es igual al número de réplicas sincronizadas.
tf.distribute.Strategy.distribute_datasets_from_function
Uso
Esta API toma una función de entrada y devuelve una instancia tf.distribute.DistributedDataset
. La función de entrada que los usuarios pasan tiene un argumento tf.distribute.InputContext
y debe devolver una instanciatf.data.Dataset
. Con esta API, tf.distribute
no realiza más cambios en la instanciatf.data.Dataset
del usuario devuelta por la función de entrada. Es responsabilidad del usuario procesar por lotes y fragmentar el conjunto de datos. tf.distribute
llama a la función de entrada en el dispositivo de CPU de cada uno de los trabajadores. Además de permitir a los usuarios especificar su propia lógica de fragmentación y procesamiento por lotes, esta API también demuestra una mejor escalabilidad y rendimiento en comparación con tf.distribute.Strategy.experimental_distribute_dataset
cuando se utiliza para la capacitación de varios trabajadores.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Propiedades
Por lotes
La instancia detf.data.Dataset
que es el valor de retorno de la función de entrada debe procesarse por lotes utilizando el tamaño de lote por réplica. El tamaño de lote por réplica es el tamaño de lote global dividido por la cantidad de réplicas que participan en el entrenamiento de sincronización. Esto se debe a que tf.distribute
llama a la función de entrada en el dispositivo de CPU de cada uno de los trabajadores. El conjunto de datos que se crea en un trabajador determinado debe estar listo para que lo utilicen todas las réplicas de ese trabajador.
Fragmentación
El objeto tf.distribute.InputContext
que se pasa implícitamente como un argumento a la función de entrada del usuario es creado por tf.distribute
bajo el capó. Tiene información sobre el número de trabajadores, la identificación del trabajador actual, etc. Esta función de entrada puede manejar la fragmentación según las políticas establecidas por el usuario utilizando estas propiedades que son parte del objeto tf.distribute.InputContext
.
Precarga
tf.distribute
no agrega una transformación detf.data.Dataset
al final deltf.data.Dataset
devuelto por la función de entrada proporcionada por el usuario.
Iteradores distribuidos
De forma similar a las instancias detf.data.Dataset
no distribuidas, deberá crear un iterador en las instancias de tf.distribute.DistributedDataset
para iterar sobre él y acceder a los elementos en tf.distribute.DistributedDataset
. Las siguientes son las formas en las que puede crear un tf.distribute.DistributedIterator
y usarlo para entrenar su modelo:
Usos
Utilice una construcción Pythonic for loop
Puede utilizar un bucle Pythonic fácil de usar para iterar sobre tf.distribute.DistributedDataset
. Los elementos devueltos por tf.distribute.DistributedIterator
pueden ser un solo tf.Tensor
o un tf.distribute.DistributedValues
que contiene un valor por réplica. Colocar el bucle dentro de una función tf.function
. tf.function
rendimiento. Sin embargo, break
y return
no se admiten actualmente para un bucle sobre un tf.distribute.DistributedDataset
que se coloca dentro de un tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Use iter
para crear un iterador explícito
Para iterar sobre los elementos en una instancia de tf.distribute.DistributedDataset
, puede crear un tf.distribute.DistributedIterator
usando la API iter
en él. Con un iterador explícito, puede iterar durante un número fijo de pasos. Con el fin de obtener el siguiente elemento de una tf.distribute.DistributedIterator
ejemplo dist_iterator
, puede llamar al next(dist_iterator)
, dist_iterator.get_next()
, o dist_iterator.get_next_as_optional()
. Los dos primeros son esencialmente los mismos:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Con next()
o tf.distribute.DistributedIterator.get_next()
, si tf.distribute.DistributedIterator
ha llegado a su fin, se generará un error OutOfRange. El cliente puede detectar el error en el lado de Python y continuar haciendo otro trabajo, como puntos de control y evaluación. Sin embargo, esto no funcionará si está utilizando un ciclo de entrenamiento de host (es decir, ejecute varios pasos por función tf.function
), que se ve así:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
contiene varios pasos al envolver el cuerpo del paso dentro de un tf.range
. En este caso, diferentes iteraciones en el bucle sin dependencia podrían comenzar en paralelo, por lo que un error OutOfRange se puede desencadenar en iteraciones posteriores antes de que finalice el cálculo de iteraciones anteriores. Una vez que se lanza un error de OutOfRange, todas las operaciones en la función se terminarán de inmediato. Si este es algún caso que le gustaría evitar, una alternativa que no arroja un error de tf.distribute.DistributedIterator.get_next_as_optional()
es tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
devuelve un tf.experimental.Optional
que contiene el siguiente elemento o ningún valor si tf.distribute.DistributedIterator
ha llegado a su fin.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Usando la propiedad element_spec
Si pasa los elementos de un conjunto de datos distribuidos a una función tf.function
y desea una garantía tf.TypeSpec
, puede especificar el argumento input_signature
de la función tf.function
. La salida de un conjunto de datos distribuidos es tf.distribute.DistributedValues
que puede representar la entrada a un solo dispositivo o varios dispositivos. Para obtener el tf.TypeSpec
correspondiente a este valor distribuido, puede usar la propiedad element_spec
del conjunto de datos distribuidos o del objeto iterador distribuido.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Lotes parciales
Los lotes parciales se encuentran cuandotf.data.Dataset
instanciastf.data.Dataset
que crean los usuarios pueden contener tamaños de lote que no son divisibles de manera uniforme por el número de réplicas o cuando la cardinalidad de la instancia del conjunto de datos no es divisible por el tamaño del lote. Esto significa que cuando el conjunto de datos se distribuye en varias réplicas, la next
llamada en algunos iteradores dará como resultado un OutOfRangeError. Para manejar este caso de uso, tf.distribute
devuelve lotes ficticios de tamaño de lote 0 en réplicas que no tienen más datos para procesar.
Para el caso de un solo trabajador, si la next
llamada en el iterador no devuelve datos, se crean lotes ficticios de tamaño de lote 0 y se utilizan junto con los datos reales en el conjunto de datos. En el caso de lotes parciales, el último lote global de datos contendrá datos reales junto con lotes de datos ficticios. La condición de detención para procesar datos ahora verifica si alguna de las réplicas tiene datos. Si no hay datos en ninguna de las réplicas, se genera un error OutOfRange.
Para el caso de varios trabajadores, el valor booleano que representa la presencia de datos en cada uno de los trabajadores se agrega mediante la comunicación de réplicas cruzadas y esto se usa para identificar si todos los trabajadores han terminado de procesar el conjunto de datos distribuidos. Dado que esto implica la comunicación entre trabajadores, hay una penalización de rendimiento involucrada.
Advertencias
Cuando se utilizan
tf.distribute.Strategy.experimental_distribute_dataset
APItf.distribute.Strategy.experimental_distribute_dataset
con una configuración de varios trabajadores, los usuarios pasan untf.data.Dataset
que lee archivos. Sitf.data.experimental.AutoShardPolicy
se establece enAUTO
oFILE
, el tamaño de lote real por paso puede ser menor que el tamaño de lote global definido por el usuario. Esto puede suceder cuando los elementos restantes del archivo son menores que el tamaño del lote global. Los usuarios pueden agotar el conjunto de datos sin depender de la cantidad de pasos a ejecutar o configurartf.data.experimental.AutoShardPolicy
enDATA
paratf.data.experimental.AutoShardPolicy
.Actualmente, las transformaciones de conjuntos de datos con estado no son compatibles con
tf.distribute
y las operaciones con estado que pueda tener el conjunto de datos se ignoran actualmente. Por ejemplo, si su conjunto de datos tiene unmap_fn
que usatf.random.uniform
para rotar una imagen, entonces tiene un gráfico de conjunto de datos que depende del estado (es decir, la semilla aleatoria) en la máquina local donde se está ejecutando el proceso de Python.Experimental
tf.data.experimental.OptimizationOptions
que están deshabilitadas de forma predeterminada pueden en ciertos contextos, como cuando se usan junto contf.distribute
, causar una degradación del rendimiento. Solo debe habilitarlos después de validar que benefician el rendimiento de su carga de trabajo en una configuración de distribución.Consulte esta guía para
tf.data
cómo optimizar sutf.data
entrada contf.data
en general. Algunos consejos adicionales:Si tiene varios trabajadores y está usando
tf.data.Dataset.list_files
para crear un conjunto de datos a partir de todos los archivos que coinciden con uno o más patrones globales, recuerde establecer el argumentoseed
o establecershuffle=False
para que cada trabajador fragmente el archivo de manera consistente.Si su canalización de entrada incluye tanto mezclar los datos en el nivel de registro como analizar los datos, a menos que los datos sin analizar sean significativamente más grandes que los datos analizados (que generalmente no es el caso), primero mezcle y luego analice, como se muestra en el siguiente ejemplo. Esto puede beneficiar el uso y el rendimiento de la memoria.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mantiene un búfer interno de elementosbuffer_size
, y así reducirbuffer_size
podríabuffer_size
problema de OOM.No se garantiza el orden en que los trabajadores procesan los datos cuando utilizan
tf.distribute.experimental_distribute_dataset
otf.distribute.distribute_datasets_from_function
. Esto suele ser necesario si utilizatf.distribute
para escalar la predicción. Sin embargo, puede insertar un índice para cada elemento en el lote y ordenar las salidas en consecuencia. El siguiente fragmento es un ejemplo de cómo ordenar los resultados.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
¿Cómo distribuyo mis datos si no estoy usando una instancia canónica de tf.data.Dataset?
A veces, los usuarios no pueden utilizar untf.data.Dataset
para representar su entrada y, posteriormente, las API mencionadas anteriormente para distribuir el conjunto de datos a varios dispositivos. En tales casos, puede utilizar tensores sin procesar o entradas de un generador.
Utilice experimental_distribute_values_from_function para entradas de tensor arbitrarias
strategy.run
acepta tf.distribute.DistributedValues
que es la salida de next(iterator)
. Para pasar los valores del tensor, use experimental_distribute_values_from_function
para construir tf.distribute.DistributedValues
partir de tensores sin procesar.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Use tf.data.Dataset.from_generator si su entrada es de un generador
Si tiene una función de generador que desea usar, puede crear una instanciatf.data.Dataset
usando la API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)