Odpowiedz już dziś na lokalne wydarzenie TensorFlow Everywhere!
Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Rozproszone dane wejściowe

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Interfejsy API tf.distribute zapewniają użytkownikom łatwy sposób skalowania szkolenia z jednej maszyny do wielu maszyn. Podczas skalowania modelu użytkownicy muszą również rozdzielać swoje dane wejściowe na wiele urządzeń. tf.distribute zapewnia interfejsy API, za pomocą których możesz automatycznie dystrybuować swoje dane wejściowe na urządzenia.

Ten przewodnik przedstawia różne sposoby tworzenia rozproszonych tf.distribute danych i iteratorów przy użyciu interfejsów API tf.distribute . Dodatkowo omówione zostaną następujące tematy:

Ten przewodnik nie obejmuje wykorzystania rozproszonych danych wejściowych za pomocą interfejsów API Keras.

Rozproszone zbiory danych

Aby używać tf.distribute API tf.distribute do skalowania, zaleca się, aby użytkownicy używalitf.data.Dataset do reprezentowania swoich danych wejściowych. tf.distribute została stworzona do wydajnej pracy ztf.data.Dataset (na przykład automatyczne pobieranie danych na każdym urządzeniu akceleratora), a optymalizacje wydajności są regularnie włączane do implementacji. Jeśli masz przypadek użycia czegoś innego niżtf.data.Dataset , zapoznaj się ztf.data.Dataset sekcją tego przewodnika. W nierozproszonej pętli szkoleniowej użytkownicy najpierw tworzą instancjętf.data.Dataset a następnietf.data.Dataset po elementach. Na przykład:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Aby umożliwić użytkownikom korzystanie tf.distribute strategii tf.distribute przy minimalnych zmianach w istniejącym kodzie użytkownika, wprowadzono dwa interfejsy API, które dystrybuują instancjętf.data.Dataset i zwracają obiekt rozproszonego zestawu danych. Użytkownik może następnie wykonać iterację po tej instancji rozproszonego zestawu danych i wytrenować swój model tak jak poprzednio. Przyjrzyjmy się teraz tf.distribute.Strategy.experimental_distribute_dataset dwóm interfejsom API - tf.distribute.Strategy.experimental_distribute_dataset i tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

Stosowanie

Ten interfejs API przyjmuje instancjętf.data.Dataset jako dane wejściowe i zwraca instancję tf.distribute.DistributedDataset . Wejściowy zestaw danych należy podzielić na partię z wartością równą globalnej wielkości partii. Ten globalny rozmiar partii to liczba próbek, które chcesz przetworzyć na wszystkich urządzeniach w jednym kroku. Możesz iterować ten rozproszony zestaw danych w sposób Pythonic lub utworzyć iterator za pomocą iter . Zwrócony obiekt nie jest instancjątf.data.Dataset i nie obsługuje żadnych innych interfejsów API, które w jakikolwiek sposób przekształcają lub sprawdzają zbiór danych. Jest to zalecany interfejs API, jeśli nie masz określonych sposobów, w których chcesz podzielić dane wejściowe na różne repliki.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Nieruchomości

Dozowanie

tf.distribute łączy wejściową instancjętf.data.Dataset z nowym rozmiarem partii, który jest równy globalnemu rozmiarowi partii podzielonemu przez liczbę synchronizowanych replik. Liczba zsynchronizowanych replik jest równa liczbie urządzeń, które biorą udział w redukcji gradientu podczas treningu. Gdy użytkownik wywołuje next w iteratorze rozproszonym, dla każdej repliki zwracany jest rozmiar partii danych na replikę. Kardynalność ponownie zestawionego zbioru danych będzie zawsze wielokrotnością liczby replik. Oto kilka przykładów:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Bez dystrybucji:
    • Partia 1: [0, 1, 2, 3]
    • Partia 2: [4, 5]
    • Przy dystrybucji na 2 repliki. Ostatnia partia ([4, 5]) jest rozdzielana między 2 repliki.

    • Partia 1:

      • Replika 1: [0, 1]
      • Replika 2: [2, 3]
    • Partia 2:

      • Replika 2: [4]
      • Replika 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Bez dystrybucji:
    • Partia 1: [[0], [1], [2], [3]]
    • Przy dystrybucji na 5 replik:
    • Partia 1:
      • Replika 1: [0]
      • Replika 2: [1]
      • Replika 3: [2]
      • Replika 4: [3]
      • Replika 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Bez dystrybucji:
    • Partia 1: [0, 1, 2, 3]
    • Partia 2: [4, 5, 6, 7]
    • Przy dystrybucji na 3 repliki:
    • Partia 1:
      • Replika 1: [0, 1]
      • Replika 2: [2, 3]
      • Replika 3: []
    • Partia 2:
      • Replika 1: [4, 5]
      • Replika 2: [6, 7]
      • Replika 3: []

