Eine Frage haben? Verbinden Sie sich mit der Community im TensorFlow Forum Visit Forum

Multi-Worker-Training mit Keras

Auf TensorFlow.org ansehen In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

Dieses Tutorial demonstriert das verteilte Training für mehrere Mitarbeiter mit dem Keras-Modell unter Verwendung der tf.distribute.Strategy API, insbesondere tf.distribute.MultiWorkerMirroredStrategy . Mit Hilfe dieser Strategie kann ein Keras-Modell, das für die Ausführung auf einem einzelnen Worker entwickelt wurde, nahtlos auf mehreren Workern mit minimaler Codeänderung arbeiten.

Der Leitfaden für verteilte Schulungen in TensorFlow bietet einen Überblick über die Verteilungsstrategien, die TensorFlow unterstützt, für diejenigen, die an einem tieferen Verständnis der tf.distribute.Strategy APIs interessiert sind.

Einrichten

Zunächst einige notwendige Importe.

import json
import os
import sys

Nehmen Sie vor dem Importieren von TensorFlow einige Änderungen an der Umgebung vor.

Deaktivieren Sie alle GPUs. Dies verhindert Fehler, die dadurch verursacht werden, dass die Mitarbeiter alle versuchen, dieselbe GPU zu verwenden. Für eine reale Anwendung würde sich jeder Arbeiter auf einer anderen Maschine befinden.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG Umgebungsvariable TF_CONFIG . Sie werden später mehr darüber TF_CONFIG .

os.environ.pop('TF_CONFIG', None)

Stellen Sie sicher, dass sich das aktuelle Verzeichnis im Python-Pfad befindet. Dadurch kann das Notebook die von %%writefile geschriebenen Dateien später %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Importieren Sie nun TensorFlow.

import tensorflow as tf

Datensatz- und Modelldefinition

Erstellen Sie als Nächstes eine mnist.py Datei mit einem einfachen Modell- und Dataset-Setup. Diese Python-Datei wird von den Worker-Prozessen in diesem Tutorial verwendet:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Versuchen Sie, das Modell für eine kleine Anzahl von Epochen zu trainieren und beobachten Sie die Ergebnisse eines einzelnen Arbeiters, um sicherzustellen, dass alles richtig funktioniert. Mit fortschreitendem Training sollte der Verlust sinken und die Genauigkeit sollte zunehmen.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2783 - accuracy: 0.2152
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2298 - accuracy: 0.3783
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1673 - accuracy: 0.5096
<tensorflow.python.keras.callbacks.History at 0x7fb074a356d0>

Multi-Worker-Konfiguration

Kommen wir nun in die Welt des Multi-Worker-Trainings. In TensorFlow wird die Umgebungsvariable TF_CONFIG für das Training auf mehreren Maschinen benötigt, von denen jeder möglicherweise eine andere Rolle hat. TF_CONFIG ist ein JSON-String, der verwendet wird, um die Clusterkonfiguration auf jedem Worker anzugeben, der Teil des Clusters ist.

Hier ist eine Beispielkonfiguration:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Hier ist dieselbe TF_CONFIG als JSON-String serialisiert:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Es gibt zwei Komponenten von TF_CONFIG : cluster und task .

  • cluster ist für alle Arbeiter gleich und liefert Informationen über den Ausbildungscluster, der ein Diktier ist, das aus verschiedenen Arten von Jobs besteht, wie z. B. worker . Beim MultiWorkerMirroredStrategy -Training mit MultiWorkerMirroredStrategy übernimmt normalerweise ein worker etwas mehr Verantwortung, wie das Speichern von Prüfpunkten und das Schreiben einer Zusammenfassungsdatei für TensorBoard zusätzlich zu dem, was ein normaler worker tut. Ein solcher Arbeitnehmer als die bezeichnet wird chief Arbeiter, und es ist üblich , dass der worker mit index 0 als Chef ernannt worker (in der Tat ist dies, wie tf.distribute.Strategy implementiert ist).

  • task liefert Informationen über die aktuelle Aufgabe und ist für jeden Arbeiter unterschiedlich. Es gibt den type und den index dieses Arbeiters an.

