Verteiltes Training mit TensorFlow

Auf TensorFlow.org ansehen In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

tf.distribute.Strategy ist ein TensorFlow API Ausbildung über mehrere GPUs, mehrere Maschinen oder TPUs zu verteilen. Mit dieser API können Sie Ihre vorhandenen Modelle und Ihren Trainingscode mit minimalen Codeänderungen verteilen.

tf.distribute.Strategy wurde mit diesen wichtigsten Ziele im Auge:

  • Einfach zu bedienen und unterstützt mehrere Benutzersegmente, einschließlich Forscher, Ingenieure für maschinelles Lernen usw.
  • Bieten Sie eine gute Leistung aus der Box.
  • Einfacher Wechsel zwischen Strategien.

tf.distribute.Strategy kann mit einer High-Level - API verwendet werden , wie Keras , und kann auch verwendet werden individuelle Trainingsschlaufen (und im allgemeinen kann jede Berechnung unter Verwendung TensorFlow) zu verteilen.

In TensorFlow 2.x können Sie Ihre Programme mit Spannung oder in einem Diagramm mit ausführen tf.function . tf.distribute.Strategy beabsichtigt , diese beiden Arten der Ausführung zu unterstützen, aber funktioniert am besten mit tf.function . Eager - Modus wird für Debugging - Zwecke nur und nicht für unterstützten tf.distribute.TPUStrategy . Obwohl der Schwerpunkt dieses Handbuchs auf Schulungen liegt, kann diese API auch verwendet werden, um Auswertungen und Vorhersagen auf verschiedenen Plattformen zu verteilen.

Sie können mit tf.distribute.Strategy mit sehr wenigen Änderungen an Ihrem Code, da die zugrundeliegenden Komponenten von TensorFlow verändert wurden Strategie-bewusst zu werden. Dazu gehören Variablen, Ebenen, Modelle, Optimierer, Metriken, Zusammenfassungen und Prüfpunkte.

In diesem Leitfaden lernen Sie verschiedene Arten von Strategien kennen und wie Sie sie in verschiedenen Situationen anwenden können. Um zu erfahren , wie Performance - Probleme debuggen, finden Sie in der Optimieren TensorFlow GPU - Performance Guide.

# Import TensorFlow
import tensorflow as tf

Arten von Strategien

tf.distribute.Strategy beabsichtigt entlang verschiedener Achsen eine Reihe von Anwendungsfällen abzudecken. Einige dieser Kombinationen werden derzeit unterstützt und andere werden in Zukunft hinzugefügt. Einige dieser Achsen sind:

  • Synchrone vs asynchroner Ausbildung: Dies sind zwei gemeinsame Wege , mit Datenparallelität Ausbildung zu verteilen. Beim Sync-Training trainieren alle Worker synchron über verschiedene Slices von Eingabedaten und aggregieren Gradienten bei jedem Schritt. Beim asynchronen Training trainieren alle Worker unabhängig die Eingabedaten und aktualisieren Variablen asynchron. Normalerweise wird das Sync-Training über die All-Reduce- und Async-Through-Parameter-Server-Architektur unterstützt.
  • Hardware - Plattform: Sie können Ihre Ausbildung auf mehrere GPUs auf einer Maschine oder mehreren Maschinen in einem Netzwerk skalieren möchten (mit 0 oder mehr GPUs je) oder auf Cloud - TPUs.

Um diese Anwendungsfälle zu unterstützen, stehen 6 Strategien zur Verfügung. Im nächsten Abschnitt wird erläutert, welche davon in welchen Szenarien in TensorFlow unterstützt werden. Hier eine kurze Übersicht:

Schulungs-API Gespiegelte Strategie TPUS-Strategie MultiWorkerMirroredStrategy CentralStorageStrategie ParameterServerStrategie
Keras-API Unterstützt Unterstützt Unterstützt Experimentelle Unterstützung Experimentelle Unterstützung
Benutzerdefinierte Trainingsschleife Unterstützt Unterstützt Unterstützt Experimentelle Unterstützung Experimentelle Unterstützung
Schätzer-API Eingeschränkter Support Nicht unterstützt Eingeschränkter Support Eingeschränkter Support Eingeschränkter Support

Gespiegelte Strategie

