Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Распределенный ввод

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть источник на GitHub Скачать блокнот

API-интерфейсы tf.distribute предоставляют пользователям простой способ масштабирования обучения с одной машины на несколько машин. При масштабировании своей модели пользователи также должны распределять свои входные данные между несколькими устройствами. tf.distribute предоставляет API-интерфейсы, с помощью которых вы можете автоматически распределять входные данные между устройствами.

Это руководство покажет вам различные способы создания распределенного набора данных и итераторов с tf.distribute API-интерфейсов tf.distribute . Дополнительно будут рассмотрены следующие темы:

В этом руководстве не рассматривается использование распределенного ввода с API Keras.

Распределенные наборы данных

Чтобы использовать API tf.distribute для масштабирования, пользователям рекомендуется использовать tf.data.Dataset для представления своих входных данных. tf.distribute был создан для эффективной работы с tf.data.Dataset (например, автоматическая предварительная выборка данных на каждое ускорительное устройство) с оптимизацией производительности, которая регулярно включается в реализацию. Если у вас есть вариант использования чего-то другого, кроме tf.data.Dataset , обратитесь к последующему разделу этого руководства. В нераспределенном цикле обучения пользователи сначала создают экземпляр tf.data.Dataset а затем перебирают элементы. Например:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Чтобы позволить пользователям использовать стратегию tf.distribute с минимальными изменениями в существующем коде пользователя, были введены два API, которые будут распространять экземпляр tf.data.Dataset и возвращать объект распределенного набора данных. Затем пользователь может перебирать этот экземпляр распределенного набора данных и обучать свою модель, как и раньше. Давайте теперь посмотрим на два API - tf.distribute.Strategy.experimental_distribute_dataset и tf.distribute.Strategy.experimental_distribute_datasets_from_function более подробно:

tf.distribute.Strategy.experimental_distribute_dataset

использование

Этот API-интерфейс принимает экземпляр tf.data.Dataset качестве входных данных и возвращает экземпляр tf.distribute.DistributedDataset . Вам следует пакетировать входной набор данных со значением, равным глобальному размеру пакета. Этот глобальный размер пакета - это количество выборок, которое вы хотите обработать на всех устройствах за 1 шаг. Вы можете перебирать этот распределенный набор данных в стиле Pythonic или создать итератор с помощью iter . tf.data.Dataset объект не является экземпляром tf.data.Dataset и не поддерживает любые другие API, которые каким-либо образом преобразуют или проверяют набор данных. Это рекомендуемый API, если у вас нет конкретных способов сегментировать ввод по разным репликам.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

свойства

дозирующий

tf.distribute загружает входной экземпляр tf.data.Dataset с новым размером пакета, который равен глобальному размеру пакета, деленному на количество синхронизируемых реплик. Количество синхронизируемых реплик равно количеству устройств, участвующих в уменьшении градиента во время обучения. Когда пользователь вызывает next распределенный итератор, для каждой реплики возвращается размер пакета для каждой реплики. Количество элементов повторно сопоставленного набора данных всегда будет кратным количеству реплик. Вот пара примеров:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    Без раздачи:

    Пакет 1: [0, 1, 2, 3]

    Пакет 2: [4, 5]

    С распределением по 2 репликам:

    Пакет 1: Реплика 1: [0, 1] Реплика 2: [2, 3]

    Пакет 2: Реплика 2: [4] Реплика 2: [5]

    Последний пакет ([4, 5]) делится между 2 репликами.

  • tf.data.Dataset.range(4).batch(4)

    Без раздачи:

    Пакет 1: [[0], [1], [2], [3]]

    С распределением по 5 репликам:

    Пакет 1: Реплика 1: [0] Реплика 2: [1] Реплика 3: [2] Реплика 4: [3] Реплика 5: []

  • tf.data.Dataset.range(8).batch(4)

    Без раздачи:

    Пакет 1: [0, 1, 2, 3]

    Пакет 2: [4, 5, 6, 7]

    При раздаче более 3-х реплик:

    Пакет 1: Реплика 1: [0, 1] Реплика 2: [2, 3] Реплика 3: []

    Пакет 2: Реплика 1: [4, 5] Реплика 2: [6, 7] Реплика 3: []

Перепаковка набора данных имеет сложность пространства, которая линейно возрастает с увеличением количества реплик. Это означает, что в случае использования обучения с несколькими работниками входной конвейер может столкнуться с ошибками OOM.

Sharding

