Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Verteilte Eingabe

Ansicht auf TensorFlow.org In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Die tf.distribute- APIs bieten Benutzern eine einfache Möglichkeit, ihr Training von einem einzelnen Computer auf mehrere Computer zu skalieren. Bei der Skalierung ihres Modells müssen Benutzer ihre Eingaben auch auf mehrere Geräte verteilen. tf.distribute bietet APIs, mit denen Sie Ihre Eingaben automatisch auf Geräte verteilen können.

Dieses Handbuch zeigt Ihnen die verschiedenen Möglichkeiten, wie Sie verteilte tf.distribute und Iteratoren mithilfe von tf.distribute APIs erstellen können. Darüber hinaus werden folgende Themen behandelt:

Dieses Handbuch behandelt nicht die Verwendung verteilter Eingaben mit Keras-APIs.

Verteilte Datensätze

Um tf.distribute APIs zum tf.distribute zu verwenden, wird empfohlen, dass Benutzer tf.data.Dataset , um ihre Eingabe darzustellen. tf.distribute wurde entwickelt, um effizient mit tf.data.Dataset (z. B. automatisches Vorabrufen von Daten auf jedes Beschleunigergerät), wobei Leistungsoptimierungen regelmäßig in die Implementierung einbezogen werden. Wenn Sie einen Anwendungsfall für die Verwendung von etwas anderem als tf.data.Dataset , tf.data.Dataset Sie bitte einen späteren Abschnitt in diesem Handbuch. In einer nicht verteilten Trainingsschleife erstellen Benutzer zuerst eine tf.data.Dataset Instanz und iterieren dann über die Elemente. Beispielsweise:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Damit Benutzer die Strategie tf.distribute mit minimalen Änderungen am vorhandenen Code eines Benutzers verwenden können, wurden zwei APIs eingeführt, die eine tf.data.Dataset Instanz verteilen und ein verteiltes Dataset-Objekt zurückgeben. Ein Benutzer könnte dann über diese verteilte Dataset-Instanz iterieren und sein Modell wie zuvor trainieren. Schauen wir uns nun die beiden APIs - tf.distribute.Strategy.experimental_distribute_dataset und tf.distribute.Strategy.experimental_distribute_datasets_from_function - genauer an:

tf.distribute.Strategy.experimental_distribute_dataset

Verwendung

Diese API verwendet eine tf.data.Dataset Instanz als Eingabe und gibt eine tf.distribute.DistributedDataset Instanz zurück. Sie sollten das Eingabedatensatz mit einem Wert stapeln, der der globalen Stapelgröße entspricht. Diese globale Stapelgröße gibt die Anzahl der Proben an, die Sie in einem Schritt auf allen Geräten verarbeiten möchten. Sie können dieses verteilte Dataset pythonisch durchlaufen oder mit iter einen Iterator erstellen. Das zurückgegebene Objekt ist keine tf.data.Dataset Instanz und unterstützt keine anderen APIs, die das Dataset in irgendeiner Weise transformieren oder untersuchen. Dies ist die empfohlene API, wenn Sie nicht über bestimmte Methoden verfügen, mit denen Sie Ihre Eingaben über verschiedene Replikate hinweg speichern möchten.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Eigenschaften

Batching

tf.distribute sendet die eingegebene tf.data.Dataset Instanz mit einer neuen tf.distribute neu, die der globalen tf.data.Dataset geteilt durch die Anzahl der synchronisierten Replikate entspricht. Die Anzahl der synchronen Replikate entspricht der Anzahl der Geräte, die während des Trainings an der Gradienten-Allreduzierung teilnehmen. Wenn ein Benutzer den verteilten Iterator als next aufruft, wird auf jedem Replikat eine Stapelgröße von Daten pro Replikat zurückgegeben. Die Kardinalität des neu zugesetzten Datensatzes ist immer ein Vielfaches der Anzahl der Replikate. Hier einige Beispiele:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    Ohne Verteilung:

    Charge 1: [0, 1, 2, 3]

    Charge 2: [4, 5]

    Mit Verteilung auf 2 Repliken:

    Charge 1: Replik 1: [0, 1] Replik 2: [2, 3]

    Charge 2: Replik 2: [4] Replik 2: [5]

    Der letzte Stapel ([4, 5]) wird auf 2 Replikate aufgeteilt.

  • tf.data.Dataset.range(4).batch(4)

    Ohne Verteilung:

    Charge 1: [[0], [1], [2], [3]]

    Mit Verteilung auf 5 Repliken:

    Stapel 1: Replik 1: [0] Replik 2: [1] Replik 3: [2] Replik 4: [3] Replik 5: []

  • tf.data.Dataset.range(8).batch(4)

    Ohne Verteilung:

    Charge 1: [0, 1, 2, 3]

    Charge 2: [4, 5, 6, 7]

    Mit Verteilung auf 3 Repliken:

    Charge 1: Replik 1: [0, 1] Replik 2: [2, 3] Replik 3: []

    Charge 2: Replik 1: [4, 5] Replik 2: [6, 7] Replik 3: []