tf.distribute.MirroredStrategy unterstützt synchrone Ausbildung auf mehrere GPUs auf einer Maschine verteilt. Es erstellt ein Replikat pro GPU-Gerät. Jede Variable im Modell wird über alle Replikate gespiegelt. Zusammen bilden diese Variablen eine einzige begriffliche Variable namens MirroredVariable . Diese Variablen werden durch Anwenden identischer Aktualisierungen synchron gehalten.

Effiziente All-Reduce-Algorithmen werden verwendet, um die Variablenaktualisierungen über die Geräte hinweg zu kommunizieren. All-Reduce aggregiert Tensoren über alle Geräte hinweg, indem sie sie addiert und auf jedem Gerät verfügbar macht. Es handelt sich um einen verschmolzenen Algorithmus, der sehr effizient ist und den Synchronisationsaufwand erheblich reduzieren kann. Es sind viele All-Reduce-Algorithmen und Implementierungen verfügbar, abhängig von der Art der Kommunikation, die zwischen den Geräten verfügbar ist. Standardmäßig verwendet es die die NVIDIA Collective Communication Library ( NCCL ) als ganz reduzieren Umsetzung. Sie können aus einigen anderen Optionen wählen oder Ihre eigenen schreiben.

Hier ist der einfachste Weg zur Schaffung von MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
2021-08-21 01:24:44.677825: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.686081: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.687041: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.689423: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-21 01:24:44.690022: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.690987: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.691896: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.284404: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.285446: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.286341: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.287150: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14648 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Dies wird eine schaffen MirroredStrategy Instanz, die alle GPUs verwenden werden, die auf TensorFlow sichtbar sind, und NCCL-wie die Quergerätekommunikation.

Wenn Sie nur einige der GPUs auf Ihrem Computer verwenden möchten, können Sie dies wie folgt tun:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Wenn Sie die geräteübergreifende Kommunikation außer Kraft setzen möchten, können Sie dies tun, die mit cross_device_ops Argument durch eine Instanz der Versorgung tf.distribute.CrossDeviceOps . Derzeit tf.distribute.HierarchicalCopyAllReduce und tf.distribute.ReductionToOneDevice sind zwei andere Optionen als tf.distribute.NcclAllReduce , die Standardeinstellung ist.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUS-Strategie

tf.distribute.TPUStrategy können Sie Ihre TensorFlow Training auf Tensor Processing Units (TPUs) laufen. TPUs sind die spezialisierten ASICs von Google, die entwickelt wurden, um Machine-Learning-Workloads drastisch zu beschleunigen. Sie sind auf Google Colab , der TPU Forschung Wolke und Wolke TPU .

In Bezug auf die verteilte Architektur Ausbildung, TPUStrategy ist die gleiche MirroredStrategy -es Synchrongeräte verteilt Ausbildung. TPUs bieten ihre eigene Implementierung effizienter all- reduzieren und andere kollektive Operationen über mehrere TPU Kerne, die in den Bereichen TPUStrategy .

Hier ist , wie Sie instanziiert würde TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

Die TPUClusterResolver Instanz hilft , den TPUs zu lokalisieren. In Colab müssen Sie keine Argumente dafür angeben.

Wenn Sie dies für Cloud TPUs verwenden möchten:

  • Sie müssen die Namen der TPU - Ressource in dem angeben tpu Argumente.
  • Sie müssen das tpu System explizit zu Beginn des Programms initialisieren. Dies ist erforderlich, bevor TPUs zur Berechnung verwendet werden können. Durch die Initialisierung des TPU-Systems wird auch der TPU-Speicher gelöscht. Daher ist es wichtig, diesen Schritt zuerst abzuschließen, um einen Zustandsverlust zu vermeiden.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy ist sehr ähnlich wie MirroredStrategy . Es implementiert synchrones verteiltes Training für mehrere Mitarbeiter, jeder mit potenziell mehreren GPUs. Ähnlich wie tf.distribute.MirroredStrategy , erstellt er Kopien aller Variablen im Modell auf jedem Gerät über alle Arbeitnehmer.