In diesem Beispiel legen Sie den Task - type auf "worker" und der Task - index auf 0 . Diese Maschine ist der erste Arbeiter und wird zum Hauptarbeiter ernannt und erledigt mehr Arbeit als die anderen. Beachten Sie, dass auf anderen Maschinen die Umgebungsvariable TF_CONFIG gesetzt sein muss und sie das gleiche cluster Diktat haben sollte, aber einen anderen Task- type oder Task- index je nach den Rollen dieser Maschinen.

Zur Veranschaulichung zeigt dieses Tutorial, wie man eine TF_CONFIG mit 2 Arbeitern auf localhost . In der Praxis würden Benutzer mehrere Worker an externen IP-Adressen/Ports erstellen und TF_CONFIG für jeden Worker entsprechend TF_CONFIG .

In diesem Beispiel verwenden Sie 2 Arbeiter, die TF_CONFIG des ersten Arbeiters ist oben gezeigt. Für den zweiten Worker würden Sie tf_config['task']['index']=1

Oben ist tf_config nur eine lokale Variable in Python. Um es tatsächlich zum Konfigurieren des Trainings zu verwenden, muss dieses Wörterbuch als JSON serialisiert und in die Umgebungsvariable TF_CONFIG werden.

Umgebungsvariablen und Unterprozesse in Notebooks

Unterprozesse erben Umgebungsvariablen von ihren Eltern. Wenn Sie also in diesem jupyter notebook Prozess eine Umgebungsvariable jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Sie können auf die Umgebungsvariable von einem Unterprozess aus zugreifen:

echo ${GREETINGS}
Hello TensorFlow!

Im nächsten Abschnitt verwenden Sie dies, um TF_CONFIG an die Worker-Subprozesse zu übergeben. Sie würden Ihre Jobs nie wirklich auf diese Weise starten, aber für die Zwecke dieses Tutorials reicht es aus: Um ein minimales Multi-Worker-Beispiel zu demonstrieren.

Wähle die richtige Strategie

In TensorFlow gibt es zwei Hauptformen des verteilten Trainings:

  • Synchrones Training, bei dem die Trainingsschritte zwischen den Workern und Replikaten synchronisiert werden, und
  • Asynchrones Training, bei dem die Trainingsschritte nicht strikt synchronisiert werden.

MultiWorkerMirroredStrategy , die empfohlene Strategie für das synchrone Training MultiWorkerMirroredStrategy Mitarbeiter, wird in diesem Handbuch demonstriert. Verwenden Sie zum Trainieren des Modells eine Instanz von tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy erstellt Kopien aller Variablen in den Schichten des Modells auf jedem Gerät über alle Worker hinweg. Es verwendet CollectiveOps , eine TensorFlow-Operation für die kollektive Kommunikation, um Gradienten zu aggregieren und die Variablen synchron zu halten. Der tf.distribute.Strategy Leitfaden enthält weitere Details zu dieser Strategie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy bietet mehrere Implementierungen über den Parameter CommunicationOptions . RING implementiert ringbasierte Kollektive unter Verwendung von gRPC als hostübergreifende Kommunikationsschicht. NCCL verwendet NCCL von Nvidia , um Kollektive zu implementieren. AUTO die Auswahl auf die Laufzeit. Die beste Wahl der kollektiven Implementierung hängt von der Anzahl und Art der GPUs und der Netzwerkverbindung im Cluster ab.

Trainiere das Modell

