Individuelle Trainingsschleife mit Keras und MultiWorkerMirroredStrategy

Auf TensorFlow.org ansehen In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

Dieses Tutorial demonstriert das Multiworker-Training mit einer benutzerdefinierten Trainingsschleifen-API, die über MultiWorkerMirroredStrategy verteilt wird, sodass ein Keras-Modell, das für die Ausführung auf einem einzelnen Worker entwickelt wurde , mit minimalen Codeänderungen nahtlos an mehreren Workern arbeiten kann.

Wir verwenden benutzerdefinierte Trainingsschleifen, um unser Modell zu trainieren, weil sie uns Flexibilität und eine größere Kontrolle über das Training geben. Darüber hinaus ist es einfacher, das Modell und die Trainingsschleife zu debuggen. Ausführlichere Informationen finden Sie unter Erstellen einer Trainingsschleife von Grund auf .

Wenn Sie MultiWorkerMirroredStrategy model.fit , wie Sie MultiWorkerMirroredStrategy mit keras model.fit , model.fit stattdessen dieses Tutorial .

Der Leitfaden für verteilte Schulungen in TensorFlow bietet einen Überblick über die Verteilungsstrategien, die TensorFlow unterstützt, für diejenigen, die an einem tieferen Verständnis der tf.distribute.Strategy APIs interessiert sind.

Einrichten

Zunächst einige notwendige Importe.

import json
import os
import sys

Nehmen Sie vor dem Importieren von TensorFlow einige Änderungen an der Umgebung vor.

Deaktivieren Sie alle GPUs. Dies verhindert Fehler, die dadurch verursacht werden, dass die Mitarbeiter alle versuchen, dieselbe GPU zu verwenden. Für eine reale Anwendung würde sich jeder Arbeiter auf einer anderen Maschine befinden.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG Umgebungsvariable TF_CONFIG . Sie werden später mehr darüber TF_CONFIG .

os.environ.pop('TF_CONFIG', None)

Stellen Sie sicher, dass sich das aktuelle Verzeichnis im Python-Pfad befindet. Dadurch kann das Notebook die von %%writefile geschriebenen Dateien später %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Importieren Sie nun TensorFlow.

import tensorflow as tf

Datensatz- und Modelldefinition

Erstellen Sie als Nächstes eine mnist.py Datei mit einem einfachen Modell- und Dataset-Setup. Diese Python-Datei wird von den Worker-Prozessen in diesem Tutorial verwendet:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Multi-Worker-Konfiguration

Kommen wir nun in die Welt des Multi-Worker-Trainings. In TensorFlow wird die Umgebungsvariable TF_CONFIG für das Training auf mehreren Maschinen benötigt, von denen jeder möglicherweise eine andere Rolle hat. TF_CONFIG das unten verwendet wird, ist ein JSON-String, der verwendet wird, um die Clusterkonfiguration auf jedem Worker anzugeben, der Teil des Clusters ist. Dies ist die Standardmethode zum Angeben eines Clusters unter Verwendung von cluster_resolver.TFConfigClusterResolver , aber im Modul " distribute.cluster_resolver stehen andere Optionen zur Verfügung.

Beschreiben Sie Ihren Cluster

Hier ist eine Beispielkonfiguration:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Hier ist dieselbe TF_CONFIG als JSON-String serialisiert:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Es gibt zwei Komponenten von TF_CONFIG : cluster und task .

  • cluster ist für alle Arbeiter gleich und liefert Informationen über den Ausbildungscluster, der ein Diktier ist, das aus verschiedenen Arten von Jobs besteht, wie z. B. worker . Beim MultiWorkerMirroredStrategy -Training mit MultiWorkerMirroredStrategy übernimmt normalerweise ein worker etwas mehr Verantwortung, wie das Speichern von Prüfpunkten und das Schreiben einer Zusammenfassungsdatei für TensorBoard zusätzlich zu dem, was ein normaler worker tut. Ein solcher Arbeitnehmer als die bezeichnet wird chief Arbeiter, und es ist üblich , dass der worker mit index 0 als Chef ernannt worker (in der Tat ist dies, wie tf.distribute.Strategy implementiert ist).

  • task liefert Informationen über die aktuelle Aufgabe und ist für jeden Arbeiter unterschiedlich. Es gibt den type und den index dieses Arbeiters an.

