Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Rozproszone dane wejściowe

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Interfejsy API tf.distribute zapewniają użytkownikom łatwy sposób skalowania szkolenia z jednej maszyny do wielu maszyn. Podczas skalowania modelu użytkownicy muszą również rozdzielać swoje dane wejściowe na wiele urządzeń. tf.distribute zapewnia interfejsy API, za pomocą których można automatycznie dystrybuować dane wejściowe na urządzenia.

Ten przewodnik przedstawia różne sposoby tworzenia rozproszonych tf.distribute danych i iteratorów przy użyciu interfejsów API tf.distribute . Dodatkowo omówione zostaną następujące tematy:

Ten przewodnik nie obejmuje wykorzystania rozproszonych danych wejściowych za pomocą interfejsów API Keras.

Rozproszone zbiory danych

Aby korzystać z interfejsów API tf.distribute do skalowania, zaleca się, aby użytkownicy używali tf.data.Dataset do reprezentowania swoich danych wejściowych. tf.distribute została stworzona, aby efektywnie współpracować z tf.data.Dataset (na przykład automatyczne pobieranie danych na każdym urządzeniu akceleratora), a optymalizacje wydajności są regularnie włączane do implementacji. Jeśli masz przypadek użycia czegoś innego niż tf.data.Dataset , zapoznaj się z tf.data.Dataset sekcją tego przewodnika. W nierozproszonej pętli szkoleniowej użytkownicy najpierw tworzą instancję tf.data.Dataset a następnie tf.data.Dataset po elementach. Na przykład:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Aby umożliwić użytkownikom korzystanie tf.distribute strategii tf.distribute przy minimalnych zmianach w istniejącym kodzie użytkownika, wprowadzono dwa interfejsy API, które dystrybuowałyby instancję tf.data.Dataset i tf.data.Dataset rozproszony obiekt zestawu danych. Użytkownik może następnie wykonać iterację po tej instancji rozproszonego zestawu danych i wytrenować swój model tak, jak poprzednio. Przyjrzyjmy się teraz tf.distribute.Strategy.experimental_distribute_dataset dwóm interfejsom API - tf.distribute.Strategy.experimental_distribute_dataset i tf.distribute.Strategy.experimental_distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

Stosowanie

Ten interfejs API przyjmuje instancję tf.data.Dataset jako dane wejściowe i zwraca instancję tf.distribute.DistributedDataset . Wejściowy zestaw danych należy podzielić na partię z wartością równą globalnemu rozmiarowi partii. Ten globalny rozmiar partii to liczba próbek, które chcesz przetworzyć na wszystkich urządzeniach w jednym kroku. Możesz iterować po tym rozproszonym zestawie danych w stylu Pythona lub utworzyć iterator za pomocą iter . Zwrócony obiekt nie jest instancją tf.data.Dataset i nie obsługuje żadnych innych interfejsów API, które w jakikolwiek sposób przekształcają lub sprawdzają zbiór danych. Jest to zalecany interfejs API, jeśli nie masz określonych sposobów, w których chcesz podzielić dane wejściowe na różne repliki.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Nieruchomości

Dozowanie

tf.distribute dopasowuje wejściową instancję tf.data.Dataset do nowego rozmiaru partii, który jest równy globalnemu rozmiarowi partii podzielonemu przez liczbę synchronizowanych replik. Liczba zsynchronizowanych replik jest równa liczbie urządzeń, które biorą udział w redukcji gradientu podczas treningu. Gdy użytkownik wywołuje next w iteratorze rozproszonym, dla każdej repliki zwracany jest rozmiar partii danych na replikę. Kardynalność ponownie zestawionego zbioru danych będzie zawsze wielokrotnością liczby replik. Oto kilka przykładów:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    Bez dystrybucji:

    Partia 1: [0, 1, 2, 3]

    Partia 2: [4, 5]

    Przy dystrybucji na 2 repliki:

    Partia 1: Replika 1: [0, 1] Replika 2: [2, 3]

    Partia 2: Replika 2: [4] Replika 2: [5]

    Ostatnia partia ([4, 5]) jest rozdzielana między 2 repliki.

  • tf.data.Dataset.range(4).batch(4)

    Bez dystrybucji:

    Partia 1: [[0], [1], [2], [3]]

    Przy dystrybucji na 5 replik:

    Partia 1: Replika 1: [0] Replika 2: [1] Replika 3: [2] Replika 4: [3] Replika 5: []

  • tf.data.Dataset.range(8).batch(4)

    Bez dystrybucji:

    Partia 1: [0, 1, 2, 3]

    Partia 2: [4, 5, 6, 7]

    Przy dystrybucji na 3 repliki:

    Partia 1: Replika 1: [0, 1] Replika 2: [2, 3] Replika 3: []

    Partia 2: Replika 1: [4, 5] Replika 2: [6, 7] Replika 3: []

