Присоединяйтесь к нам практически на женщин в МЛ симпозиуме по 19 октября Зарегистрируйтесь сейчас

Распределенный ввод

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

API-интерфейсы tf.distribute предоставляют пользователям простой способ масштабирования обучения с одной машины на несколько машин. При масштабировании своей модели пользователи также должны распределять свои входные данные по нескольким устройствам. tf.distribute предоставляет API-интерфейсы, с помощью которых вы можете автоматически распределять ввод между устройствами.

Это руководство покажет вам различные способы создания распределенных наборов данных и итераторов с tf.distribute API tf.distribute . Кроме того, будут рассмотрены следующие темы:

В этом руководстве не рассматривается использование распределенного ввода с API Keras.

Распределенные наборы данных

Чтобы использовать API tf.distribute для масштабирования, рекомендуется, чтобы пользователи использовалиtf.data.Dataset для представления своих входных данных. tf.distribute был создан для эффективной работы сtf.data.Dataset (например, автоматическая предварительная выборка данных на каждом устройстве-ускорителе) с оптимизацией производительности, которая регулярно включается в реализацию. Если у вас есть вариант использования чего-то другого, кромеtf.data.Dataset , обратитесь к следующему разделу этого руководства. В нераспределенном цикле обучения пользователи сначала создают экземплярtf.data.Dataset а затем перебирают элементы. Например:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Чтобы позволить пользователям использовать стратегию tf.distribute с минимальными изменениями в существующем коде пользователя, были введены два API, которые будут распространять экземплярtf.data.Dataset и возвращать объект распределенного набора данных. Затем пользователь может перебирать этот экземпляр распределенного набора данных и обучать свою модель, как и раньше. Давайте теперь рассмотрим два API - tf.distribute.Strategy.experimental_distribute_dataset и tf.distribute.Strategy.distribute_datasets_from_function более подробно:

tf.distribute.Strategy.experimental_distribute_dataset

Применение

Этот API принимаетtf.data.Dataset экземпляр в качестве входных данных и возвращает tf.distribute.DistributedDataset экземпляр. Вы должны пакетировать входной набор данных со значением, равным глобальному размеру пакета. Этот глобальный размер пакета - это количество образцов, которые вы хотите обработать на всех устройствах за 1 шаг. Вы можете перебирать этот распределенный набор данных в стиле Pythonic или создать итератор с помощью iter .tf.data.Dataset объект не является экземпляромtf.data.Dataset и не поддерживает никаких других API, которые каким-либо образом преобразуют или проверяют набор данных. Это рекомендуемый API, если у вас нет конкретных способов сегментировать ввод по разным репликам.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Характеристики

Дозирование

tf.distribute загружает входной экземплярtf.data.Dataset с новым размером пакета, равным глобальному размеру пакета, деленному на количество синхронизируемых реплик. Количество синхронизируемых реплик равно количеству устройств, участвующих в сокращении градиента во время обучения. Когда пользователь вызывает next на распределенном итераторе, размер пакета данных для каждой реплики возвращается на каждой реплике. Количество элементов повторно сопоставленного набора данных всегда будет кратно количеству реплик. Вот пара примеров:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Без раздачи:
    • Пакет 1: [0, 1, 2, 3]
    • Пакет 2: [4, 5]
    • С раздачей более 2 реплик. Последний пакет ([4, 5]) делится между двумя репликами.

    • Партия 1:

      • Реплика 1: [0, 1]
      • Реплика 2: [2, 3]
    • Партия 2:

      • Реплика 2: [4]
      • Реплика 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Без раздачи:
    • Пакет 1: [[0], [1], [2], [3]]
    • При раздаче более 5 реплик:
    • Партия 1:
      • Реплика 1: [0]
      • Реплика 2: [1]
      • Реплика 3: [2]
      • Реплика 4: [3]
      • Реплика 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Без раздачи:
    • Пакет 1: [0, 1, 2, 3]
    • Пакет 2: [4, 5, 6, 7]
    • При раздаче более 3-х реплик:
    • Партия 1:
      • Реплика 1: [0, 1]
      • Реплика 2: [2, 3]
      • Реплика 3: []
    • Партия 2:
      • Реплика 1: [4, 5]
      • Реплика 2: [6, 7]
      • Реплика 3: []

Повторное объединение набора данных имеет пространственную сложность, которая линейно увеличивается с количеством реплик. Это означает, что для варианта использования обучения с несколькими рабочими конвейер ввода может столкнуться с ошибками OOM.

Шардинг