Hier ist der einfachste Weg zur Schaffung von MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy hat zwei Implementierungen für geräteübergreifende Kommunikation. CommunicationImplementation.RING ist RPC -Basis und unterstützt sowohl CPUs und GPUs. CommunicationImplementation.NCCL verwendet NCCL und bietet state-of-art Performance auf GPUs aber es CPUs nicht unterstützt. CollectiveCommunication.AUTO aufschiebt die Wahl Tensorflow. Sie können sie wie folgt angeben:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Einer der Hauptunterschiede, um Multi-Worker-Training im Vergleich zum Multi-GPU-Training in Gang zu bringen, ist das Multi-Worker-Setup. Die TF_CONFIG Umgebungsvariable ist der Standardweg in TensorFlow die Clusterkonfiguration an jeden Arbeiter zu spezifizieren , den Teil des Clusters ist. Erfahren Sie mehr über Aufstellen TF_CONFIG .

ParameterServerStrategie

Parameterservertraining ist eine gängige datenparallele Methode zum Hochskalieren des Modelltrainings auf mehreren Computern. Ein Parameterserver-Trainingscluster besteht aus Workern und Parameterservern. Variablen werden auf Parameterservern erstellt und in jedem Schritt von den Arbeitern gelesen und aktualisiert. Überprüfen Sie den out - Parameter - Server Training Tutorial für weitere Einzelheiten.

In TensorFlow 2, Parameter - Server Training verwendet eine zentrale Koordinator basierte Architektur über die tf.distribute.experimental.coordinator.ClusterCoordinator Klasse.

In dieser Implementierung werden die worker und parameter server - tf.distribute.Server parameter server laufen Aufgaben tf.distribute.Server s , die für die Aufgaben des Koordinators hören. Der Koordinator erstellt Ressourcen, verteilt Schulungsaufgaben, schreibt Prüfpunkte und kümmert sich um Aufgabenfehler.

In der Programmierung auf dem Koordinator ausgeführt wird , erhalten Sie ein verwenden ParameterServerStrategy Objekt einen Trainingsschritt und verwenden Sie einen definieren ClusterCoordinator zum Versand Trainingsschritt zu Remote - Mitarbeitern. So erstellen Sie sie am einfachsten:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

In TensorFlow 1, ParameterServerStrategy ist nur mit einem Estimator über verfügbar tf.compat.v1.distribute.experimental.ParameterServerStrategy Symbol.

CentralStorageStrategie

tf.distribute.experimental.CentralStorageStrategy tut Synchron Training auch. Variablen werden nicht gespiegelt, stattdessen werden sie auf der CPU platziert und Operationen werden über alle lokalen GPUs repliziert. Wenn nur eine GPU vorhanden ist, werden alle Variablen und Operationen auf dieser GPU platziert.

Erstellen Sie eine Instanz von CentralStorageStrategy von:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Dies wird eine erstellen CentralStorageStrategy Instanz , die alle sichtbaren GPUs und CPU verwenden. Die Aktualisierung von Variablen auf Replikaten wird aggregiert, bevor sie auf Variablen angewendet wird.

Andere Strategien

Zusätzlich zu den oben genannten Strategien, gibt es zwei weitere Strategien , die für das Prototyping nützlich sein könnten und das Debuggen bei der Verwendung von tf.distribute APIs.

Standardstrategie

Die Standardstrategie ist eine Verteilungsstrategie, die vorhanden ist, wenn keine explizite Verteilungsstrategie im Geltungsbereich ist. Es implementiert die tf.distribute.Strategy Schnittstelle , sondern ist ein Pass-Through und gibt keine tatsächliche Verteilung. Zum Beispiel strategy.run(fn) einfach anrufen fn . Code, der mit dieser Strategie geschrieben wurde, sollte sich genauso verhalten wie Code, der ohne Strategie geschrieben wurde. Sie können es sich als "no-op"-Strategie vorstellen.

Die Standardstrategie ist ein Singleton – und man kann nicht mehr Instanzen davon erstellen. Es kann erhalten werden unter Verwendung tf.distribute.get_strategy außerhalb einer expliziten Strategie des Umfangs (die gleiche API , die verwendet werden können , die aktuelle Strategie innerhalb einer expliziten Strategie des Oszilloskops zu bekommen).

default_strategy = tf.distribute.get_strategy()