Ponowne zestawianie zbioru danych ma złożoność przestrzeni, która rośnie liniowo wraz z liczbą replik. Oznacza to, że w przypadku zastosowania szkolenia z wieloma pracownikami potok wejściowy może napotkać błędy OOM.

Sharding

tf.distribute również automatycznie wprowadza zbiór danych wejściowych podczas szkolenia dla wielu pracowników. Każdy zestaw danych jest tworzony na urządzeniu CPU pracownika. Automatyczne udostępnianie zestawu danych dla zestawu procesów roboczych oznacza, że ​​każdemu pracownikowi jest przypisywany podzbiór całego zestawu danych (jeśli ustawiono właściwy tf.data.experimental.AutoShardPolicy ). Ma to na celu zapewnienie, że na każdym etapie każdy pracownik będzie przetwarzał globalną wielkość partii nienakładających się elementów zestawu danych. Autosharding ma kilka różnych opcji, które można określić za pomocą tf.data.experimental.DistributeOptions .

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

Istnieją trzy różne opcje, które można ustawić dla tf.data.experimental.AutoShardPolicy :

  • AUTO: Jest to opcja domyślna, co oznacza, że ​​zostanie podjęta próba fragmentacji przez FILE. Próba podzielenia na fragmenty przez FILE nie powiedzie się, jeśli nie zostanie wykryty zestaw danych oparty na pliku. tf.distribute powróci do shardingu przez DATA. Zwróć uwagę, że jeśli wejściowy zestaw danych jest oparty na plikach, ale liczba plików jest mniejsza niż liczba pracowników, zostanie zgłoszony błąd.
  • PLIK: Jest to opcja, jeśli chcesz podzielić pliki wejściowe na wszystkich pracowników. Jeśli liczba plików jest mniejsza niż liczba pracowników, wystąpi błąd. Należy użyć tej opcji, jeśli liczba plików wejściowych jest znacznie większa niż liczba pracowników, a dane w plikach są równomiernie rozłożone. Wadą tej opcji jest bezczynność pracowników, jeśli dane w plikach nie są równomiernie rozłożone. Na przykład, roześlijmy 2 pliki na 2 pracowników z 1 repliką każdy. Plik 1 zawiera [0, 1, 2, 3, 4, 5], a plik 2 zawiera [6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik będzie wynosić 2, a globalny rozmiar partii - 4.

    • Pracownik 0:

    Partia 1 = Replika 1: [0, 1]

    Partia 2 = replika 1: [2, 3]

    Partia 3 = replika 1: [4]

    Partia 4 = replika 1: [5]

    • Pracownik 1:

    Partia 1 = replika 2: [6, 7]

    Partia 2 = replika 2: [8, 9]

    Partia 3 = replika 2: [10]

    Partia 4 = replika 2: [11]

  • DANE: Spowoduje to automatyczne udostępnienie elementów wszystkim pracownikom. Każdy z pracowników odczyta cały zestaw danych i przetworzy tylko przypisany do niego fragment. Wszystkie inne odłamki zostaną odrzucone. Jest to zwykle używane, jeśli liczba plików wejściowych jest mniejsza niż liczba procesów roboczych i chcesz uzyskać lepsze fragmentowanie danych na wszystkich pracowników. Wadą jest to, że cały zestaw danych zostanie odczytany dla każdego pracownika. Na przykład, roześlijmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik wyniesie 2.

    • Pracownik 0:

    Partia 1 = Replika 1: [0, 1]

    Partia 2 = replika 1: [4, 5]

    Partia 3 = replika 1: [8, 9]

    • Pracownik 1:

    Partia 1 = replika 2: [2, 3]

    Partia 2 = replika 2: [6, 7]

    Partia 3 = Replika 2: [10, 11]

  • WYŁĄCZONE: Jeśli wyłączysz automatyczne udostępnianie, każdy pracownik będzie przetwarzał wszystkie dane. Na przykład, roześlijmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech całkowita liczba synchronizowanych replik wyniesie 2. Następnie każdy pracownik zobaczy następujący rozkład:

    • Pracownik 0:

    Partia 1 = Replika 1: [0, 1]

    Partia 2 = replika 1: [2, 3]

    Partia 3 = replika 1: [4, 5]

    Partia 4 = replika 1: [6, 7]

    Partia 5 = replika 1: [8, 9]

    Partia 6 = replika 1: [10, 11]

    • Pracownik 1:

    Partia 1 = replika 2: [0, 1]

    Partia 2 = replika 2: [2, 3]

    Partia 3 = replika 2: [4, 5]

    Partia 4 = replika 2: [6, 7]

    Partia 5 = replika 2: [8, 9]

    Partia 6 = replika 2: [10, 11]

Pobieranie wstępne

Domyślnie tf.distribute dodaje transformację pobierania wstępnego na końcu instancji tf.data.Dataset przez użytkownika. Argument transformacji pobierania wstępnego, którym jest buffer_size jest równy liczbie synchronizowanych replik.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

Stosowanie

Ten interfejs API przyjmuje funkcję wejściową i zwraca instancję tf.distribute.DistributedDataset . Funkcja wejściowa, którą przekazują użytkownicy, ma argument tf.distribute.InputContext i powinna zwracać instancję tf.data.Dataset . Dzięki temu API tf.distribute nie wprowadza żadnych dalszych zmian w instancji tf.data.Dataset użytkownika zwróconej przez funkcję wejściową. Za partię i fragmentację zbioru danych odpowiada użytkownik. tf.distribute wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Oprócz umożliwienia użytkownikom określenia własnej logiki przetwarzania wsadowego i fragmentacji, ten interfejs API wykazuje również lepszą skalowalność i wydajność w porównaniu z tf.distribute.Strategy.experimental_distribute_dataset gdy jest używany do szkolenia wielu pracowników.

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Nieruchomości

Dozowanie

Instancja tf.data.Dataset która jest wartością zwracaną funkcji wejściowej, powinna być grupowana przy użyciu rozmiaru partii na replikę. Rozmiar partii na replikę to globalny rozmiar partii podzielony przez liczbę replik biorących udział w szkoleniu synchronizacji. Dzieje się tak, ponieważ tf.distribute wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Zbiór danych utworzony dla danego pracownika powinien być gotowy do użycia przez wszystkie repliki tego pracownika.

Sharding

Obiekt tf.distribute.InputContext który jest niejawnie przekazywany jako argument do funkcji wejściowej użytkownika, jest tworzony przez tf.distribute pod maską. Zawiera informacje o liczbie pracowników, bieżącym identyfikatorze pracownika itp. Ta funkcja wejściowa może obsługiwać fragmentowanie zgodnie z zasadami ustawionymi przez użytkownika przy użyciu tych właściwości, które są częścią obiektu tf.distribute.InputContext .

Pobieranie wstępne

tf.distribute nie dodaje transformacji pobierania wstępnego na końcu tf.data.Dataset zwróconego przez funkcję wejściową dostarczoną przez użytkownika.

Rozproszone Iteratory

Podobnie jak w przypadku tf.data.Dataset instancji tf.data.Dataset , konieczne będzie utworzenie iteratora w instancjach tf.distribute.DistributedDataset celu ich iteracji i uzyskania dostępu do elementów w tf.distribute.DistributedDataset . Poniżej przedstawiono sposoby tworzenia tf.distribute.DistributedIterator i używania go do trenowania modelu:

Zastosowania

Użyj konstrukcji pętli for w języku Pythonic

Możesz użyć przyjaznej dla użytkownika pętli Pythonic, aby iterować po tf.distribute.DistributedDataset . Elementy zwracane z tf.distribute.DistributedIterator mogą być pojedynczym tf.Tensor lub tf.distribute.DistributedValues które zawierają wartość na replikę. Umieszczenie pętli wewnątrz funkcji tf.function wydajność. Jednak break i return są obecnie obsługiwane, jeśli pętla jest umieszczona wewnątrz funkcji tf.function . Nie obsługujemy również umieszczania pętli wewnątrz funkcji tf.function przy korzystaniu ze strategii wielu pracowników, takich jak tf.distribute.experimental.MultiWorkerMirroredStrategy i tf.distribute.TPUStrategy . Umieszczenie pętli wewnątrz tf.function działa dla pojedynczego pracownika tf.distribute.TPUStrategy ale nie w przypadku korzystania z TPU strąków.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Użyj iter aby utworzyć jawny iterator

Iteracyjne nad elementami w tf.distribute.DistributedDataset przykład, można utworzyć tf.distribute.DistributedIterator używając iter API na nim. Za pomocą jawnego iteratora możesz iterować dla ustalonej liczby kroków. W celu uzyskania następnego elementu z tf.distribute.DistributedIterator instancji dist_iterator można wywołać next(dist_iterator) , dist_iterator.get_next() lub dist_iterator.get_next_as_optional() . Pierwsze dwa są zasadniczo takie same:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Za pomocą next() lub tf.distribute.DistributedIterator.get_next() , jeśli tf.distribute.DistributedIterator osiągnął koniec, zostanie zgłoszony błąd OutOfRange. Klient może złapać błąd po stronie Pythona i kontynuować inne prace, takie jak sprawdzanie punktów kontrolnych i ocena. Jednak to nie zadziała, jeśli używasz pętli treningowej hosta (tj. tf.function wiele kroków na tf.function ), Która wygląda następująco:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

train_fn zawiera wiele kroków, opakowując tf.range kroku wewnątrz tf.range . W takim przypadku różne iteracje w pętli bez zależności mogą rozpocząć się równolegle, więc błąd OutOfRange może zostać wyzwolony w późniejszych iteracjach przed zakończeniem obliczeń poprzednich iteracji. Po zgłoszeniu błędu OutOfRange wszystkie operacje w funkcji zostaną natychmiast zakończone. Jeśli jest to przypadek, którego chciałbyś uniknąć, alternatywą, która nie tf.distribute.DistributedIterator.get_next_as_optional() błędu OutOfRange, jest tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional zwraca tf.experimental.Optional który zawiera następny element lub nie zawiera żadnej wartości, jeśli tf.distribute.DistributedIterator dobiegła końca.

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Korzystanie z właściwości element_spec

Jeśli przechodzą elementy rozproszonego zbioru danych do tf.function i chcesz tf.TypeSpec gwarancji, można określić input_signature argument tf.function . Dane wyjściowe rozproszonego zestawu danych to tf.distribute.DistributedValues które mogą reprezentować dane wejściowe dla pojedynczego urządzenia lub wielu urządzeń. Aby uzyskać tf.TypeSpec odpowiadającą tej rozproszonej wartości, można użyć właściwości element_spec rozproszonego zestawu danych lub rozproszonego obiektu iteratora.

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Częściowe partie

Częściowe partie są napotykane, gdy instancje tf.data.Dataset przez użytkowników mogą zawierać rozmiary partii, których nie można równo podzielić przez liczbę replik lub gdy liczność instancji zestawu danych nie jest podzielna przez rozmiar partii. Oznacza to, że gdy zestaw danych jest dystrybuowany w wielu replikach, next wywołanie niektórych iteratorów spowoduje wystąpienie OutOfRangeError. Aby obsłużyć ten przypadek użycia, tf.distribute zwraca fikcyjne partie partii o rozmiarze 0 dla replik, które nie mają więcej danych do przetworzenia.

W przypadku pojedynczego procesu roboczego, jeśli dane nie są zwracane przez next wywołanie iteratora, tworzone są fikcyjne partie o rozmiarze partii 0 i są one używane wraz z rzeczywistymi danymi w zestawie danych. W przypadku partii częściowych ostatnia globalna partia danych będzie zawierała dane rzeczywiste wraz z fikcyjnymi partiami danych. Warunek zatrzymania przetwarzania danych sprawdza teraz, czy którakolwiek z replik zawiera dane. Jeśli nie ma danych w żadnej z replik, zostanie zgłoszony błąd OutOfRange.

W przypadku wielu pracowników wartość logiczna reprezentująca obecność danych o każdym z pracowników jest agregowana za pomocą komunikacji między replikami i służy do określenia, czy wszyscy pracownicy zakończyli przetwarzanie rozproszonego zestawu danych. Ponieważ wymaga to komunikacji między pracownikami, wiąże się to z pewnym spadkiem wydajności.

Ostrzeżenia

  • Podczas korzystania z interfejsów API tf.distribute.Strategy.experimental_distribute_dataset z konfiguracją dla wielu pracowników, użytkownicy przekazują tf.data.Dataset który czyta z plików. Jeśli tf.data.experimental.AutoShardPolicy jest ustawiona na AUTO lub FILE , rzeczywisty rozmiar partii na krok może być mniejszy niż globalny rozmiar partii zdefiniowany przez użytkownika. Może się tak zdarzyć, gdy pozostałe elementy w pliku są mniejsze niż globalny rozmiar wsadu. Użytkownicy mogą albo wyczerpać zbiór danych, nie zależnie od liczby kroków do uruchomienia, albo ustawić tf.data.experimental.AutoShardPolicy na DATA aby obejść ten problem.

  • Transformacje stanowych tf.distribute danych nie są obecnie obsługiwane przez tf.distribute a wszelkie stanowe tf.distribute które może mieć ten zestaw danych, są obecnie ignorowane. Na przykład, jeśli twój zestaw danych ma map_fn który używa tf.random.uniform do obrócenia obrazu, to masz wykres zestawu danych, który zależy od stanu (tj. Losowego ziarna) na lokalnej maszynie, na której jest wykonywany proces Pythona.

  • Eksperymentalny tf.data.experimental.OptimizationOptions które są domyślnie wyłączone, mogą w niektórych kontekstach - na przykład używane razem z tf.distribute - powodować pogorszenie wydajności. Należy je włączać dopiero po sprawdzeniu, czy poprawiają one wydajność obciążenia w ustawieniu dystrybucji.

  • Kolejność, w jakiej dane są przetwarzane przez pracowników podczas używania tf.distribute.experimental_distribute_dataset lub tf.distribute.experimental_distribute_datasets_from_function nie jest gwarantowana. Jest to zwykle wymagane, jeśli używasz tf.distribute do prognozowania skali. Możesz jednak wstawić indeks dla każdego elementu w partii i odpowiednio zamówić wyjścia. Poniższy fragment kodu jest przykładem sposobu zamawiania wyników.

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Jak mogę rozpowszechniać moje dane, jeśli nie używam kanonicznej instancji tf.data.Dataset?

Czasami użytkownicy nie mogą używać tf.data.Dataset do reprezentowania swoich danych wejściowych, a następnie wymienionych powyżej interfejsów API do dystrybucji zestawu danych do wielu urządzeń. W takich przypadkach można użyć surowych tensorów lub danych wejściowych z generatora.

Użyj funkcji experimental_distribute_values_from_function dla dowolnych danych wejściowych tensora

strategy.run przyjmuje tf.distribute.DistributedValues który jest wyjście next(iterator) . Aby przekazać wartości tensorów, użyj experimental_distribute_values_from_function do skonstruowania tf.distribute.DistributedValues z surowych tensorów.

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Użyj tf.data.Dataset.from_generator, jeśli dane wejściowe pochodzą z generatora

Jeśli masz funkcję generatora, której chcesz użyć, możesz utworzyć instancję tf.data.Dataset za pomocą interfejsu API from_generator .

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)