Mit der Integration der tf.distribute.Strategy API in tf.keras ist die einzige Änderung, die Sie vornehmen werden, um das Training auf mehrere Mitarbeiter zu verteilen, das Einschließen des Modellerstellungs- und model.compile() Aufrufs in strategy.scope() . Der Geltungsbereich der Verteilungsstrategie bestimmt, wie und wo die Variablen erstellt werden, und im Fall von MultiWorkerMirroredStrategy handelt es sich bei den erstellten Variablen um MirroredVariable s, die auf jedem der Worker repliziert werden.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Um tatsächlich mit MultiWorkerMirroredStrategy Sie Worker-Prozesse TF_CONFIG und ihnen eine TF_CONFIG .

Wie die mnist.py geschriebene mnist.py Datei ist hier die main.py , die jeder der Worker ausführen wird:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Beachten Sie im global_batch_size Code-Snippet, dass global_batch_size , das an Dataset.batch wird, auf per_worker_batch_size * num_workers . Dadurch wird sichergestellt, dass jeder Worker Batches von per_worker_batch_size Beispielen verarbeitet, unabhängig von der Anzahl der Worker.

Das aktuelle Verzeichnis enthält nun beide Python-Dateien:

ls *.py
main.py
mnist.py

Also json-serialisieren Sie die TF_CONFIG und fügen Sie sie den Umgebungsvariablen hinzu:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Jetzt können Sie einen Worker-Prozess starten, der main.py und die TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Zu dem obigen Befehl sind einige Dinge zu beachten:

  1. Es verwendet die %%bash die eine "Magie" des Notebooks ist , um einige Bash-Befehle auszuführen.
  2. Es verwendet das Flag --bg , um den bash Prozess im Hintergrund auszuführen, da dieser Worker nicht beendet wird. Es wartet auf alle Arbeiter, bevor es beginnt.

Der Worker-Prozess im Hintergrund druckt keine Ausgabe an dieses Notebook, daher leitet &> seine Ausgabe in eine Datei um, damit Sie sehen können, was passiert ist.

Warten Sie also einige Sekunden, bis der Vorgang gestartet wird:

import time
time.sleep(10)

Sehen Sie sich nun an, was bisher in die Logdatei des Arbeiters ausgegeben wurde:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

Die letzte Zeile der Protokolldatei sollte Started server with target: grpc://localhost:12345 . Der erste Arbeiter ist jetzt bereit und wartet darauf, dass alle anderen Arbeiter bereit sind, fortzufahren.

Aktualisieren Sie also die tf_config für den Prozess des zweiten Workers zum Abholen:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Starten Sie nun den zweiten Arbeiter. Dadurch wird das Training gestartet, da alle Mitarbeiter aktiv sind (daher ist kein Hintergrund für diesen Prozess erforderlich):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920
2021-06-16 18:40:56.710304: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:57.818915: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:58.745385: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:58.745442: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745451: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745567: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:58.745603: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:58.745609: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:58.746272: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:58.751609: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:58.752063: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.797443: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007608: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007984: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz

Wenn Sie nun die vom ersten Worker geschriebenen Protokolle erneut überprüfen, werden Sie feststellen, dass er am Training dieses Modells teilgenommen hat:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.794960: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007264: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007667: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920

Es überrascht nicht, dass dies langsamer lief als der Testlauf zu Beginn dieses Tutorials. Das Ausführen mehrerer Worker auf einem einzigen Computer verursacht nur zusätzlichen Aufwand. Das Ziel war hier nicht, die Einarbeitungszeit zu verbessern, sondern nur ein Beispiel für die Ausbildung von mehreren Arbeitern zu geben.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Multi-Worker-Training in der Tiefe

Bisher hat dieses Tutorial ein grundlegendes Multi-Worker-Setup demonstriert. Der Rest dieses Dokuments befasst sich im Detail mit anderen Faktoren, die für reale Anwendungsfälle nützlich oder wichtig sein können.

Datensatz-Sharding

Beim Training mit mehreren Mitarbeitern ist das Sharding von Datensätzen erforderlich, um Konvergenz und Leistung sicherzustellen.