Diese Strategie dient zwei Hauptzwecken:

  • Es ermöglicht das bedingungslose Schreiben von verteilungsfähigem Bibliothekscode. Zum Beispiel in tf.optimizer s Sie verwenden können tf.distribute.get_strategy und verwenden Sie diese Strategie zur Verringerung des Gradienten-es wird immer wieder zurückkehren , eine Strategie Objekt , auf das Sie anrufen können , die Strategy.reduce API.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Ähnlich wie Bibliothekscode kann er verwendet werden, um Endbenutzerprogramme so zu schreiben, dass sie mit und ohne Verteilungsstrategie arbeiten, ohne dass eine bedingte Logik erforderlich ist. Hier ist ein Beispiel-Code-Snippet, das dies veranschaulicht:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategie

tf.distribute.OneDeviceStrategy ist eine Strategie , alle Variablen und Berechnungen auf einem einzigen bestimmten Gerät zu platzieren.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Diese Strategie unterscheidet sich in vielerlei Hinsicht von der Default-Strategie. In der Standardstrategie bleibt die Variablenplatzierungslogik im Vergleich zur Ausführung von TensorFlow ohne Verteilungsstrategie unverändert. Aber bei der Verwendung von OneDeviceStrategy , alle in ihrem Umfang erstellt Variablen werden auf dem angegebenen Gerät explizit gesetzt. Darüber hinaus über alle genannten Funktionen OneDeviceStrategy.run wird auch auf das angegebene Gerät platziert werden.

Durch diese Strategie verteilte Eingaben werden vorab an das angegebene Gerät geholt. In der Standardstrategie gibt es keine Eingabeverteilung.

Ähnlich wie die Standardstrategie könnte diese Strategie auch verwendet werden, um Ihren Code zu testen, bevor Sie zu anderen Strategien wechseln, die tatsächlich auf mehrere Geräte/Maschinen verteilen. Dies wird die Vertriebsstrategie Maschinen etwas mehr als die Standard - Strategie, aber nicht in vollem Umfang zu nutzen, beispielsweise ausübt MirroredStrategy oder TPUStrategy . Wenn Sie Code wünschen, der sich so verhält, als ob es keine Strategie gäbe, verwenden Sie die Standardstrategie.

Bisher haben Sie die verschiedenen verfügbaren Strategien gesehen und wie Sie sie instanziieren können. Die nächsten Abschnitte zeigen die verschiedenen Möglichkeiten, wie Sie sie verwenden können, um Ihr Training zu verteilen.

Mit tf.distribute.Strategy mit tf.keras.Model.fit

tf.distribute.Strategy wird in integrierte tf.keras , die Umsetzung der TensorFlow ist , ist Keras API - Spezifikation . tf.keras ist ein High-Level - API zu bauen und für Eisenbahnmodelle. Durch die Integration in tf.keras Backend, es nahtlose für Sie Ihre Ausbildung in der Keras Ausbildung Framework geschrieben verteilen Model.fit .

Folgendes müssen Sie in Ihrem Code ändern:

  1. Erstellen Sie eine Instanz des entsprechenden tf.distribute.Strategy .
  2. Bewegen Sie die Erstellung von Keras Modell, Optimierer und Metriken innerhalb strategy.scope .

TensorFlow-Verteilungsstrategien unterstützen alle Arten von Keras-Modellen – sequentiell, funktional und untergeordnet.

Hier ist ein Code-Snippet, um dies für ein sehr einfaches Keras-Modell mit einer dichten Ebene zu tun:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

In diesem Beispiel wird MirroredStrategy , so können Sie dies auf einer Maschine mit mehreren GPUs laufen können. strategy.scope() gibt an , Keras die Strategie , die Ausbildung zu verteilen zu verwenden. Durch das Erstellen von Modellen/Optimierern/Metriken innerhalb dieses Bereichs können Sie verteilte Variablen anstelle von regulären Variablen erstellen. Sobald dies eingerichtet ist, können Sie Ihr Modell wie gewohnt anpassen. MirroredStrategy kümmert sich das Modell der Ausbildung auf die verfügbaren GPUs replizieren, Aggregieren Steigungen und vieles mehr.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-08-21 01:24:46.237677: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-08-21 01:24:46.271153: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 0.0086
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0038
2021-08-21 01:24:49.147347: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.0024
0.002372059039771557