Ponowne zestawianie zbioru danych ma złożoność przestrzeni, która rośnie liniowo wraz z liczbą replik. Oznacza to, że w przypadku szkolenia wielu pracowników potok wejściowy może napotkać błędy OOM.

Sharding

tf.distribute również automatycznie wprowadza zbiór danych wejściowych podczas szkolenia z MultiWorkerMirroredStrategy wielu pracowników za pomocą MultiWorkerMirroredStrategy i TPUStrategy . Każdy zestaw danych jest tworzony na urządzeniu CPU pracownika. Automatyczne udostępnianie zestawu danych dla zestawu procesów roboczych oznacza, że ​​każdemu pracownikowi jest przypisany podzbiór całego zestawu danych (jeśli ustawiono właściwy tf.data.experimental.AutoShardPolicy ). Ma to zapewnić, że na każdym etapie każdy pracownik będzie przetwarzał globalną partię nienakładających się elementów zestawu danych. Autosharding ma kilka różnych opcji, które można określić za pomocą tf.data.experimental.DistributeOptions . Zauważ, że nie ma automatycznego udostępniania w szkoleniu z wieloma pracownikami z ParameterServerStrategy , a więcej informacji na temat tworzenia zestawu danych za pomocą tej strategii można znaleźć w samouczku Strategia serwera parametrów .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Istnieją trzy różne opcje, które można ustawić dla tf.data.experimental.AutoShardPolicy :

  • AUTO: Jest to opcja domyślna, co oznacza, że ​​zostanie podjęta próba fragmentacji przez FILE. Próba podzielenia na fragmenty przez FILE nie powiedzie się, jeśli nie zostanie wykryty zestaw danych oparty na pliku. tf.distribute powróci do shardingu przez DATA. Należy zauważyć, że jeśli wejściowy zestaw danych jest oparty na plikach, ale liczba plików jest mniejsza niż liczba procesów InvalidArgumentError zostanie zgłoszony InvalidArgumentError . Jeśli tak się stanie, jawnie ustaw zasady na AutoShardPolicy.DATA lub podziel źródło wejściowe na mniejsze pliki, tak aby liczba plików była większa niż liczba procesów roboczych.
  • PLIK: Jest to opcja, jeśli chcesz podzielić pliki wejściowe na wszystkich pracowników. Należy użyć tej opcji, jeśli liczba plików wejściowych jest znacznie większa niż liczba pracowników, a dane w plikach są równomiernie rozłożone. Wadą tej opcji jest bezczynność pracowników, jeśli dane w plikach nie są równomiernie rozłożone. Jeśli liczba plików jest mniejsza niż liczba pracowników, zostanie zgłoszony InvalidArgumentError . W takim przypadku jawnie ustaw zasady na AutoShardPolicy.DATA . Na przykład, roześlijmy 2 pliki na 2 pracowników z 1 repliką każdy. Plik 1 zawiera [0, 1, 2, 3, 4, 5], a plik 2 zawiera [6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik będzie wynosić 2, a globalny rozmiar partii - 4.

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = replika 1: [2, 3]
    • Partia 3 = replika 1: [4]
    • Partia 4 = replika 1: [5]
    • Pracownik 1:
    • Partia 1 = replika 2: [6, 7]
    • Partia 2 = replika 2: [8, 9]
    • Partia 3 = replika 2: [10]
    • Partia 4 = replika 2: [11]
  • DANE: Spowoduje to automatyczne udostępnienie elementów wszystkim pracownikom. Każdy z pracowników odczyta cały zestaw danych i przetworzy tylko przypisany do niego fragment. Wszystkie inne odłamki zostaną odrzucone. Jest to zwykle używane, jeśli liczba plików wejściowych jest mniejsza niż liczba procesów roboczych i chcesz uzyskać lepsze fragmentowanie danych na wszystkich pracowników. Wadą jest to, że cały zbiór danych zostanie odczytany dla każdego pracownika. Na przykład, roześlijmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik wyniesie 2.

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = replika 1: [4, 5]
    • Partia 3 = replika 1: [8, 9]
    • Pracownik 1:
    • Partia 1 = replika 2: [2, 3]
    • Partia 2 = replika 2: [6, 7]
    • Partia 3 = Replika 2: [10, 11]
  • WYŁĄCZONE: Jeśli wyłączysz automatyczne udostępnianie, każdy pracownik będzie przetwarzał wszystkie dane. Na przykład, roześlijmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik wyniesie 2. Następnie każdy pracownik zobaczy następujący rozkład:

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = replika 1: [2, 3]
    • Partia 3 = replika 1: [4, 5]
    • Partia 4 = replika 1: [6, 7]
    • Partia 5 = replika 1: [8, 9]
    • Partia 6 = replika 1: [10, 11]

    • Pracownik 1:

    • Partia 1 = Replika 2: [0, 1]

    • Partia 2 = replika 2: [2, 3]

    • Partia 3 = replika 2: [4, 5]

    • Partia 4 = replika 2: [6, 7]

    • Partia 5 = replika 2: [8, 9]

    • Partia 6 = replika 2: [10, 11]

Pobieranie wstępne

Domyślnie tf.distribute dodaje transformację pobierania wstępnego na końcu instancjitf.data.Dataset przez użytkownika. Argument transformacji pobierania wstępnego, którym jest buffer_size jest równy liczbie synchronizowanych replik.

tf.distribute.Strategy.distribute_datasets_from_function

Stosowanie

Ten interfejs API przyjmuje funkcję wejściową i zwraca instancję tf.distribute.DistributedDataset . Funkcja wejściowa, którą przekazują użytkownicy, ma argument tf.distribute.InputContext i powinna zwracać instancjętf.data.Dataset . Dzięki temu interfejsowi API tf.distribute nie wprowadza żadnych dalszych zmian w instancjitf.data.Dataset użytkownika zwróconej przez funkcję wejściową. Za partię i fragmentację zestawu danych odpowiada użytkownik. tf.distribute wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Oprócz umożliwienia użytkownikom określenia własnej logiki przetwarzania wsadowego i fragmentacji, ten interfejs API wykazuje również lepszą skalowalność i wydajność w porównaniu z tf.distribute.Strategy.experimental_distribute_dataset gdy jest używany do szkolenia wielu pracowników.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Nieruchomości

Dozowanie

Instancjatf.data.Dataset która jest wartością zwracaną funkcji wejściowej, powinna być grupowana przy użyciu rozmiaru partii na replikę. Rozmiar partii na replikę to globalny rozmiar partii podzielony przez liczbę replik biorących udział w szkoleniu synchronizacji. Dzieje się tak, ponieważ tf.distribute wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Zestaw danych utworzony dla danego pracownika powinien być gotowy do użycia przez wszystkie repliki tego pracownika.

Sharding

Obiekt tf.distribute.InputContext który jest niejawnie przekazywany jako argument do funkcji wejściowej użytkownika, jest tworzony przez tf.distribute pod maską. Zawiera informacje o liczbie pracowników, bieżącym identyfikatorze pracownika itp. Ta funkcja wejściowa może obsługiwać fragmentowanie zgodnie z zasadami ustawionymi przez użytkownika przy użyciu tych właściwości, które są częścią obiektu tf.distribute.InputContext .

Pobieranie wstępne

tf.distribute nie dodaje transformacji pobierania wstępnego na końcutf.data.Dataset zwróconego przez funkcję wejściową dostarczoną przez użytkownika.

Rozproszone Iteratory

Podobnie jak w przypadkutf.data.Dataset instancjitf.data.Dataset , konieczne będzie utworzenie iteratora w instancjach tf.distribute.DistributedDataset celu ich iteracji i uzyskania dostępu do elementów w tf.distribute.DistributedDataset . Poniżej przedstawiono sposoby tworzenia tf.distribute.DistributedIterator i używania go do trenowania modelu:

Zastosowania

Użyj konstrukcji pętli for w języku Pythonic

Możesz użyć przyjaznej dla użytkownika pętli Pythonic, aby iterować po tf.distribute.DistributedDataset . Elementy zwracane z tf.distribute.DistributedIterator mogą być pojedynczym tf.Tensor lub tf.distribute.DistributedValues które zawierają wartość na replikę. Umieszczenie pętli wewnątrz funkcji tf.function wydajność. Jednak break i return są obecnie obsługiwane w przypadku pętli w tf.distribute.DistributedDataset umieszczonym wewnątrz funkcji tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Użyj iter aby utworzyć jawny iterator

Iteracyjne nad elementami w tf.distribute.DistributedDataset przykład, można utworzyć tf.distribute.DistributedIterator używając iter API na nim. Dzięki jawnemu iteratorowi możesz iterować dla ustalonej liczby kroków. W celu uzyskania następnego elementu z tf.distribute.DistributedIterator instancji dist_iterator można wywołać next(dist_iterator) , dist_iterator.get_next() lub dist_iterator.get_next_as_optional() . Pierwsze dwa są zasadniczo takie same:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Za pomocą next() lub tf.distribute.DistributedIterator.get_next() , jeśli tf.distribute.DistributedIterator osiągnął koniec, zostanie zgłoszony błąd OutOfRange. Klient może wychwycić błąd po stronie Pythona i kontynuować inne prace, takie jak sprawdzanie punktów kontrolnych i ocena. Jednak to nie zadziała, jeśli używasz pętli treningowej hosta (tj. tf.function wiele kroków na tf.function ), Która wygląda następująco:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn zawiera wiele kroków, zawijając tf.range kroku wewnątrz tf.range . W takim przypadku różne iteracje w pętli bez zależności mogą rozpocząć się równolegle, więc błąd OutOfRange może zostać wyzwolony w późniejszych iteracjach przed zakończeniem obliczeń poprzednich iteracji. Gdy zostanie zgłoszony błąd OutOfRange, wszystkie operacje w funkcji zostaną natychmiast zakończone. Jeśli jest to przypadek, którego chciałbyś uniknąć, alternatywą, która nie tf.distribute.DistributedIterator.get_next_as_optional() błędu OutOfRange, jest tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional zwraca tf.experimental.Optional który zawiera następny element lub brak wartości, jeśli tf.distribute.DistributedIterator dobiegł końca.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Korzystanie z właściwości element_spec

Jeśli przechodzą elementy rozproszonego zbioru danych do tf.function i chcesz tf.TypeSpec gwarancji, można określić input_signature argument tf.function . Dane wyjściowe rozproszonego zestawu danych to tf.distribute.DistributedValues które mogą reprezentować dane wejściowe dla pojedynczego urządzenia lub wielu urządzeń. Aby uzyskać wartość tf.TypeSpec odpowiadającą tej rozproszonej wartości, można użyć właściwości element_spec rozproszonego zestawu danych lub rozproszonego obiektu iteratora.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Częściowe partie

Częściowe partie są napotykane, gdy instancjetf.data.Dataset przez użytkowników mogą zawierać rozmiary partii, których nie można równo podzielić przez liczbę replik lub gdy liczność instancji zestawu danych nie jest podzielna przez rozmiar partii. Oznacza to, że gdy zestaw danych jest dystrybuowany w wielu replikach, next wywołanie niektórych iteratorów spowoduje wystąpienie OutOfRangeError. Aby obsłużyć ten przypadek użycia, tf.distribute zwraca fikcyjne partie partii o rozmiarze 0 dla replik, które nie mają więcej danych do przetworzenia.

W przypadku pojedynczego procesu roboczego, jeśli dane nie zostaną zwrócone przez next wywołanie iteratora, tworzone są fikcyjne partie o rozmiarze partii 0 i używane wraz z rzeczywistymi danymi w zestawie danych. W przypadku partii częściowych ostatnia globalna partia danych będzie zawierała dane rzeczywiste wraz z fikcyjnymi partiami danych. Warunek zatrzymania przetwarzania danych sprawdza teraz, czy którakolwiek z replik zawiera dane. Jeśli w żadnej z replik nie ma danych, zostanie zgłoszony błąd OutOfRange.

W przypadku wielu pracowników wartość logiczna reprezentująca obecność danych o każdym z pracowników jest agregowana za pomocą komunikacji między replikami i służy do określenia, czy wszyscy pracownicy zakończyli przetwarzanie rozproszonego zestawu danych. Ponieważ wymaga to komunikacji między pracownikami, wiąże się to z pewnym spadkiem wydajności.

Ostrzeżenia

  • Podczas korzystania z interfejsów API tf.distribute.Strategy.experimental_distribute_dataset z konfiguracją dla wielu pracowników, użytkownicy przekazujątf.data.Dataset który czyta z plików. Jeśli tf.data.experimental.AutoShardPolicy jest ustawiona na AUTO lub FILE , rzeczywisty rozmiar partii na krok może być mniejszy niż globalny rozmiar partii zdefiniowany przez użytkownika. Może się tak zdarzyć, gdy pozostałe elementy w pliku są mniejsze niż globalny rozmiar wsadu. Użytkownicy mogą albo wyczerpać zbiór danych, nie zależnie od liczby kroków do uruchomienia, albo ustawić tf.data.experimental.AutoShardPolicy na DATA aby obejść ten problem.

  • Transformacje stanowych tf.distribute danych nie są obecnie obsługiwane przez tf.distribute a wszelkie stanowe tf.distribute które może mieć ten zestaw danych, są obecnie ignorowane. Na przykład, jeśli twój zestaw danych ma map_fn który używa tf.random.uniform do obracania obrazu, masz wykres zestawu danych, który zależy od stanu (tj. Losowego ziarna) na lokalnej maszynie, na której jest wykonywany proces Pythona.

  • Eksperymentalny tf.data.experimental.OptimizationOptions które są domyślnie wyłączone, mogą w niektórych kontekstach - na przykład używane razem z tf.distribute - powodować pogorszenie wydajności. Należy je włączać dopiero po sprawdzeniu, czy poprawiają wydajność obciążenia w ustawieniu dystrybucji.

  • Zapoznaj się z tym przewodnikiem, aby dowiedzieć się, jak ogólnie zoptymalizować potok wejściowy za pomocą tf.data . Kilka dodatkowych wskazówek:

    • Jeśli masz wielu robotników i korzystania tf.data.Dataset.list_files aby utworzyć zestaw danych z wszystkich plików pasujących jeden lub więcej wzorców glob, należy pamiętać, aby ustawić seed argumentu lub zestaw shuffle=False , tak aby każdy pracownik Shard plik konsekwentnie.

    • Jeśli potok wejściowy obejmuje zarówno tasowanie danych na poziomie rekordu, jak i analizowanie danych, chyba że nieprzetworzone dane są znacznie większe niż przeanalizowane dane (co zwykle nie ma miejsca), najpierw przetasuj, a następnie przeanalizuj, jak pokazano w poniższym przykładzie. Może to wpłynąć korzystnie na zużycie pamięci i wydajność.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) utrzymuje wewnętrzny bufor elementów buffer_size , a tym samym zmniejszenie buffer_size może złagodzić problem z OOM.

  • Kolejność, w jakiej dane są przetwarzane przez pracowników podczas korzystania z tf.distribute.experimental_distribute_dataset lub tf.distribute.distribute_datasets_from_function nie jest gwarantowana. Jest to zwykle wymagane, jeśli używasz tf.distribute do prognozowania skali. Możesz jednak wstawić indeks dla każdego elementu w partii i odpowiednio zamówić wyjścia. Poniższy fragment kodu jest przykładem sposobu zamawiania wyników.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Jak dystrybuować moje dane, jeśli nie używam kanonicznej instancji tf.data.Dataset?

Czasami użytkownicy nie mogą używaćtf.data.Dataset do reprezentowania swoich danych wejściowych, a następnie wymienionych powyżej interfejsów API do dystrybucji zestawu danych do wielu urządzeń. W takich przypadkach można użyć surowych tensorów lub wejść z generatora.

Użyj Experimental_distribute_values_from_function dla dowolnych danych wejściowych tensora

strategy.run przyjmuje tf.distribute.DistributedValues który jest wyjście next(iterator) . Aby przekazać wartości tensorów, użyj experimental_distribute_values_from_function do skonstruowania tf.distribute.DistributedValues z surowych tensorów.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Użyj tf.data.Dataset.from_generator, jeśli dane wejściowe pochodzą z generatora

Jeśli masz funkcję generatora, której chcesz użyć, możesz utworzyć instancjętf.data.Dataset za pomocą interfejsu API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)