Das erneute Stapeln des Datasets weist eine Speicherplatzkomplexität auf, die linear mit der Anzahl der Replikate zunimmt. Dies bedeutet, dass für den Anwendungsfall der Multi-Worker-Schulung in der Eingabepipeline OOM-Fehler auftreten können.

Scherben

tf.distribute speichert den Eingabedatensatz auch automatisch in der Multi-Worker-Schulung. Jeder Datensatz wird auf dem CPU-Gerät des Workers erstellt. Das automatische Sharding eines Datasets über eine Gruppe von Workern bedeutet, dass jedem Worker eine Teilmenge des gesamten Datasets zugewiesen wird (wenn die richtige tf.data.experimental.AutoShardPolicy ist). Dies soll sicherstellen, dass bei jedem Schritt eine globale Stapelgröße nicht überlappender Datensatzelemente von jedem Mitarbeiter verarbeitet wird. Autosharding bietet verschiedene Optionen, die mit tf.data.experimental.DistributeOptions angegeben werden tf.data.experimental.DistributeOptions .

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

Es gibt drei verschiedene Optionen, die Sie für tf.data.experimental.AutoShardPolicy :

  • AUTO: Dies ist die Standardoption. Dies bedeutet, dass versucht wird, von FILE zu sharden. Der Versuch, durch FILE zu sharden, schlägt fehl, wenn ein dateibasiertes Dataset nicht erkannt wird. tf.distribute dann auf das Sharding durch DATA zurück. Beachten Sie, dass ein Fehler ausgelöst wird, wenn das Eingabedatensatz dateibasiert ist, die Anzahl der Dateien jedoch geringer ist als die Anzahl der Worker.
  • DATEI: Dies ist die Option, wenn Sie die Eingabedateien über alle Worker hinweg teilen möchten. Wenn die Anzahl der Dateien geringer ist als die Anzahl der Mitarbeiter, wird ein Fehler ausgegeben. Sie sollten diese Option verwenden, wenn die Anzahl der Eingabedateien viel größer als die Anzahl der Worker ist und die Daten in den Dateien gleichmäßig verteilt sind. Der Nachteil dieser Option besteht darin, dass Mitarbeiter inaktiv sind, wenn die Daten in den Dateien nicht gleichmäßig verteilt sind. Lassen Sie uns beispielsweise 2 Dateien auf 2 Worker mit jeweils 1 Replikat verteilen. Datei 1 enthält [0, 1, 2, 3, 4, 5] und Datei 2 enthält [6, 7, 8, 9, 10, 11]. Die Gesamtzahl der synchronisierten Replikate sei 2 und die globale Stapelgröße 4.

    • Arbeiter 0:

    Charge 1 = Replik 1: [0, 1]

    Charge 2 = Replik 1: [2, 3]

    Charge 3 = Replik 1: [4]

    Charge 4 = Replik 1: [5]

    • Arbeiter 1:

    Charge 1 = Replik 2: [6, 7]

    Charge 2 = Replik 2: [8, 9]

    Charge 3 = Replik 2: [10]

    Charge 4 = Replik 2: [11]

  • DATA: Dadurch werden die Elemente für alle Worker automatisch gespeichert. Jeder der Mitarbeiter liest den gesamten Datensatz und verarbeitet nur den ihm zugewiesenen Shard. Alle anderen Scherben werden verworfen. Dies wird im Allgemeinen verwendet, wenn die Anzahl der Eingabedateien geringer ist als die Anzahl der Mitarbeiter und Sie eine bessere Datenverteilung für alle Mitarbeiter wünschen. Der Nachteil ist, dass der gesamte Datensatz für jeden Mitarbeiter gelesen wird. Lassen Sie uns zum Beispiel 1 Dateien auf 2 Arbeiter verteilen. Datei 1 enthält [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Die Gesamtzahl der synchronen Replikate sei 2.

    • Arbeiter 0:

    Charge 1 = Replik 1: [0, 1]

    Charge 2 = Replik 1: [4, 5]

    Charge 3 = Replik 1: [8, 9]

    • Arbeiter 1:

    Charge 1 = Replik 2: [2, 3]

    Charge 2 = Replik 2: [6, 7]

    Charge 3 = Replik 2: [10, 11]

  • AUS: Wenn Sie das automatische Sharding deaktivieren, verarbeitet jeder Mitarbeiter alle Daten. Lassen Sie uns zum Beispiel 1 Dateien auf 2 Arbeiter verteilen. Datei 1 enthält [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Die Gesamtzahl der synchronisierten Replikate sei 2. Dann sieht jeder Worker die folgende Verteilung:

    • Arbeiter 0:

    Charge 1 = Replik 1: [0, 1]

    Charge 2 = Replik 1: [2, 3]

    Charge 3 = Replik 1: [4, 5]

    Charge 4 = Replik 1: [6, 7]

    Charge 5 = Replik 1: [8, 9]

    Charge 6 = Replik 1: [10, 11]

    • Arbeiter 1:

    Charge 1 = Replik 2: [0, 1]

    Charge 2 = Replik 2: [2, 3]

    Charge 3 = Replik 2: [4, 5]

    Charge 4 = Replik 2: [6, 7]

    Charge 5 = Replik 2: [8, 9]

    Charge 6 = Replik 2: [10, 11]

Prefetching

Standardmäßig fügt tf.distribute am Ende der vom Benutzer angegebenen tf.data.Dataset Instanz eine Prefetch-Transformation hinzu. Das Argument für die Prefetch-Transformation, buffer_size entspricht der Anzahl der synchronen Replikate.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

Verwendung

Diese API übernimmt eine Eingabefunktion und gibt eine tf.distribute.DistributedDataset Instanz zurück. Die Eingabefunktion, die Benutzer übergeben, hat ein tf.distribute.InputContext Argument und sollte eine tf.data.Dataset Instanz zurückgeben. Mit dieser API nimmt tf.distribute keine weiteren Änderungen an der von der Eingabefunktion zurückgegebenen tf.data.Dataset Instanz des Benutzers vor. Es liegt in der Verantwortung des Benutzers, den Datensatz zu stapeln und zu teilen. tf.distribute ruft die Eingabefunktion auf dem CPU-Gerät jedes Arbeiters auf. Diese API ermöglicht es Benutzern nicht nur, ihre eigene Batching- und Sharding-Logik anzugeben, tf.distribute.Strategy.experimental_distribute_dataset zeigt auch eine bessere Skalierbarkeit und Leistung im Vergleich zu tf.distribute.Strategy.experimental_distribute_dataset wenn sie für Schulungen für mehrere Mitarbeiter verwendet wird.

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Eigenschaften

Batching

Die tf.data.Dataset Instanz, die den Rückgabewert der Eingabefunktion darstellt, sollte unter Verwendung der Stapelgröße pro Replikat gestapelt werden. Die Stapelgröße pro Replikat ist die globale Stapelgröße geteilt durch die Anzahl der Replikate, die am Synchronisierungstraining teilnehmen. Dies liegt daran, dass tf.distribute die Eingabefunktion auf dem CPU-Gerät jedes Arbeiters aufruft. Das für einen bestimmten Worker erstellte Dataset sollte für alle Replikate dieses Workers bereit sein.

Scherben

Das tf.distribute.InputContext Objekt, das implizit als Argument an die Eingabefunktion des Benutzers übergeben wird, wird von tf.distribute unter der Haube erstellt. Es enthält Informationen zur Anzahl der Worker, zur aktuellen Worker-ID usw. Diese Eingabefunktion kann das Sharding gemäß den vom Benutzer festgelegten Richtlinien verarbeiten, indem diese Eigenschaften verwendet werden, die Teil des Objekts tf.distribute.InputContext .

Prefetching

tf.distribute keine Prefetch - Transformation am Ende ADD des tf.data.Dataset vom Benutzer bereitgestellten Eingabefunktion zurückgegeben.

Verteilte Iteratoren

Ähnlich wie bei nicht verteilten tf.data.Dataset Instanzen müssen Sie einen Iterator für die tf.distribute.DistributedDataset Instanzen tf.distribute.DistributedDataset , um darüber zu iterieren und auf die Elemente im tf.distribute.DistributedDataset . Auf folgende Weise können Sie einen tf.distribute.DistributedIterator erstellen und damit Ihr Modell trainieren:

Verwendungen

Verwenden Sie ein Pythonic for-Schleifenkonstrukt

Sie können eine benutzerfreundliche Pythonic-Schleife verwenden, um das tf.distribute.DistributedDataset . Die vom tf.distribute.DistributedIterator zurückgegebenen Elemente können ein einzelner tf.Tensor oder ein tf.distribute.DistributedValues der einen Wert pro Replikat enthält. Wenn Sie die Schleife in eine tf.function wird die Leistung tf.function . break und return werden derzeit jedoch nicht unterstützt, wenn sich die Schleife in einer tf.function . Wir unterstützen es auch nicht, die Schleife innerhalb einer tf.function wenn Multi-Worker-Strategien wie tf.distribute.experimental.MultiWorkerMirroredStrategy und tf.distribute.TPUStrategy . Das Platzieren der Schleife innerhalb von tf.function funktioniert für einzelne Worker tf.distribute.TPUStrategy jedoch nicht bei Verwendung von TPU-Pods.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Verwenden Sie iter , um einen expliziten Iterator zu erstellen

Um die Elemente in einer tf.distribute.DistributedDataset Instanz zu tf.distribute.DistributedDataset , können tf.distribute.DistributedIterator mithilfe der iter API einen tf.distribute.DistributedIterator erstellen. Mit einem expliziten Iterator können Sie für eine feste Anzahl von Schritten iterieren. Um das nächste Element von einer tf.distribute.DistributedIterator Instanz dist_iterator , können Sie next(dist_iterator) , dist_iterator.get_next() oder dist_iterator.get_next_as_optional() . Die beiden ersteren sind im Wesentlichen gleich:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Mit next() oder tf.distribute.DistributedIterator.get_next() wird ein OutOfRange-Fehler ausgelöst, wenn der tf.distribute.DistributedIterator sein Ende erreicht hat. Der Client kann den Fehler auf Python-Seite abfangen und andere Arbeiten wie Checkpointing und Evaluierung fortsetzen. Dies funktioniert jedoch nicht, wenn Sie eine Host-Trainingsschleife verwenden (dh mehrere Schritte pro tf.function ). tf.function sieht folgendermaßen aus:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

train_fn enthält mehrere Schritte, indem der Schrittkörper in einen tf.range . In diesem Fall können verschiedene Iterationen in der Schleife ohne Abhängigkeit parallel beginnen, sodass in späteren Iterationen ein OutOfRange-Fehler ausgelöst werden kann, bevor die Berechnung der vorherigen Iterationen abgeschlossen ist. Sobald ein OutOfRange-Fehler ausgelöst wird, werden alle Operationen in der Funktion sofort beendet. Wenn dies ein Fall ist, den Sie vermeiden möchten, ist tf.distribute.DistributedIterator.get_next_as_optional() eine Alternative, die keinen OutOfRange-Fehler tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional gibt ein tf.experimental.Optional das das nächste Element oder keinen Wert enthält, wenn der tf.distribute.DistributedIterator ein Ende erreicht hat.

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Verwenden der Eigenschaft element_spec

Wenn Sie die Elemente eines verteilten Datensatzes an einen Pass tf.function und wollen eine tf.TypeSpec Garantie, können Sie das angeben input_signature Argument der tf.function . Die Ausgabe eines verteilten Datasets ist tf.distribute.DistributedValues das die Eingabe für ein einzelnes Gerät oder mehrere Geräte darstellen kann. Um die diesem verteilten Wert entsprechende tf.TypeSpec , können Sie die Eigenschaft element_spec des verteilten Datasets oder des verteilten Iteratorobjekts verwenden.

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Teilchargen

tf.data.Dataset treten auf, wenn von Benutzern erstellte tf.data.Dataset Instanzen tf.data.Dataset enthalten, die nicht gleichmäßig durch die Anzahl der Replikate teilbar sind, oder wenn die Kardinalität der Dataset-Instanz nicht durch die Stapelgröße teilbar ist. Wenn das Dataset auf mehrere Replikate verteilt ist, führt der next Aufruf einiger Iteratoren zu einem OutOfRangeError. Um diesen Anwendungsfall zu behandeln, gibt tf.distribute Dummy-Stapel der tf.distribute 0 für Replikate zurück, für die keine weiteren Daten verarbeitet werden müssen.

Wenn im Einzelfall keine Daten beim next Aufruf des Iterators zurückgegeben werden, werden Dummy-Stapel mit einer Stapelgröße von 0 erstellt und zusammen mit den tatsächlichen Daten im Datensatz verwendet. Bei Teilstapeln enthält der letzte globale Datenstapel neben Dummy-Datenstapeln auch echte Daten. Die Stoppbedingung zum Verarbeiten von Daten prüft nun, ob eines der Replikate Daten enthält. Wenn auf keinem der Replikate Daten vorhanden sind, wird ein OutOfRange-Fehler ausgegeben.

Für den Fall mit mehreren Mitarbeitern wird der boolesche Wert, der das Vorhandensein von Daten für jeden Mitarbeiter darstellt, mithilfe der Replikationsübergreifenden Kommunikation aggregiert. Auf diese Weise wird ermittelt, ob alle Mitarbeiter die Verarbeitung des verteilten Datensatzes abgeschlossen haben. Da dies eine arbeitnehmerübergreifende Kommunikation beinhaltet, ist eine gewisse Leistungsbeeinträchtigung erforderlich.

Vorsichtsmaßnahmen

  • Bei Verwendung von tf.distribute.Strategy.experimental_distribute_dataset APIs mit mehreren Worker-Setups übergeben Benutzer ein tf.data.Dataset , das aus Dateien liest. Wenn tf.data.experimental.AutoShardPolicy auf AUTO oder FILE , ist die tatsächliche tf.data.experimental.AutoShardPolicy pro Schritt möglicherweise kleiner als die benutzerdefinierte globale Stapelgröße. Dies kann passieren, wenn die verbleibenden Elemente in der Datei kleiner als die globale Stapelgröße sind. Benutzer können das Dataset entweder ohne Abhängigkeit von der Anzahl der auszuführenden Schritte tf.data.experimental.AutoShardPolicy oder tf.data.experimental.AutoShardPolicy auf DATA , um es zu tf.data.experimental.AutoShardPolicy .

  • Stateful-Dataset-Transformationen werden derzeit mit tf.distribute nicht unterstützt, und Stateful-Ops, über die das Dataset möglicherweise verfügt, werden derzeit ignoriert. Wenn Ihr Dataset beispielsweise eine map_fn , die tf.random.uniform , um ein Bild zu drehen, haben Sie ein Dataset-Diagramm, das vom Status (dh dem zufälligen Startwert) auf dem lokalen Computer abhängt, auf dem der Python-Prozess ausgeführt wird.

  • Experimentelle tf.data.experimental.OptimizationOptions , die standardmäßig deaktiviert sind, können in bestimmten Kontexten - beispielsweise in Verbindung mit tf.distribute - zu einer Leistungsverschlechterung führen. Sie sollten sie erst aktivieren, nachdem Sie überprüft haben, ob sie die Leistung Ihrer Workload in einer Verteilungseinstellung verbessern.

  • Die Reihenfolge, in der die Daten von den Mitarbeitern bei Verwendung von tf.distribute.experimental_distribute_dataset oder tf.distribute.experimental_distribute_datasets_from_function werden, kann nicht garantiert werden. Dies ist normalerweise erforderlich, wenn Sie tf.distribute , um die Vorhersage zu skalieren. Sie können jedoch für jedes Element einen Index in den Stapel einfügen und die Ausgaben entsprechend bestellen. Das folgende Snippet ist ein Beispiel für die Reihenfolge der Ausgaben.

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Wie verteile ich meine Daten, wenn ich keine kanonische tf.data.Dataset-Instanz verwende?

Manchmal können Benutzer kein tf.data.Dataset , um ihre Eingabe tf.data.Dataset , und anschließend die oben genannten APIs, um das Dataset auf mehrere Geräte zu verteilen. In solchen Fällen können Sie rohe Tensoren oder Eingaben von einem Generator verwenden.

Verwenden Sie experimentelle_Verteilungswerte_von_Funktion für beliebige Tensoreingaben

strategy.run akzeptiert tf.distribute.DistributedValues die die Ausgabe von next(iterator) . Verwenden Sie zum tf.distribute.DistributedValues experimental_distribute_values_from_function , um tf.distribute.DistributedValues aus rohen Tensoren zu tf.distribute.DistributedValues .

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Verwenden Sie tf.data.Dataset.from_generator, wenn Ihre Eingabe von einem Generator stammt

Wenn Sie über eine Generatorfunktion verfügen, die Sie verwenden möchten, können tf.data.Dataset mithilfe der from_generator API eine tf.data.Dataset Instanz from_generator .

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)