Hier ein tf.data.Dataset bietet die Ausbildung und eval - Eingang. Sie können auch NumPy-Arrays verwenden:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0017
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 7.4622e-04
2021-08-21 01:24:50.486957: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
<keras.callbacks.History at 0x7f12401ede10>

In beiden Fällen mit- Dataset oder Numpy-Charge jeder der gegebenen Eingabe wird zu gleichen Teilen unter den mehreren Repliken. Zum Beispiel, wenn Sie verwenden MirroredStrategy mit 2 GPUs, jede Charge von Größe 10 wird zwischen den zwei GPUs erhalten unterteilt, wobei jeder Aufnahme 5 Eingabebeispiele in jedem Schritt. Jede Epoche wird dann schneller trainieren, wenn Sie mehr GPUs hinzufügen. Normalerweise möchten Sie Ihre Batchgröße erhöhen, wenn Sie weitere Beschleuniger hinzufügen, um die zusätzliche Rechenleistung effektiv zu nutzen. Je nach Modell müssen Sie auch Ihre Lernrate neu einstellen. Sie können mit strategy.num_replicas_in_sync die Anzahl der Replikate zu bekommen.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Was wird jetzt unterstützt?

Schulungs-API Gespiegelte Strategie TPUS-Strategie MultiWorkerMirroredStrategy ParameterServerStrategie CentralStorageStrategie
Keras-APIs Unterstützt Unterstützt Unterstützt Experimentelle Unterstützung Experimentelle Unterstützung

Beispiele und Tutorials

Hier ist eine Liste von Tutorials und Beispielen, die die obige Integration durchgängig mit Keras veranschaulichen:

  1. Tutorial zu trainieren MNIST mit MirroredStrategy .
  2. Tutorial MNIST mit trainieren MultiWorkerMirroredStrategy .
  3. Leitfaden auf die Ausbildung MNIST mit TPUStrategy .
  4. Tutorial für Parameter - Server Ausbildung in TensorFlow 2 mit ParameterServerStrategy .
  5. TensorFlow Modell Garden Repository Sammlungen von state-of-the-art - Modelle implementiert unter Verwendung verschiedener Strategien enthalten.

Mit tf.distribute.Strategy mit benutzerdefinierten Trainingsschlaufen

Wie Sie gesehen haben, mit tf.distribute.Strategy mit Keras model.fit erfordert nur ein paar Zeilen Code zu ändern. Mit etwas mehr Aufwand, können Sie auch tf.distribute.Strategy mit benutzerdefinierten Trainingsschlaufen.

Wenn Sie mehr Flexibilität und Kontrolle über Ihre Trainingsschleifen benötigen, als dies mit Estimator oder Keras möglich ist, können Sie benutzerdefinierte Trainingsschleifen schreiben. Wenn Sie beispielsweise ein GAN verwenden, möchten Sie möglicherweise jede Runde eine andere Anzahl von Generator- oder Diskriminatorschritten ausführen. Ebenso sind die High-Level-Frameworks nicht sehr geeignet für die Ausbildung zum Reinforcement Learning.

Die tf.distribute.Strategy Klassen bieten einen Kernsatz von Methoden zur Unterstützung individueller Trainingsschlaufen. Deren Verwendung kann anfänglich eine geringfügige Umstrukturierung des Codes erfordern, aber sobald dies erledigt ist, sollten Sie in der Lage sein, zwischen GPUs, TPUs und mehreren Maschinen zu wechseln, indem Sie einfach die Strategieinstanz ändern.

Hier sehen Sie einen kurzen Ausschnitt, der diesen Anwendungsfall für ein einfaches Trainingsbeispiel veranschaulicht, das dasselbe Keras-Modell wie zuvor verwendet.