In diesem Beispiel legen Sie den Task - type auf "worker" und der Task - index auf 0 . Diese Maschine ist der erste Arbeiter und wird zum Hauptarbeiter ernannt und erledigt mehr Arbeit als die anderen. Beachten Sie, dass auf anderen Maschinen die Umgebungsvariable TF_CONFIG gesetzt sein muss und sie das gleiche cluster Diktat haben sollte, aber einen anderen Task- type oder Task- index je nach den Rollen dieser Maschinen.

Zur Veranschaulichung zeigt dieses Tutorial, wie man eine TF_CONFIG mit 2 Arbeitern auf localhost . In der Praxis würden Benutzer mehrere Worker an externen IP-Adressen/Ports erstellen und TF_CONFIG für jeden Worker entsprechend TF_CONFIG .

In diesem Beispiel verwenden Sie 2 Arbeiter, die TF_CONFIG des ersten Arbeiters ist oben gezeigt. Für den zweiten Worker würden Sie tf_config['task']['index']=1

Oben ist tf_config nur eine lokale Variable in Python. Um es tatsächlich zum Konfigurieren des Trainings zu verwenden, muss dieses Wörterbuch als JSON serialisiert und in die Umgebungsvariable TF_CONFIG werden.

Umgebungsvariablen und Unterprozesse in Notebooks

Unterprozesse erben Umgebungsvariablen von ihren Eltern. Wenn Sie also in diesem jupyter notebook Prozess eine Umgebungsvariable jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Sie können auf die Umgebungsvariable von einem Subprozess aus zugreifen:

echo ${GREETINGS}
Hello TensorFlow!

Im nächsten Abschnitt verwenden Sie dies, um die TF_CONFIG an die Worker- TF_CONFIG zu übergeben. Sie würden Ihre Jobs nie wirklich auf diese Weise starten, aber für die Zwecke dieses Tutorials reicht es aus: Um ein minimales Multi-Worker-Beispiel zu demonstrieren.

MultiWorkerMirroredStrategy

Verwenden Sie zum Trainieren des Modells eine Instanz von tf.distribute.MultiWorkerMirroredStrategy , die Kopien aller Variablen in den Schichten des Modells auf jedem Gerät über alle Worker hinweg erstellt. Der tf.distribute.Strategy Leitfaden enthält weitere Details zu dieser Strategie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

Verwenden Sie tf.distribute.Strategy.scope um anzugeben, dass beim tf.distribute.Strategy.scope Ihres Modells eine Strategie verwendet werden soll. Dies versetzt Sie in den " Cross-Replica-Kontext " für diese Strategie, was bedeutet, dass der Strategie die Kontrolle über Dinge wie die Platzierung von Variablen übertragen wird.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Teilen Sie Ihre Daten automatisch für alle Mitarbeiter

Beim Training mit mehreren Mitarbeitern ist das Sharding von Datensätzen nicht unbedingt erforderlich, es gibt Ihnen jedoch genau eine Semantik, wodurch mehr Training reproduzierbar wird, dh das Training an mehreren Mitarbeitern sollte dasselbe sein wie das Training an einem Mitarbeiter. Hinweis: Die Leistung kann in einigen Fällen beeinträchtigt sein.

Siehe: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Definieren Sie eine benutzerdefinierte Trainingsschleife und trainieren Sie das Modell

Optimierer angeben

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Definiere einen Trainingsschritt mit tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Checkpoint speichern und wiederherstellen