tf.distribute также автоматически масштабирует входной набор данных при обучении нескольких рабочих. Каждый набор данных создается на устройстве ЦП рабочего. Автошардирование набора данных по набору рабочих означает, что каждому рабочему назначается подмножество всего набора данных (если tf.data.experimental.AutoShardPolicy правильный tf.data.experimental.AutoShardPolicy ). Это необходимо для обеспечения того, чтобы на каждом этапе каждый рабочий процесс обрабатывал глобальный размер пакета неперекрывающихся элементов набора данных. Автошардинг имеет несколько различных параметров, которые можно указать с помощью tf.data.experimental.DistributeOptions .

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

Для tf.data.experimental.AutoShardPolicy можно установить три различных параметра:

  • AUTO: это опция по умолчанию, которая означает, что будет предпринята попытка сегментирования с помощью FILE Попытка сегментирования с помощью FILE не удалась, если набор данных на основе файлов не обнаружен. tf.distribute вернется к сегментированию с помощью DATA. Обратите внимание: если входной набор данных основан на файлах, но количество файлов меньше, чем количество рабочих процессов, возникнет ошибка.
  • ФАЙЛ: это вариант, если вы хотите разделить входные файлы на всех рабочих. Если количество файлов меньше, чем количество рабочих, возникнет ошибка. Вы должны использовать эту опцию, если количество входных файлов намного больше, чем количество рабочих, и данные в файлах распределены равномерно. Обратной стороной этого варианта является наличие простаивающих рабочих, если данные в файлах распределяются неравномерно. Например, давайте распространим 2 файла на 2 рабочих с 1 репликой в ​​каждом. Файл 1 содержит [0, 1, 2, 3, 4, 5], а файл 2 содержит [6, 7, 8, 9, 10, 11]. Пусть общее количество реплик в синхронизации равно 2, а общий размер пакета равен 4.

    • Рабочий 0:

    Пакет 1 = Реплика 1: [0, 1]

    Пакет 2 = Реплика 1: [2, 3]

    Пакет 3 = Реплика 1: [4]

    Пакет 4 = Реплика 1: [5]

    • Рабочий 1:

    Пакет 1 = Реплика 2: [6, 7]

    Пакет 2 = Реплика 2: [8, 9]

    Пакет 3 = Реплика 2: [10]

    Пакет 4 = Реплика 2: [11]

  • ДАННЫЕ: это автошардирует элементы для всех рабочих процессов. Каждый из рабочих будет читать весь набор данных и обрабатывать только назначенный ему сегмент. Все остальные осколки будут отброшены. Обычно это используется, если количество входных файлов меньше, чем количество воркеров, и вы хотите улучшить сегментирование данных по всем воркерам. Обратной стороной является то, что весь набор данных будет прочитан каждым рабочим. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2.

    • Рабочий 0:

    Пакет 1 = Реплика 1: [0, 1]

    Пакет 2 = Реплика 1: [4, 5]

    Пакет 3 = Реплика 1: [8, 9]

    • Рабочий 1:

    Пакет 1 = Реплика 2: [2, 3]

    Пакет 2 = Реплика 2: [6, 7]

    Пакет 3 = Реплика 2: [10, 11]

  • ВЫКЛ: Если вы отключите автошард, каждый работник будет обрабатывать все данные. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2. Тогда каждый воркер увидит следующее распределение:

    • Рабочий 0:

    Пакет 1 = Реплика 1: [0, 1]

    Пакет 2 = Реплика 1: [2, 3]

    Пакет 3 = Реплика 1: [4, 5]

    Пакет 4 = Реплика 1: [6, 7]

    Партия 5 = копия 1: [8, 9]

    Пакет 6 = Реплика 1: [10, 11]

    • Рабочий 1:

    Пакет 1 = Реплика 2: [0, 1]

    Пакет 2 = Реплика 2: [2, 3]

    Пакет 3 = Реплика 2: [4, 5]

    Пакет 4 = Реплика 2: [6, 7]

    Пакет 5 = Реплика 2: [8, 9]

    Партия 6 = Реплика 2: [10, 11]

Предзагрузка

По умолчанию tf.distribute добавляет преобразование предварительной выборки в конец предоставленного пользователем экземпляра tf.data.Dataset . Аргумент преобразования prefetch, который равен buffer_size , равен числу реплик в синхронизации.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

использование