Das Beispiel im vorherigen Abschnitt basiert auf dem standardmäßigen Autosharding, das von der tf.distribute.Strategy API bereitgestellt wird. Sie können das tf.data.experimental.AutoShardPolicy steuern, indem Sie die tf.data.experimental.AutoShardPolicy der tf.data.experimental.DistributeOptions . Weitere Informationen zum automatischen Sharding finden Sie im Handbuch zur verteilten Eingabe .

Hier ist ein kurzes Beispiel zum Deaktivieren des automatischen Shardings, damit jedes Replikat jedes Beispiel verarbeitet (nicht empfohlen):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Auswertung

Wenn Sie passieren validation_data in model.fit , wird es für jede Epoche zwischen Ausbildung und Bewertung abwechseln. Die Auswertung unter Verwendung von validation_data wird auf dieselbe Gruppe von Arbeitern verteilt und die Bewertungsergebnisse werden aggregiert und stehen für alle Arbeiter zur Verfügung. Ähnlich wie beim Training wird das Validierungs-Dataset automatisch auf Dateiebene geteilt. Sie müssen eine globale Batchgröße im Validierungs-Dataset festlegen und validation_steps festlegen. Zur Auswertung wird auch ein wiederholter Datensatz empfohlen.

Alternativ können Sie auch eine weitere Aufgabe erstellen, die periodisch Prüfpunkte liest und die Auswertung durchführt. Dies ist, was Estimator tut. Dies ist jedoch keine empfohlene Methode zur Durchführung der Evaluierung, und daher werden ihre Details ausgelassen.

Performance

Sie haben jetzt ein Keras-Modell, das mit MultiWorkerMirroredStrategy für die Ausführung in mehreren MultiWorkerMirroredStrategy . Sie können die folgenden Techniken ausprobieren, um die Leistung des Trainings für mehrere Mitarbeiter mit MultiWorkerMirroredStrategy zu MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy bietet mehrere gemeinsame Kommunikationsimplementierungen . RING implementiert ringbasierte Kollektive unter Verwendung von gRPC als hostübergreifende Kommunikationsschicht. NCCL verwendet NCCL von Nvidia , um Kollektive zu implementieren. AUTO die Auswahl auf die Laufzeit. Die beste Wahl der kollektiven Implementierung hängt von der Anzahl und Art der GPUs und der Netzwerkverbindung im Cluster ab. Um die automatische Auswahl zu überschreiben, geben Sie den Parameter communication_options des MultiWorkerMirroredStrategy von MultiWorkerMirroredStrategy , zB communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • tf.float die Variablen nach tf.float in tf.float . Das offizielle ResNet-Modell enthält ein Beispiel dafür, wie dies bewerkstelligt werden kann.

Fehlertoleranz

Beim synchronen Training würde der Cluster fehlschlagen, wenn einer der Worker ausfällt und kein Fehlerwiederherstellungsmechanismus vorhanden ist. Die Verwendung von Keras mit tf.distribute.Strategy den Vorteil der Fehlertoleranz in Fällen, in denen Arbeiter sterben oder anderweitig instabil sind. Dies tun Sie, indem Sie den Trainingsstatus im verteilten Dateisystem Ihrer Wahl beibehalten, sodass beim Neustart der Instanz, die zuvor fehlgeschlagen oder vorzeitig beendet wurde, der Trainingsstatus wiederhergestellt wird.

Wenn ein Worker nicht verfügbar ist, schlagen andere Worker fehl (möglicherweise nach einer Zeitüberschreitung). In solchen Fällen muss der nicht verfügbare Worker neu gestartet werden, ebenso wie andere Worker, die ausgefallen sind.

ModelCheckpoint-Rückruf

ModelCheckpoint Callback bietet keine Fehlertoleranzfunktion mehr, bitte verwenden BackupAndRestore stattdessen den BackupAndRestore Callback.