Erstellen Sie zunächst das Modell und den Optimierer innerhalb des Geltungsbereichs der Strategie. Dadurch wird sichergestellt, dass alle mit dem Modell und dem Optimierer erstellten Variablen gespiegelte Variablen sind.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Als Nächstes erstellen Sie die Eingabedatei und ruft tf.distribute.Strategy.experimental_distribute_dataset den Datensatz basierend auf der Strategie zu verteilen.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-08-21 01:24:50.715370: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Definieren Sie dann einen Schritt des Trainings. Verwenden tf.GradientTape zu berechnen Steigungen und Optimierer diese Gradienten anzuwenden Ihres Modells Variablen zu aktualisieren. Um diesen Trainingsschritt zu verteilen, steckt es in einer Funktion train_step und übergibt es an tf.distrbute.Strategy.run zusammen mit den Daten - Set - Eingängen Sie den got dist_dataset erstellt vor:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Einige andere Dinge, die Sie im obigen Code beachten sollten:

  1. Sie verwendet tf.nn.compute_average_loss den Verlust zu berechnen. tf.nn.compute_average_loss fasst das pro Beispiel Verlust und teilt die Summe durch die global_batch_size. Dies ist wichtig , weil später , nachdem die Gradienten auf jeder Replik berechnet werden, werden sie über die Repliken aggregiert werden , indem sie summiert werden .
  2. Sie können auch die verwendeten tf.distribute.Strategy.reduce API die Ergebnisse von zurück aggregieren tf.distribute.Strategy.run . tf.distribute.Strategy.run liefert Ergebnisse von jedem lokalen Replik in der Strategie, und es gibt mehr Möglichkeiten , um dieses Ergebnis zu verbrauchen. Sie können reduce sie einen aggregierten Wert zu erhalten. Sie können auch tun tf.distribute.Strategy.experimental_local_results die Liste bekommen von Werten in dem Ergebnis enthalten sind , je eine pro lokale Replik.
  3. Wenn Sie anrufen apply_gradients innerhalb einer Vertriebsstrategie Umfang, wird ihr Verhalten geändert. Insbesondere führt es vor dem Anwenden von Gradienten auf jede parallele Instanz während des synchronen Trainings eine Gesamtsummenreproduktion der Gradienten durch.

Schließlich , sobald Sie den Trainingsschritt definiert haben, können Sie über iterieren dist_dataset und die Ausbildung in einer Schleife ausgeführt:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(1.3084135, shape=(), dtype=float32)
tf.Tensor(1.2977839, shape=(), dtype=float32)
tf.Tensor(1.2872384, shape=(), dtype=float32)
tf.Tensor(1.2767767, shape=(), dtype=float32)
tf.Tensor(1.2663989, shape=(), dtype=float32)
tf.Tensor(1.256105, shape=(), dtype=float32)
tf.Tensor(1.2458944, shape=(), dtype=float32)
tf.Tensor(1.2357674, shape=(), dtype=float32)
tf.Tensor(1.2257235, shape=(), dtype=float32)
tf.Tensor(1.2157627, shape=(), dtype=float32)
tf.Tensor(1.2058848, shape=(), dtype=float32)
tf.Tensor(1.1960893, shape=(), dtype=float32)
tf.Tensor(1.1863762, shape=(), dtype=float32)
tf.Tensor(1.1767453, shape=(), dtype=float32)
tf.Tensor(1.1671963, shape=(), dtype=float32)
tf.Tensor(1.1577287, shape=(), dtype=float32)
tf.Tensor(1.1483426, shape=(), dtype=float32)
tf.Tensor(1.1390375, shape=(), dtype=float32)
tf.Tensor(1.1298131, shape=(), dtype=float32)
tf.Tensor(1.1206692, shape=(), dtype=float32)

In dem obigen Beispiel iteriert Sie über dist_dataset Eingang zu Ihrer Ausbildung. Sie sind auch mit dem mitgelieferten tf.distribute.Strategy.make_experimental_numpy_dataset zu unterstützen NumPy Eingänge. Sie können diese API verwenden , einen Datensatz vor dem Aufruf erstellen tf.distribute.Strategy.experimental_distribute_dataset .