Этот API-интерфейс принимает функцию ввода и возвращает экземпляр tf.distribute.DistributedDataset . Функция ввода, которую передают пользователи, имеет аргумент tf.distribute.InputContext и должна возвращать экземпляр tf.data.Dataset . С помощью этого API tf.distribute не вносит никаких дальнейших изменений в экземпляр tf.data.Dataset пользователя, возвращаемый из функции ввода. Пользователь несет ответственность за пакетирование и сегментирование набора данных. tf.distribute вызывает функцию ввода на устройстве ЦП каждого из рабочих. Помимо того, что пользователи могут указывать собственную логику пакетирования и сегментирования, этот API также демонстрирует лучшую масштабируемость и производительность по сравнению с tf.distribute.Strategy.experimental_distribute_dataset при использовании для обучения нескольких рабочих.

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

свойства

дозирующий

Экземпляр tf.data.Dataset который является возвращаемым значением функции ввода, должен быть упакован с использованием размера пакета для каждой реплики. Размер пакета на реплику - это глобальный размер пакета, деленный на количество реплик, участвующих в обучении синхронизации. Это потому, что tf.distribute вызывает функцию ввода на устройстве ЦП каждого из рабочих. Набор данных, созданный для данного воркера, должен быть готов к использованию всеми репликами этого воркера.

Sharding

Объект tf.distribute.InputContext который неявно передается в качестве аргумента функции ввода пользователя, создается tf.distribute под капотом. В нем содержится информация о количестве рабочих, текущем идентификаторе рабочего и т. Д. Эта функция ввода может обрабатывать сегментирование в соответствии с политиками, установленными пользователем с использованием этих свойств, которые являются частью объекта tf.distribute.InputContext .

Предзагрузка

tf.distribute не добавляет преобразование предварительной выборки в конец tf.data.Dataset возвращаемого функцией ввода, предоставленной пользователем.

Распределенные итераторы

Подобно tf.data.Dataset экземплярам tf.data.Dataset , вам нужно будет создать итератор tf.distribute.DistributedDataset экземпляров tf.distribute.DistributedDataset чтобы выполнить итерацию по нему и получить доступ к элементам в tf.distribute.DistributedDataset . Ниже приведены способы, которыми вы можете создать tf.distribute.DistributedIterator и использовать его для обучения вашей модели:

Обычаи

Используйте конструкцию цикла Pythonic for

Вы можете использовать удобный цикл tf.distribute.DistributedDataset для итерации по tf.distribute.DistributedDataset . Элементы, возвращаемые из tf.distribute.DistributedIterator могут быть одним tf.Tensor или tf.distribute.DistributedValues который содержит значение для каждой реплики. Размещение цикла внутри функции tf.function повысит производительность. Однако break и return в настоящее время не поддерживаются, если цикл помещен внутри tf.function . Мы также не поддерживаем размещение цикла внутри tf.function при использовании многопользовательских стратегий, таких как tf.distribute.experimental.MultiWorkerMirroredStrategy и tf.distribute.TPUStrategy . Размещение цикла внутри tf.function работает для одного рабочего tf.distribute.TPUStrategy но не при использовании модулей TPU.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Используйте iter для создания явного итератора

Чтобы перебирать элементы в экземпляре tf.distribute.DistributedDataset , вы можете создать tf.distribute.DistributedIterator используя для него iter API. С явным итератором вы можете выполнять итерацию за фиксированное количество шагов. Для того , чтобы получить следующий элемент из tf.distribute.DistributedIterator экземпляра dist_iterator , вы можете вызвать next(dist_iterator) , dist_iterator.get_next() или dist_iterator.get_next_as_optional() . Первые два по сути одинаковы:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

С next() или tf.distribute.DistributedIterator.get_next() , если tf.distribute.DistributedIterator достиг своего конца, будет выдана ошибка OutOfRange. Клиент может поймать ошибку на стороне Python и продолжить выполнение другой работы, такой как контрольные точки и оценка. Однако это не сработает, если вы используете цикл обучения хоста (т. tf.function несколько шагов для каждой tf.function ), который выглядит так:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

train_fn содержит несколько шагов, tf.range тело шага в tf.range . В этом случае различные итерации в цикле без зависимости могут начаться параллельно, поэтому ошибка OutOfRange может быть вызвана на более поздних итерациях до завершения вычисления предыдущих итераций. Как только выдается ошибка OutOfRange, все операции в функции будут немедленно прекращены. Если это какой-то случай, который вы хотели бы избежать, альтернативой, которая не выдает ошибку tf.distribute.DistributedIterator.get_next_as_optional() является tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional возвращает tf.experimental.Optional который содержит следующий элемент или не содержит значения, если tf.distribute.DistributedIterator достиг своего конца.

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Использование свойства element_spec