Der ModelCheckpoint Callback kann weiterhin zum Speichern von Prüfpunkten verwendet werden. Wenn das Training jedoch unterbrochen oder erfolgreich beendet wurde, um das Training vom Checkpoint aus fortzusetzen, ist der Benutzer dafür verantwortlich, das Modell manuell zu laden.

Optional kann der Benutzer das Modell/die Gewichte außerhalb des ModelCheckpoint Rückrufs speichern und wiederherstellen.

Modell speichern und laden

Um Ihr Modell mit model.save oder tf.saved_model.save , muss das Speicherziel für jeden Worker unterschiedlich sein. Bei den Nicht-Chefarbeitern müssen Sie das Modell in einem temporären Verzeichnis speichern und beim Chef müssen Sie das Modell im bereitgestellten Modellverzeichnis speichern. Die temporären Verzeichnisse auf dem Worker müssen eindeutig sein, um Fehler zu vermeiden, die dadurch entstehen, dass mehrere Worker versuchen, an denselben Speicherort zu schreiben. Das in allen Verzeichnissen gespeicherte Modell ist identisch und normalerweise sollte nur das vom Chef gespeicherte Modell zum Wiederherstellen oder Bereitstellen referenziert werden. Sie sollten eine Bereinigungslogik haben, die die temporären Verzeichnisse löscht, die von den Arbeitern erstellt wurden, sobald Ihre Schulung abgeschlossen ist.

Der Grund, warum Sie den Chef und die Arbeiter gleichzeitig sparen müssen, liegt darin, dass Sie möglicherweise während des Checkpointing Variablen aggregieren, was erfordert, dass sowohl der Chef als auch die Arbeiter am allreduce-Kommunikationsprotokoll teilnehmen. Auf der anderen Seite führt das Speichern von Chief und Workers im selben Modellverzeichnis zu Fehlern aufgrund von Konflikten.

Mit MultiWorkerMirroredStrategy wird das Programm auf jedem Worker ausgeführt, und um zu wissen, ob der aktuelle Worker Chef ist, nutzt es das Cluster-Resolver-Objekt mit den Attributen task_type und task_id . task_type sagt Ihnen, was der aktuelle Job ist (zB 'worker'), und task_id sagt Ihnen die Kennung des Arbeiters. Der Arbeiter mit der ID 0 wird als Hauptarbeiter bezeichnet.

Im folgenden Code-Snippet stellt write_filepath den zu schreibenden write_filepath bereit, der von der Worker-ID abhängt. Im Fall von Chief (Worker mit ID 0) schreibt es in den ursprünglichen Dateipfad; für andere erstellt es ein temporäres Verzeichnis (mit ID im Verzeichnispfad), in das geschrieben werden kann:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Damit können Sie jetzt speichern:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Wie oben beschrieben, sollte das Modell später nur von dem Pfad geladen werden, in dem der Chef gespeichert wurde, also entfernen wir die temporären, die die Nicht-Chefarbeiter gespeichert haben:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Wenn es nun Zeit zum Laden ist, verwenden wir die praktische tf.keras.models.load_model API und fahren mit der weiteren Arbeit fort. tf.keras.models.load_model Sie hier davon aus, dass Sie nur einen einzelnen Worker zum Laden und Fortsetzen des Trainings verwenden. In diesem Fall rufen Sie tf.keras.models.load_model innerhalb einer anderen strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.3081 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2914 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08ad02dd0>

Checkpoint speichern und wiederherstellen