Eine andere Möglichkeit, Ihre Daten zu iterieren, besteht darin, explizit Iteratoren zu verwenden. Sie können dies tun, wenn Sie eine bestimmte Anzahl von Schritten ausführen möchten, anstatt über das gesamte Dataset zu iterieren. Die obige Iteration würde jetzt modifiziert werden , um zunächst einen Iterator zu erstellen und dann explizit aufrufen next sie auf, die Eingangsdaten zu erhalten.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(1.1116053, shape=(), dtype=float32)
tf.Tensor(1.1026212, shape=(), dtype=float32)
tf.Tensor(1.0937165, shape=(), dtype=float32)
tf.Tensor(1.0848908, shape=(), dtype=float32)
tf.Tensor(1.0761441, shape=(), dtype=float32)
tf.Tensor(1.0674756, shape=(), dtype=float32)
tf.Tensor(1.0588851, shape=(), dtype=float32)
tf.Tensor(1.0503721, shape=(), dtype=float32)
tf.Tensor(1.0419363, shape=(), dtype=float32)
tf.Tensor(1.0335773, shape=(), dtype=float32)

Dies umfasst die einfachste Fall der Verwendung tf.distribute.Strategy API individuelle Trainingsschlaufen zu verteilen.

Was wird jetzt unterstützt?

Schulungs-API Gespiegelte Strategie TPUS-Strategie MultiWorkerMirroredStrategy ParameterServerStrategie CentralStorageStrategie
Benutzerdefinierte Trainingsschleife Unterstützt Unterstützt Unterstützt Experimentelle Unterstützung Experimentelle Unterstützung

Beispiele und Tutorials

Hier sind einige Beispiele für die Verwendung einer Verteilungsstrategie mit benutzerdefinierten Trainingsschleifen:

  1. Tutorial MNIST mit trainieren MirroredStrategy .
  2. Leitfaden auf die Ausbildung MNIST mit TPUStrategy .
  3. TensorFlow Modell Garden Repository Sammlungen von state-of-the-art - Modelle implementiert unter Verwendung verschiedener Strategien enthalten.

Andere Themen

In diesem Abschnitt werden einige Themen behandelt, die für mehrere Anwendungsfälle relevant sind.

Umgebungsvariable TF_CONFIG einrichten

Für Mehrarbeiterausbildung, wie bereits erwähnt, müssen Sie das einrichten TF_CONFIG Umgebungsvariable für jeden binären im Cluster ausgeführt wird . Die TF_CONFIG Umgebungsvariable ist ein JSON - String , die angibt , welche Aufgaben bildet einen Cluster, dessenderen Adressen und jede Rolle Aufgabe im Cluster. Das tensorflow/ecosystem Repo bietet eine Kubernetes Vorlage, die Sätze bis TF_CONFIG für Ihre Trainingsaufgaben.

Es gibt zwei Komponenten von TF_CONFIG : ein Cluster und eine Aufgabe.

  • Ein Cluster stellt Informationen über den Trainingscluster bereit, bei dem es sich um ein Diktier handelt, das aus verschiedenen Arten von Jobs wie z. B. Arbeitern besteht. Beim Training mit mehreren Mitarbeitern übernimmt normalerweise ein Mitarbeiter etwas mehr Verantwortung, wie das Speichern von Prüfpunkten und das Schreiben einer Zusammenfassungsdatei für TensorBoard zusätzlich zu dem, was ein normaler Mitarbeiter tut. Solche Arbeiter als „Chef“ Arbeiter bezeichnet wird, und es ist üblich , dass der Arbeitnehmer mit dem Index 0 als Haupt Arbeiter ernannt wird (in der Tat ist dies, wie tf.distribute.Strategy implementiert ist).
  • Eine Aufgabe hingegen gibt Auskunft über die aktuelle Aufgabe. Der erste Komponentencluster ist für alle Worker gleich, und die zweite Komponentenaufgabe ist für jeden Worker unterschiedlich und gibt den Typ und den Index dieses Workers an.

Ein Beispiel für TF_CONFIG ist:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Diese TF_CONFIG gibt an, dass es drei Arbeiter und zwei "ps" Aufgaben in dem "cluster" zusammen mit ihren Gastgebern und Häfen. Der "task" Teil gibt die Rolle der aktuellen Task in dem "cluster" -Worker 1 (der zweite Arbeiter). Gültige Rollen in einem Cluster sind "chief" , "worker" , "ps" und "evaluator" . Es sollte nicht "ps" Job außer bei der Verwendung von tf.distribute.experimental.ParameterServerStrategy .

Was kommt als nächstes?

tf.distribute.Strategy ist aktiv in der Entwicklung. Probieren Sie es aus und geben Sie Ihr Feedback mit GitHub Fragen .