Die Implementierung von Prüfpunkten in einer benutzerdefinierten Trainingsschleife erfordert, dass der Benutzer sie verarbeitet, anstatt einen Keras-Callback zu verwenden. Sie können die Gewichte des Modells speichern und wiederherstellen, ohne das gesamte Modell speichern zu müssen.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Hier erstellen Sie einen tf.train.Checkpoint , der das Modell verfolgt, der von einem tf.train.CheckpointManager verwaltet wird, sodass nur der neueste Prüfpunkt erhalten bleibt.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Wenn Sie jetzt eine Wiederherstellung durchführen müssen, können Sie mit der praktischen Funktion tf.train.latest_checkpoint den neuesten gespeicherten tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Nachdem Sie den Kontrollpunkt wiederhergestellt haben, können Sie mit dem Training Ihrer benutzerdefinierten Trainingsschleife fortfahren.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

Vollständige Code-Einrichtung für Worker

Um tatsächlich mit MultiWorkerMirroredStrategy Sie Worker-Prozesse TF_CONFIG und ihnen eine TF_CONFIG .

Wie die mnist.py geschriebene Datei mnist.py ist hier die Datei main.py , die denselben Code enthält, den wir zuvor in dieser Kollaboration Schritt für Schritt durchgegangen sind. Wir schreiben ihn nur in eine Datei, damit jeder der Worker ihn ausführen wird:

Datei: main.py

Writing main.py

Trainieren und bewerten

Das aktuelle Verzeichnis enthält nun beide Python-Dateien:

ls *.py
main.py
mnist.py

Also json-serialisieren Sie die TF_CONFIG und fügen Sie sie zu den Umgebungsvariablen hinzu:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Jetzt können Sie einen Worker-Prozess starten, der main.py und die TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Zu dem obigen Befehl sind einige Dinge zu beachten:

  1. Es verwendet die %%bash die eine "Magie" des Notebooks ist , um einige Bash-Befehle auszuführen.
  2. Es verwendet das Flag --bg , um den bash Prozess im Hintergrund auszuführen, da dieser Worker nicht beendet wird. Es wartet auf alle Arbeiter, bevor es beginnt.

Der Worker-Prozess im Hintergrund druckt keine Ausgabe an dieses Notebook, daher leitet &> seine Ausgabe in eine Datei um, damit Sie sehen können, was passiert ist.

Warten Sie also einige Sekunden, bis der Vorgang gestartet wird:

import time
time.sleep(20)

Sehen Sie sich nun an, was bisher in die Logdatei des Arbeiters ausgegeben wurde:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

Die letzte Zeile der Protokolldatei sollte Started server with target: grpc://localhost:12345 . Der erste Arbeiter ist jetzt bereit und wartet darauf, dass alle anderen Arbeiter bereit sind, fortzufahren.

Aktualisieren Sie also die tf_config für den Prozess des zweiten Workers zum Abholen:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Starten Sie nun den zweiten Arbeiter. Dadurch wird das Training gestartet, da alle Arbeiter aktiv sind (daher ist kein Hintergrund für diesen Prozess erforderlich):

python main.py > /dev/null 2>&1

Wenn Sie nun die vom ersten Worker geschriebenen Protokolle erneut überprüfen, werden Sie feststellen, dass er am Training dieses Modells teilgenommen hat:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Multi-Worker-Training in der Tiefe

In diesem Tutorial wurde ein Custom Training Loop Workflow des Multi-Worker-Setups demonstriert. Eine detaillierte Beschreibung anderer Themen finden Sie im model.fit's guide zum Multi-Worker-Setup und gilt für CTLs.

Siehe auch

  1. Der Leitfaden „ Verteiltes Training in TensorFlow“ bietet einen Überblick über die verfügbaren Verteilungsstrategien.
  2. Offizielle Modelle , von denen viele so konfiguriert werden können, dass sie mehrere Vertriebsstrategien ausführen.
  3. Der Abschnitt Leistung im Handbuch enthält Informationen zu anderen Strategien und Tools, mit denen Sie die Leistung Ihrer TensorFlow-Modelle optimieren können.