Andererseits können Sie mit Checkpointing die Gewichtungen des Modells speichern und wiederherstellen, ohne das gesamte Modell speichern zu müssen. Hier erstellen Sie einen tf.train.Checkpoint , der das Modell verfolgt, der von einem tf.train.CheckpointManager verwaltet wird, sodass nur der neueste Prüfpunkt erhalten bleibt.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Sobald der CheckpointManager eingerichtet ist, können Sie die gespeicherten Checkpoints speichern und entfernen.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Wenn Sie jetzt eine Wiederherstellung durchführen müssen, können Sie mit der praktischen Funktion tf.train.latest_checkpoint den neuesten gespeicherten tf.train.latest_checkpoint . Nachdem Sie den Kontrollpunkt wiederhergestellt haben, können Sie mit dem Training fortfahren.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 12ms/step - loss: 2.3080 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 11ms/step - loss: 2.2896 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08a2b6bd0>

Rückruf von BackupAndRestore

Der Rückruf von BackupAndRestore bietet Fehlertoleranzfunktionen, indem das Modell und die aktuelle Epochennummer in einer temporären Prüfpunktdatei unter backup_dir Argument BackupAndRestore in BackupAndRestore . Dies geschieht am Ende jeder Epoche.

Sobald Jobs unterbrochen und neu gestartet werden, stellt der Callback den letzten Prüfpunkt wieder her und das Training wird am Anfang der unterbrochenen Epoche fortgesetzt. Jegliches Teiltraining, das bereits in der unvollendeten Epoche vor der Unterbrechung durchgeführt wurde, wird verworfen, damit es den endgültigen Modellzustand nicht beeinflusst.

Um es zu verwenden, stellen tf.keras.callbacks.experimental.BackupAndRestore beim Aufruf von tf.keras.Model.fit() eine Instanz von tf.keras.callbacks.experimental.BackupAndRestore tf.keras.Model.fit() .

Bei MultiWorkerMirroredStrategy pausiert der gesamte Cluster, wenn ein Worker unterbrochen wird, bis der unterbrochene Worker neu gestartet wird. Andere Worker werden ebenfalls neu gestartet und der unterbrochene Worker tritt dem Cluster wieder bei. Dann liest jeder Worker die zuvor gespeicherte Checkpoint-Datei und nimmt ihren früheren Zustand wieder auf, wodurch der Cluster wieder synchron wird. Dann geht das Training weiter.

BackupAndRestore Rückruf von BackupAndRestore verwendet CheckpointManager , um den Trainingsstatus zu speichern und wiederherzustellen, wodurch eine Datei namens checkpoint generiert wird, die vorhandene Prüfpunkte zusammen mit dem neuesten verfolgt. Aus diesem Grund sollte backup_dir nicht wiederverwendet werden, um andere Prüfpunkte zu speichern, um Namenskollisionen zu vermeiden.

Derzeit unterstützt BackupAndRestore Callback Single Worker ohne Strategie, MirroredStrategy und MultiWorker mit MultiWorkerMirroredStrategy. Im Folgenden sind zwei Beispiele für die Ausbildung mit mehreren Mitarbeitern und die Ausbildung für einzelne Mitarbeiter aufgeführt.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2774 - accuracy: 0.2183
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2264 - accuracy: 0.3663
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1693 - accuracy: 0.4643
<tensorflow.python.keras.callbacks.History at 0x7fb08a032790>

Wenn Sie das Verzeichnis von backup_dir BackupAndRestore , das Sie in BackupAndRestore angegeben BackupAndRestore , werden Sie möglicherweise einige vorübergehend generierte Prüfpunktdateien bemerken. Diese Dateien werden zum Wiederherstellen der zuvor verlorenen Instanzen benötigt und werden nach erfolgreichem Beenden Ihres Trainings von der Bibliothek am Ende von tf.keras.Model.fit() .

Siehe auch

  1. Der Leitfaden „ Verteiltes Training in TensorFlow“ bietet einen Überblick über die verfügbaren Verteilungsstrategien.
  2. Offizielle Modelle , von denen viele so konfiguriert werden können, dass sie mehrere Vertriebsstrategien ausführen.
  3. Der Abschnitt Leistung im Handbuch enthält Informationen zu anderen Strategien und Tools, mit denen Sie die Leistung Ihrer TensorFlow-Modelle optimieren können.