tf.distribute также автоматически масштабирует входной набор данных при обучении нескольких сотрудников с помощью MultiWorkerMirroredStrategy и TPUStrategy . Каждый набор данных создается на ЦП рабочего. Автошардинг набора данных по набору рабочих означает, что каждому рабочему назначается подмножество всего набора данных (если установлен правильный tf.data.experimental.AutoShardPolicy ). Это необходимо для обеспечения того, чтобы на каждом этапе каждый работник обрабатывал глобальный размер пакета неперекрывающихся элементов набора данных. Автошардинг имеет несколько различных параметров, которые можно указать с помощью tf.data.experimental.DistributeOptions . Обратите внимание, что при обучении нескольких рабочих с помощью ParameterServerStrategy не используется автошардинг, а дополнительную информацию о создании наборов данных с помощью этой стратегии можно найти в руководстве по стратегии сервера параметров .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Для tf.data.experimental.AutoShardPolicy можно установить три различных параметра:

  • АВТО: это параметр по умолчанию, который означает, что будет предпринята попытка разбить на ФАЙЛ. Попытка сегментировать с помощью FILE завершается ошибкой, если файловый набор данных не обнаружен. tf.distribute вернется к сегментированию по ДАННЫМ. Обратите внимание: если входной набор данных основан на файлах, но количество файлов меньше, чем количество рабочих, будет InvalidArgumentError . В этом случае явно установите для политики значение AutoShardPolicy.DATA или разделите источник ввода на файлы меньшего размера, чтобы количество файлов было больше, чем количество рабочих процессов.
  • ФАЙЛ: это вариант, если вы хотите разделить входные файлы на всех рабочих. Вы должны использовать эту опцию, если количество входных файлов намного больше, чем количество рабочих, и данные в файлах распределены равномерно. Обратной стороной этого варианта является наличие простаивающих рабочих, если данные в файлах распределяются неравномерно. Если количество файлов меньше, чем количество рабочих, будет InvalidArgumentError . В этом случае явно установите для политики значение AutoShardPolicy.DATA . Например, давайте распределим 2 файла по 2 воркерам по 1 реплике в каждом. Файл 1 содержит [0, 1, 2, 3, 4, 5], а файл 2 содержит [6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик будет 2, а размер глобального пакета - 4.

    • Рабочий 0:
    • Пакет 1 = Реплика 1: [0, 1]
    • Пакет 2 = Реплика 1: [2, 3]
    • Пакет 3 = Реплика 1: [4]
    • Пакет 4 = Реплика 1: [5]
    • Рабочий 1:
    • Пакет 1 = Реплика 2: [6, 7]
    • Пакет 2 = Реплика 2: [8, 9]
    • Пакет 3 = Реплика 2: [10]
    • Пакет 4 = Реплика 2: [11]
  • ДАННЫЕ: это автоматически перекроет элементы для всех рабочих процессов. Каждый из рабочих будет читать весь набор данных и обрабатывать только назначенный ему сегмент. Все остальные осколки будут отброшены. Обычно это используется, если количество входных файлов меньше количества воркеров, и вы хотите улучшить сегментирование данных по всем воркерам. Обратной стороной является то, что весь набор данных будет прочитан каждым рабочим. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2.

    • Рабочий 0:
    • Пакет 1 = Реплика 1: [0, 1]
    • Пакет 2 = Реплика 1: [4, 5]
    • Пакет 3 = Реплика 1: [8, 9]
    • Рабочий 1:
    • Пакет 1 = Реплика 2: [2, 3]
    • Пакет 2 = Реплика 2: [6, 7]
    • Пакет 3 = Реплика 2: [10, 11]
  • ВЫКЛ: Если вы отключите автошардинг, каждый воркер будет обрабатывать все данные. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизируемых реплик равно 2. Тогда каждый воркер увидит следующее распределение:

    • Рабочий 0:
    • Пакет 1 = Реплика 1: [0, 1]
    • Пакет 2 = Реплика 1: [2, 3]
    • Пакет 3 = Реплика 1: [4, 5]
    • Пакет 4 = Реплика 1: [6, 7]
    • Пакет 5 = Реплика 1: [8, 9]
    • Пакет 6 = Реплика 1: [10, 11]

    • Рабочий 1:

    • Пакет 1 = Реплика 2: [0, 1]

    • Пакет 2 = Реплика 2: [2, 3]

    • Пакет 3 = Реплика 2: [4, 5]

    • Пакет 4 = Реплика 2: [6, 7]

    • Пакет 5 = Реплика 2: [8, 9]

    • Пакет 6 = Реплика 2: [10, 11]

Предварительная загрузка

По умолчанию tf.distribute добавляет преобразование предварительной выборки в конец предоставленного пользователем экземпляраtf.data.Dataset . Аргумент преобразования предварительной выборки, который равен buffer_size , равен количеству синхронизируемых реплик.

tf.distribute.Strategy.distribute_datasets_from_function

Применение

Этот API принимает входную функцию и возвращает экземпляр tf.distribute.DistributedDataset . Функция ввода, которую передают пользователи, имеет аргумент tf.distribute.InputContext и должна возвращать экземплярtf.data.Dataset . С помощью этого API tf.distribute не вносит никаких дальнейших изменений в пользовательский экземплярtf.data.Dataset возвращаемый функцией ввода. Пользователь несет ответственность за пакетирование и сегментирование набора данных. tf.distribute вызывает функцию ввода на устройстве ЦП каждого из рабочих. Помимо того, что пользователи могут указывать собственную логику пакетирования и сегментирования, этот API также демонстрирует лучшую масштабируемость и производительность по сравнению с tf.distribute.Strategy.experimental_distribute_dataset при использовании для обучения нескольких рабочих.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Характеристики

Дозирование

Экземплярtf.data.Dataset который является возвращаемым значением функции ввода, должен быть упакован с использованием размера пакета для каждой реплики. Размер пакета на реплику - это глобальный размер пакета, деленный на количество реплик, участвующих в обучении синхронизации. Это связано с тем, что tf.distribute вызывает функцию ввода на устройстве ЦП каждого из рабочих процессов. Набор данных, созданный для данного воркера, должен быть готов к использованию всеми репликами этого воркера.

Шардинг

Объект tf.distribute.InputContext который неявно передается в качестве аргумента функции ввода пользователя, создается tf.distribute под капотом. Он содержит информацию о количестве рабочих, текущем идентификаторе рабочего и т. Д. Эта функция ввода может обрабатывать сегментирование в соответствии с политиками, установленными пользователем, с использованием этих свойств, которые являются частью объекта tf.distribute.InputContext .

Предварительная загрузка

tf.distribute не добавляет преобразование предварительной выборки в конецtf.data.Dataset возвращаемого функцией ввода, предоставленной пользователем.

Распределенные итераторы

Подобноtf.data.Dataset экземплярамtf.data.Dataset , вам потребуется создать итератор tf.distribute.DistributedDataset экземпляров tf.distribute.DistributedDataset чтобы перебирать его и получать доступ к элементам в tf.distribute.DistributedDataset . Ниже приведены способы, которыми вы можете создать tf.distribute.DistributedIterator и использовать его для обучения вашей модели:

Использование

Используйте Pythonic для конструкции цикла

Вы можете использовать удобный цикл tf.distribute.DistributedDataset для перебора tf.distribute.DistributedDataset . Элементы, возвращаемые из tf.distribute.DistributedIterator могут быть одним tf.Tensor или tf.distribute.DistributedValues который содержит значение для каждой реплики. Размещение цикла внутри tf.function даст прирост производительности. Однако в настоящее время break и return не поддерживаются для цикла над tf.distribute.DistributedDataset который помещается внутри tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Используйте iter для создания явного итератора

Чтобы перебирать элементы в экземпляре tf.distribute.DistributedDataset , вы можете создать tf.distribute.DistributedIterator используя для него iter API. С помощью явного итератора вы можете выполнять итерацию для фиксированного количества шагов. Для того , чтобы получить следующий элемент из tf.distribute.DistributedIterator экземпляра dist_iterator , вы можете вызвать next(dist_iterator) , dist_iterator.get_next() или dist_iterator.get_next_as_optional() . Первые два по сути одинаковы:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

С next() или tf.distribute.DistributedIterator.get_next() , если tf.distribute.DistributedIterator достиг своего конца, будет выдана ошибка OutOfRange. Клиент может поймать ошибку на стороне Python и продолжить выполнение другой работы, такой как контрольные точки и оценка. Однако это не сработает, если вы используете цикл обучения хоста (т. tf.function несколько шагов для каждой tf.function ), который выглядит так:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn содержит несколько шагов, tf.range тело шага в tf.range . В этом случае разные итерации в цикле без зависимости могут запускаться параллельно, поэтому ошибка OutOfRange может быть вызвана в последующих итерациях до завершения вычисления предыдущих итераций. Как только возникает ошибка OutOfRange, все операции в функции сразу же завершаются. Если вы хотели бы избежать этого случая, альтернативой, которая не вызывает ошибку tf.distribute.DistributedIterator.get_next_as_optional() является tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional возвращает tf.experimental.Optional который содержит следующий элемент или не содержит значения, если tf.distribute.DistributedIterator достиг своего конца.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Использование свойства element_spec

Если вы передаете элементы распределенного набора данных в tf.function и хотите tf.TypeSpec гарантии, вы можете указать input_signature аргумент tf.function . Результатом распределенного набора данных является tf.distribute.DistributedValues который может представлять вход для одного или нескольких устройств. Чтобы получить tf.TypeSpec соответствующий этому распределенному значению, вы можете использовать свойство element_spec распределенного набора данных или объекта распределенного итератора.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Частичные партии

Частичные пакеты встречаются, когда экземплярыtf.data.Dataset пользователями, могут содержать размеры пакетов, которые не делятся равномерно на количество реплик, или когда количество элементов набора данных не делится на размер пакета. Это означает, что когда набор данных распределяется по нескольким репликам, next вызов некоторых итераторов приведет к ошибке OutOfRangeError. Чтобы справиться с этим вариантом использования, tf.distribute возвращает фиктивные пакеты размером 0 на репликах, у которых больше нет данных для обработки.

Для случая одного рабочего, если данные не возвращаются при next вызове итератора, создаются фиктивные пакеты с размером пакета 0, которые используются вместе с реальными данными в наборе данных. В случае частичных пакетов последний глобальный пакет данных будет содержать реальные данные наряду с фиктивными пакетами данных. Условие остановки обработки данных теперь проверяет, есть ли данные у какой-либо из реплик. Если нет данных ни на одной из реплик, выдается ошибка OutOfRange.

Для случая с несколькими рабочими логическое значение, представляющее наличие данных по каждому из рабочих, агрегируется с использованием перекрестной связи реплик, и это используется, чтобы определить, все ли рабочие завершили обработку распределенного набора данных. Поскольку это связано с взаимодействием между сотрудниками, возникает некоторое снижение производительности.

Предостережения

  • При использовании API tf.distribute.Strategy.experimental_distribute_dataset с настройкой нескольких рабочих, пользователи передаютtf.data.Dataset который читает из файлов. Если для tf.data.experimental.AutoShardPolicy задано значение AUTO или FILE , фактический размер пакета на шаг может быть меньше, чем глобальный размер пакета, определенный пользователем. Это может произойти, когда оставшиеся элементы в файле меньше глобального размера пакета. Пользователи могут исчерпать набор данных вне зависимости от количества выполняемых шагов или установить для tf.data.experimental.AutoShardPolicy значение DATA чтобы обойти это.

  • Преобразования набора данных с tf.distribute состояния в настоящее время не поддерживаются tf.distribute и любые tf.distribute с tf.distribute состояния, которые может иметь набор данных, в настоящее время игнорируются. Например, если в вашем наборе данных есть map_fn который использует tf.random.uniform для поворота изображения, тогда у вас есть граф набора данных, который зависит от состояния (т.е. случайного начального числа) на локальном компьютере, на котором выполняется процесс python.

  • Экспериментальные tf.data.experimental.OptimizationOptions , которые отключены по умолчанию, могут в определенных контекстах - например, при использовании вместе с tf.distribute - вызывать снижение производительности. Их следует включать только после того, как вы убедитесь, что они улучшают производительность вашей рабочей нагрузки в настройке распространения.

  • Пожалуйста, обратитесь к этому руководству, чтобы узнать, как в целом оптимизировать конвейер ввода с помощью tf.data . Несколько дополнительных советов:

    • Если у вас несколько рабочих tf.data.Dataset.list_files и вы используете tf.data.Dataset.list_files для создания набора данных из всех файлов, соответствующих одному или нескольким шаблонам глобусов, не забудьте установить seed аргумент или установить shuffle=False чтобы каждый рабочий сегментировал файл последовательно.

    • Если ваш входной конвейер включает в себя как перетасовку данных на уровне записи, так и анализ данных, если неанализируемые данные значительно больше, чем проанализированные данные (что обычно не так), сначала перемешайте, а затем проанализируйте, как показано в следующем примере. Это может улучшить использование памяти и производительность.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) поддерживает внутренний буфер элементов buffer_size , и, таким образом, уменьшение buffer_size может облегчить проблему OOM.

  • Порядок, в котором данные обрабатываются рабочими при использовании tf.distribute.experimental_distribute_dataset или tf.distribute.distribute_datasets_from_function , не гарантируется. Обычно это требуется, если вы используете tf.distribute для прогнозирования масштаба. Однако вы можете вставить индекс для каждого элемента в пакете и соответственно упорядочить выходные данные. Следующий фрагмент является примером того, как заказывать выходные данные.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Как мне распространять свои данные, если я не использую канонический экземпляр tf.data.Dataset?

Иногда пользователи не могут использоватьtf.data.Dataset для представления своих входных данных, а затем упомянутые выше API-интерфейсы для распространения набора данных на несколько устройств. В таких случаях вы можете использовать необработанные тензоры или входные данные от генератора.

Используйте экспериментальную_distribute_values_from_function для произвольных входных значений тензора

strategy.run принимает tf.distribute.DistributedValues который является выходом next(iterator) . Чтобы передать значения тензора, используйте experimental_distribute_values_from_function для построения tf.distribute.DistributedValues из необработанных тензоров.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Используйте tf.data.Dataset.from_generator, если ваш ввод от генератора

Если у вас есть функция генератора, которую вы хотите использовать, вы можете создать экземплярtf.data.Dataset с from_generator API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)