Если вы передаете элементы распределенного набора данных в функцию tf.function и хотите tf.TypeSpec гарантию tf.TypeSpec , вы можете указать аргумент input_signature для функции tf.function . Выход распределенного набора данных - tf.distribute.DistributedValues который может представлять входные данные для одного устройства или нескольких устройств. Чтобы получить tf.TypeSpec соответствующий этому распределенному значению, вы можете использовать свойство element_spec распределенного набора данных или распределенного объекта итератора.

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Частичные партии

Частичные пакеты встречаются, когда tf.data.Dataset пользователями экземпляры tf.data.Dataset могут содержать размеры пакетов, которые не делятся поровну на количество реплик, или когда мощность экземпляра набора данных не делится на размер пакета. Это означает, что когда набор данных распределен по нескольким репликам, next вызов на некоторых итераторах приведет к OutOfRangeError. Чтобы обработать этот вариант использования, tf.distribute возвращает фиктивные пакеты размером 0 в репликах, у которых больше нет данных для обработки.

Для случая одного рабочего, если данные не возвращаются при next вызове итератора, создаются фиктивные пакеты с размером пакета 0, которые используются вместе с реальными данными в наборе данных. В случае частичных пакетов последний глобальный пакет данных будет содержать реальные данные наряду с фиктивными пакетами данных. Условие остановки обработки данных теперь проверяет, есть ли данные на какой-либо из реплик. Если нет данных ни на одной из реплик, выдается ошибка OutOfRange.

Для случая с несколькими рабочими логическое значение, представляющее наличие данных по каждому из рабочих, агрегируется с использованием перекрестной связи реплик, и это используется, чтобы определить, все ли рабочие завершили обработку распределенного набора данных. Поскольку это связано с общением между сотрудниками, возникает некоторое снижение производительности.

Предостережения

  • При использовании API tf.distribute.Strategy.experimental_distribute_dataset с настройкой нескольких рабочих пользователи передают tf.data.Dataset который читает из файлов. Если для tf.data.experimental.AutoShardPolicy задано значение AUTO или FILE , фактический размер пакета на шаг может быть меньше, чем глобальный размер пакета, определенный пользователем. Это может произойти, когда оставшиеся элементы в файле меньше глобального размера пакета. Пользователи могут либо исчерпать набор данных, не завися от количества выполняемых шагов, либо установить tf.data.experimental.AutoShardPolicy в DATA чтобы обойти его.

  • Преобразования набора данных с tf.distribute состояния в настоящее время не поддерживаются с помощью tf.distribute и любые tf.distribute состоянием, которые могут иметь набор данных, в настоящее время игнорируются. Например, если в вашем наборе данных есть map_fn который использует tf.random.uniform для поворота изображения, то у вас есть граф набора данных, который зависит от состояния (т. tf.random.uniform Случайного начального числа) на локальной машине, где выполняется процесс python.

  • Экспериментальные tf.data.experimental.OptimizationOptions , которые отключены по умолчанию, могут в определенных контекстах - например, при использовании вместе с tf.distribute - вызывать снижение производительности. Их следует включать только после того, как вы убедитесь, что они улучшают производительность вашей рабочей нагрузки в настройке распределения.

  • Порядок, в котором данные обрабатываются рабочими при использовании tf.distribute.experimental_distribute_dataset или tf.distribute.experimental_distribute_datasets_from_function , не гарантируется. Обычно это требуется, если вы используете tf.distribute для прогнозирования масштаба. Однако вы можете вставить индекс для каждого элемента в пакете и соответственно упорядочить выходные данные. Следующий фрагмент является примером того, как упорядочить выходные данные.

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Как я могу распространять свои данные, если я не использую канонический экземпляр tf.data.Dataset?

Иногда пользователи не могут использовать tf.data.Dataset для представления своих входных данных и, tf.data.Dataset вышеупомянутые API для распространения набора данных на несколько устройств. В таких случаях вы можете использовать необработанные тензоры или входные данные от генератора.

Используйте экспериментальную_distribute_values_from_function для произвольных тензорных входов

strategy.run принимает tf.distribute.DistributedValues который является выходом next(iterator) . Чтобы передать значения тензора, используйте experimental_distribute_values_from_function для построения tf.distribute.DistributedValues из сырых тензоров.

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Используйте tf.data.Dataset.from_generator, если ваш ввод от генератора

Если у вас есть функция генератора, которую вы хотите использовать, вы можете создать экземпляр tf.data.Dataset с from_generator API from_generator .

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)