![]() | ![]() | ![]() | ![]() |
API-интерфейсы tf.distribute предоставляют пользователям простой способ масштабирования обучения с одной машины на несколько машин. При масштабировании своей модели пользователи также должны распределять свои входные данные по нескольким устройствам. tf.distribute
предоставляет API-интерфейсы, с помощью которых вы можете автоматически распределять ввод между устройствами.
Это руководство покажет вам различные способы создания распределенных наборов данных и итераторов с tf.distribute
API tf.distribute
. Кроме того, будут рассмотрены следующие темы:
-
tf.distribute.Strategy.experimental_distribute_dataset
использования,tf.distribute.Strategy.experimental_distribute_dataset
иtf.distribute.Strategy.distribute_datasets_from_function
при использованииtf.distribute.Strategy.experimental_distribute_dataset
иtf.distribute.Strategy.distribute_datasets_from_function
. - Различные способы перебора распределенного набора данных.
- Различия между
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API-интерфейсами иtf.data
API, а также любые ограничения, с которыми пользователи могут столкнуться при их использовании.
В этом руководстве не рассматривается использование распределенного ввода с API Keras.
Распределенные наборы данных
Чтобы использовать API tf.distribute
для масштабирования, рекомендуется, чтобы пользователи использовалиtf.data.Dataset
для представления своих входных данных. tf.distribute
был создан для эффективной работы сtf.data.Dataset
(например, автоматическая предварительная выборка данных на каждом устройстве-ускорителе) с оптимизацией производительности, которая регулярно включается в реализацию. Если у вас есть вариант использования чего-то другого, кромеtf.data.Dataset
, обратитесь к следующему разделу этого руководства. В нераспределенном цикле обучения пользователи сначала создают экземплярtf.data.Dataset
а затем перебирают элементы. Например:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Чтобы позволить пользователям использовать стратегию tf.distribute
с минимальными изменениями в существующем коде пользователя, были введены два API, которые будут распространять экземплярtf.data.Dataset
и возвращать объект распределенного набора данных. Затем пользователь может перебирать этот экземпляр распределенного набора данных и обучать свою модель, как и раньше. Давайте теперь рассмотрим два API - tf.distribute.Strategy.experimental_distribute_dataset
и tf.distribute.Strategy.distribute_datasets_from_function
более подробно:
tf.distribute.Strategy.experimental_distribute_dataset
использование
Этот API принимаетtf.data.Dataset
экземпляр в качестве входных данных и возвращает tf.distribute.DistributedDataset
экземпляр. Вы должны пакетировать входной набор данных со значением, равным глобальному размеру пакета. Этот глобальный размер пакета - это количество образцов, которые вы хотите обработать на всех устройствах за 1 шаг. Вы можете перебирать этот распределенный набор данных в стиле Pythonic или создать итератор с помощью iter
.tf.data.Dataset
объект не является экземпляромtf.data.Dataset
и не поддерживает никаких других API, которые каким-либо образом преобразуют или проверяют набор данных. Это рекомендуемый API, если у вас нет конкретных способов сегментировать ввод по разным репликам.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
Характеристики
Дозирование
tf.distribute
загружает входной экземплярtf.data.Dataset
с новым размером пакета, равным глобальному размеру пакета, деленному на количество синхронизируемых реплик. Количество синхронизируемых реплик равно количеству устройств, участвующих в сокращении градиента во время обучения. Когда пользователь вызывает next
на распределенном итераторе, размер пакета данных для каждой реплики возвращается на каждой реплике. Количество элементов повторно сопоставленного набора данных всегда будет кратно количеству реплик. Вот пара примеров:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Без раздачи:
- Пакет 1: [0, 1, 2, 3]
- Пакет 2: [4, 5]
С раздачей более 2 реплик. Последний пакет ([4, 5]) делится между двумя репликами.
Партия 1:
- Реплика 1: [0, 1]
- Реплика 2: [2, 3]
Партия 2:
- Реплика 2: [4]
- Реплика 2: [5]
tf.data.Dataset.range(4).batch(4)
- Без раздачи:
- Пакет 1: [[0], [1], [2], [3]]
- При раздаче более 5 реплик:
- Партия 1:
- Реплика 1: [0]
- Реплика 2: [1]
- Реплика 3: [2]
- Реплика 4: [3]
- Реплика 5: []
tf.data.Dataset.range(8).batch(4)
- Без раздачи:
- Пакет 1: [0, 1, 2, 3]
- Пакет 2: [4, 5, 6, 7]
- С раздачей более 3 реплик:
- Партия 1:
- Реплика 1: [0, 1]
- Реплика 2: [2, 3]
- Реплика 3: []
- Партия 2:
- Реплика 1: [4, 5]
- Реплика 2: [6, 7]
- Реплика 3: []
Повторное объединение набора данных имеет пространственную сложность, которая линейно увеличивается с количеством реплик. Это означает, что для варианта использования обучения с несколькими работниками конвейер ввода может столкнуться с ошибками OOM.
Шардинг
tf.distribute
также автоматически масштабирует входной набор данных при обучении нескольких сотрудников с помощью MultiWorkerMirroredStrategy
и TPUStrategy
. Каждый набор данных создается на ЦП рабочего. Автошардинг набора данных по набору рабочих означает, что каждому рабочему назначается подмножество всего набора данных (если установлен правильный tf.data.experimental.AutoShardPolicy
). Это необходимо для обеспечения того, чтобы на каждом этапе каждый работник обрабатывал глобальный размер пакета неперекрывающихся элементов набора данных. Автошардинг имеет несколько различных параметров, которые можно указать с помощью tf.data.experimental.DistributeOptions
. Обратите внимание, что при обучении нескольких рабочих с помощью ParameterServerStrategy
не используется автошардинг, а дополнительную информацию о создании наборов данных с помощью этой стратегии можно найти в руководстве по стратегии Parameter Server .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Для tf.data.experimental.AutoShardPolicy
можно установить три различных параметра:
- АВТО: это параметр по умолчанию, который означает, что будет предпринята попытка разбить на ФАЙЛ. Попытка сегментировать с помощью FILE завершается ошибкой, если файловый набор данных не обнаружен.
tf.distribute
вернется к сегментированию по ДАННЫМ. Обратите внимание: если входной набор данных основан на файлах, но количество файлов меньше, чем количество рабочих, будетInvalidArgumentError
. В этом случае явно установите для политики значениеAutoShardPolicy.DATA
или разделите источник ввода на файлы меньшего размера, чтобы количество файлов было больше, чем количество рабочих процессов. ФАЙЛ: это вариант, если вы хотите разделить входные файлы на всех рабочих. Вы должны использовать эту опцию, если количество входных файлов намного больше, чем количество рабочих, и данные в файлах распределены равномерно. Обратной стороной этого варианта является наличие простаивающих рабочих, если данные в файлах распределяются неравномерно. Если количество файлов меньше, чем количество рабочих, будет
InvalidArgumentError
. В этом случае явно установите для политики значениеAutoShardPolicy.DATA
. Например, давайте распределим 2 файла по 2 воркерам по 1 реплике в каждом. Файл 1 содержит [0, 1, 2, 3, 4, 5], а файл 2 содержит [6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик будет 2, а размер глобального пакета - 4.- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [2, 3]
- Пакет 3 = Реплика 1: [4]
- Пакет 4 = Реплика 1: [5]
- Рабочий 1:
- Пакет 1 = Реплика 2: [6, 7]
- Пакет 2 = Реплика 2: [8, 9]
- Пакет 3 = Реплика 2: [10]
- Пакет 4 = Реплика 2: [11]
ДАННЫЕ: это автоматически перекроет элементы для всех рабочих процессов. Каждый из рабочих будет читать весь набор данных и обрабатывать только назначенный ему сегмент. Все остальные осколки будут отброшены. Обычно это используется, если количество входных файлов меньше количества воркеров, и вы хотите улучшить сегментирование данных по всем воркерам. Обратной стороной является то, что весь набор данных будет прочитан каждым рабочим. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2.
- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [4, 5]
- Пакет 3 = Реплика 1: [8, 9]
- Рабочий 1:
- Пакет 1 = Реплика 2: [2, 3]
- Пакет 2 = Реплика 2: [6, 7]
- Пакет 3 = Реплика 2: [10, 11]
ВЫКЛ: Если вы отключите автошардинг, каждый воркер будет обрабатывать все данные. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2. Тогда каждый воркер увидит следующее распределение:
- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [2, 3]
- Пакет 3 = Реплика 1: [4, 5]
- Пакет 4 = Реплика 1: [6, 7]
- Пакет 5 = Реплика 1: [8, 9]
Пакет 6 = Реплика 1: [10, 11]
Рабочий 1:
Пакет 1 = Реплика 2: [0, 1]
Пакет 2 = Реплика 2: [2, 3]
Пакет 3 = Реплика 2: [4, 5]
Пакет 4 = Реплика 2: [6, 7]
Пакет 5 = Реплика 2: [8, 9]
Пакет 6 = Реплика 2: [10, 11]
Предварительная загрузка
По умолчанию tf.distribute
добавляет преобразование предварительной выборки в конец предоставленного пользователем экземпляраtf.data.Dataset
. Аргумент преобразования предварительной выборки, который равен buffer_size
, равен количеству синхронизируемых реплик.
tf.distribute.Strategy.distribute_datasets_from_function
использование
Этот API принимает входную функцию и возвращает экземпляр tf.distribute.DistributedDataset
. Функция ввода, которую передают пользователи, имеет аргумент tf.distribute.InputContext
и должна возвращать экземплярtf.data.Dataset
. С помощью этого API tf.distribute
не вносит никаких дальнейших изменений в пользовательский экземплярtf.data.Dataset
возвращаемый функцией ввода. Пользователь несет ответственность за пакетирование и сегментирование набора данных. tf.distribute
вызывает функцию ввода на устройстве ЦП каждого из рабочих. Помимо того, что пользователи могут указывать собственную логику пакетирования и сегментирования, этот API также демонстрирует лучшую масштабируемость и производительность по сравнению с tf.distribute.Strategy.experimental_distribute_dataset
при использовании для обучения нескольких рабочих.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Характеристики
Дозирование
Экземплярtf.data.Dataset
который является возвращаемым значением функции ввода, должен быть упакован с использованием размера пакета для каждой реплики. Размер пакета на реплику - это глобальный размер пакета, деленный на количество реплик, участвующих в обучении синхронизации. Это связано с тем, что tf.distribute
вызывает функцию ввода на устройстве ЦП каждого из рабочих процессов. Набор данных, созданный для данного воркера, должен быть готов к использованию всеми репликами этого воркера.
Шардинг
Объект tf.distribute.InputContext
который неявно передается в качестве аргумента функции ввода пользователя, создается tf.distribute
под капотом. Он содержит информацию о количестве рабочих, текущем идентификаторе рабочего и т. Д. Эта функция ввода может обрабатывать сегментирование в соответствии с политиками, установленными пользователем, с использованием этих свойств, которые являются частью объекта tf.distribute.InputContext
.
Предварительная загрузка
tf.distribute
не добавляет преобразование предварительной выборки в конецtf.data.Dataset
возвращаемого функцией ввода, предоставленной пользователем.
Распределенные итераторы
Подобноtf.data.Dataset
экземплярамtf.data.Dataset
, вам потребуется создать итератор tf.distribute.DistributedDataset
экземпляров tf.distribute.DistributedDataset
чтобы перебирать его и получать доступ к элементам в tf.distribute.DistributedDataset
. Ниже приведены способы, которыми вы можете создать tf.distribute.DistributedIterator
и использовать его для обучения вашей модели:
Использование
Используйте конструкцию цикла Pythonic for
Вы можете использовать удобный цикл tf.distribute.DistributedDataset
для перебора tf.distribute.DistributedDataset
. Элементы, возвращаемые из tf.distribute.DistributedIterator
могут быть одним tf.Tensor
или tf.distribute.DistributedValues
который содержит значение для каждой реплики. Размещение цикла внутри tf.function
даст прирост производительности. Однако в настоящее время break
и return
не поддерживаются для цикла над tf.distribute.DistributedDataset
который помещается внутри tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Используйте iter
для создания явного итератора
Чтобы перебирать элементы в экземпляре tf.distribute.DistributedDataset
, вы можете создать tf.distribute.DistributedIterator
используя для него iter
API. С помощью явного итератора вы можете выполнять итерацию для фиксированного количества шагов. Для того , чтобы получить следующий элемент из tf.distribute.DistributedIterator
экземпляра dist_iterator
, вы можете вызвать next(dist_iterator)
, dist_iterator.get_next()
или dist_iterator.get_next_as_optional()
. Первые два по сути одинаковы:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
С next()
или tf.distribute.DistributedIterator.get_next()
, если tf.distribute.DistributedIterator
достиг своего конца, будет выдана ошибка OutOfRange. Клиент может поймать ошибку на стороне Python и продолжить выполнение другой работы, такой как контрольные точки и оценка. Однако это не сработает, если вы используете цикл обучения хоста (т. tf.function
несколько шагов для каждой tf.function
), который выглядит так:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
содержит несколько шагов, tf.range
тело шага в tf.range
. В этом случае разные итерации в цикле без зависимости могут запускаться параллельно, поэтому ошибка OutOfRange может быть вызвана в последующих итерациях до завершения вычисления предыдущих итераций. Как только возникает ошибка OutOfRange, все операции в функции сразу же завершаются. Если вы хотели бы избежать этого случая, альтернативой, которая не вызывает ошибку tf.distribute.DistributedIterator.get_next_as_optional()
является tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
возвращает tf.experimental.Optional
который содержит следующий элемент или не содержит значения, если tf.distribute.DistributedIterator
достиг своего конца.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Использование свойства element_spec
Если вы передаете элементы распределенного набора данных в tf.function
и хотите tf.TypeSpec
гарантии, вы можете указать input_signature
аргумент tf.function
. Выходные данные распределенного набора данных - это tf.distribute.DistributedValues
который может представлять входные данные для одного или нескольких устройств. Чтобы получить tf.TypeSpec
соответствующий этому распределенному значению, вы можете использовать свойство element_spec
распределенного набора данных или объекта распределенного итератора.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Частичные партии
Частичные пакеты встречаются, когда экземплярыtf.data.Dataset
пользователями, могут содержать размеры пакетов, которые не делятся равномерно на количество реплик, или когда количество элементов экземпляра набора данных не делится на размер пакета. Это означает, что когда набор данных распределяется по нескольким репликам, next
вызов некоторых итераторов приведет к ошибке OutOfRangeError. Чтобы справиться с этим вариантом использования, tf.distribute
возвращает фиктивные пакеты размером 0 на репликах, у которых больше нет данных для обработки.
Для случая одного рабочего, если данные не возвращаются при next
вызове итератора, создаются фиктивные пакеты с размером пакета 0, которые используются вместе с реальными данными в наборе данных. В случае частичных пакетов последний глобальный пакет данных будет содержать реальные данные наряду с фиктивными пакетами данных. Условие остановки обработки данных теперь проверяет, есть ли данные у какой-либо из реплик. Если нет данных ни на одной из реплик, выдается ошибка OutOfRange.
Для случая с несколькими рабочими логическое значение, представляющее наличие данных по каждому из рабочих, агрегируется с использованием перекрестной связи реплик, и это используется, чтобы определить, все ли рабочие завершили обработку распределенного набора данных. Поскольку это связано с взаимодействием между сотрудниками, возникает некоторое снижение производительности.
Предостережения
При использовании API
tf.distribute.Strategy.experimental_distribute_dataset
с настройкой нескольких рабочих, пользователи передаютtf.data.Dataset
который читает из файлов. Если дляtf.data.experimental.AutoShardPolicy
задано значениеAUTO
илиFILE
, фактический размер пакета на шаг может быть меньше, чем глобальный размер пакета, определенный пользователем. Это может произойти, когда оставшиеся элементы в файле меньше глобального размера пакета. Пользователи могут либо исчерпать набор данных, независимо от количества выполняемых шагов, либо установить дляtf.data.experimental.AutoShardPolicy
значениеDATA
чтобы обойти это.Преобразования набора данных с
tf.distribute
состояния в настоящее время не поддерживаютсяtf.distribute
и любыеtf.distribute
сtf.distribute
состояния, которые может иметь набор данных, в настоящее время игнорируются. Например, если в вашем наборе данных естьmap_fn
который используетtf.random.uniform
для поворота изображения, тогда у вас есть граф набора данных, который зависит от состояния (т.е. случайного начального числа) на локальном компьютере, на котором выполняется процесс python.Экспериментальные
tf.data.experimental.OptimizationOptions
, которые отключены по умолчанию, могут в определенных контекстах - например, при использовании вместе сtf.distribute
- вызывать снижение производительности. Их следует включать только после того, как вы убедитесь, что они улучшают производительность вашей рабочей нагрузки в настройке распространения.Пожалуйста, обратитесь к этому руководству, чтобы узнать, как в целом оптимизировать конвейер ввода с помощью
tf.data
. Несколько дополнительных советов:Если у вас несколько рабочих
tf.data.Dataset.list_files
и вы используетеtf.data.Dataset.list_files
для создания набора данных из всех файлов, соответствующих одному или нескольким шаблонам глобусов, не забудьте установитьseed
аргумент или установитьshuffle=False
чтобы каждый рабочий сегментировал файл последовательно.Если ваш входной конвейер включает в себя как перетасовку данных на уровне записи, так и анализ данных, если неанализируемые данные значительно больше, чем проанализированные данные (что обычно не так), сначала перемешайте, а затем проанализируйте, как показано в следующем примере. Это может улучшить использование памяти и производительность.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
поддерживает внутренний буфер элементовbuffer_size
, и, таким образом, уменьшениеbuffer_size
может облегчить проблему OOM.Порядок, в котором данные обрабатываются рабочими при использовании
tf.distribute.experimental_distribute_dataset
илиtf.distribute.distribute_datasets_from_function
, не гарантируется. Обычно это требуется, если вы используетеtf.distribute
для прогнозирования масштаба. Однако вы можете вставить индекс для каждого элемента в пакете и соответственно упорядочить выходные данные. Следующий фрагмент является примером того, как заказывать выходные данные.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
Как мне распространять свои данные, если я не использую канонический экземпляр tf.data.Dataset?
Иногда пользователи не могут использоватьtf.data.Dataset
для представления своих входных данных, а затем упомянутые выше API-интерфейсы для распространения набора данных на несколько устройств. В таких случаях вы можете использовать необработанные тензоры или входные данные от генератора.
Используйте экспериментальную_distribute_values_from_function для произвольных входных значений тензора
strategy.run
принимает tf.distribute.DistributedValues
который является выходом next(iterator)
. Чтобы передать значения тензора, используйте experimental_distribute_values_from_function
для построения tf.distribute.DistributedValues
из необработанных тензоров.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Используйте tf.data.Dataset.from_generator, если ваш ввод от генератора
Если у вас есть функция генератора, которую вы хотите использовать, вы можете создать экземплярtf.data.Dataset
с from_generator
API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)