Assistez au symposium Women in ML le 7 décembre Inscrivez-vous maintenant

Boucle de formation personnalisée avec Keras et MultiWorkerMirroredStrategy

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce didacticiel illustre la formation multi-travailleurs avec une API de boucle de formation personnalisée, distribuée via MultiWorkerMirroredStrategy, de sorte qu'un modèle Keras conçu pour s'exécuter sur un seul travailleur peut fonctionner de manière transparente sur plusieurs travailleurs avec un changement de code minimal.

Nous utilisons des boucles d'entraînement personnalisées pour entraîner notre modèle car elles nous donnent de la flexibilité et un plus grand contrôle sur l'entraînement. De plus, il est plus facile de déboguer le modèle et la boucle d'apprentissage. Des informations plus détaillées sont disponibles dans Rédaction d'une boucle de formation à partir de zéro .

Si vous cherchez comment utiliser MultiWorkerMirroredStrategy avec keras model.fit , reportez-vous plutôt à ce didacticiel .

Le guide Distributed Training in TensorFlow est disponible pour un aperçu des stratégies de distribution prises en charge par TensorFlow pour ceux qui souhaitent approfondir leur compréhension des API tf.distribute.Strategy .

Installer

Tout d'abord, quelques importations nécessaires.

import json
import os
import sys

Avant d'importer TensorFlow, apportez quelques modifications à l'environnement.

Désactivez tous les GPU. Cela évite les erreurs causées par les travailleurs essayant tous d'utiliser le même GPU. Pour une application réelle, chaque travailleur serait sur une machine différente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Réinitialisez la variable d'environnement TF_CONFIG , vous en saurez plus plus tard.

os.environ.pop('TF_CONFIG', None)

Assurez-vous que le répertoire actuel se trouve sur le chemin de python. Cela permet au notebook d'importer ultérieurement les fichiers écrits par %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Importez maintenant TensorFlow.

import tensorflow as tf

Définition du jeu de données et du modèle

Créez ensuite un fichier mnist.py avec une configuration simple de modèle et de jeu de données. Ce fichier python sera utilisé par les processus de travail dans ce tutoriel :

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuration multi-travailleur

Entrons maintenant dans le monde de la formation multi-travailleurs. Dans TensorFlow, la variable d'environnement TF_CONFIG est requise pour l'entraînement sur plusieurs machines, chacune ayant éventuellement un rôle différent. TF_CONFIG utilisé ci-dessous est une chaîne JSON utilisée pour spécifier la configuration du cluster sur chaque travailleur faisant partie du cluster. Il s'agit de la méthode par défaut pour spécifier un cluster, à l'aide cluster_resolver.TFConfigClusterResolver , mais d'autres options sont disponibles dans le module distribute.cluster_resolver .

Décrivez votre cluster

Voici un exemple de configuration :

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Voici le même TF_CONFIG sérialisé en tant que chaîne JSON :

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Il existe deux composants de TF_CONFIG : cluster et task .

  • cluster est le même pour tous les travailleurs et fournit des informations sur le cluster de formation, qui est un dict composé de différents types d'emplois tels que worker . Dans la formation multi-travailleurs avec MultiWorkerMirroredStrategy , il y a généralement un worker qui assume un peu plus de responsabilités, comme enregistrer un point de contrôle et écrire un fichier récapitulatif pour TensorBoard en plus de ce qu'un worker régulier fait. Un tel travailleur est appelé le travailleur chief , et il est d'usage que le worker avec l' index 0 soit nommé worker principal (en fait, c'est ainsi que tf.distribute.Strategy est implémenté).

  • task fournit des informations sur la tâche en cours et est différente sur chaque travailleur. Il spécifie le type et l' index de ce travailleur.

Dans cet exemple, vous définissez le type de tâche sur "worker" et l' index de tâche sur 0 . Cette machine est le premier ouvrier et sera nommée ouvrier en chef et fera plus de travail que les autres. Notez que d'autres machines devront également avoir la variable d'environnement TF_CONFIG définie, et elle doit avoir le même dict cluster , mais un type de tâche ou index de tâche différent selon les rôles de ces machines.

À des fins d'illustration, ce tutoriel montre comment on peut définir un TF_CONFIG avec 2 workers sur localhost . En pratique, les utilisateurs créeraient plusieurs nœuds de calcul sur des adresses IP/ports externes et définiraient TF_CONFIG sur chaque nœud de travail de manière appropriée.

Dans cet exemple, vous utiliserez 2 travailleurs, le TF_CONFIG du premier travailleur est illustré ci-dessus. Pour le deuxième travailleur, vous tf_config['task']['index']=1

Ci-dessus, tf_config est juste une variable locale en python. Pour l'utiliser réellement pour configurer la formation, ce dictionnaire doit être sérialisé en tant que JSON et placé dans la variable d'environnement TF_CONFIG .

Variables d'environnement et sous-processus dans les notebooks

Les sous-processus héritent des variables d'environnement de leur parent. Donc, si vous définissez une variable d'environnement dans ce processus de jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Vous pouvez accéder à la variable d'environnement à partir d'un sous-processus :

echo ${GREETINGS}
Hello TensorFlow!

Dans la section suivante, vous l'utiliserez pour transmettre le TF_CONFIG aux sous-processus de travail. Vous ne lanceriez jamais vraiment vos tâches de cette façon, mais c'est suffisant pour les besoins de ce didacticiel : pour illustrer un exemple minimal multi-travailleur.

MultiWorkerMirroredStrategy

Pour former le modèle, utilisez une instance de tf.distribute.MultiWorkerMirroredStrategy , qui crée des copies de toutes les variables dans les couches du modèle sur chaque appareil sur tous les travailleurs. Le guide tf.distribute.Strategy contient plus de détails sur cette stratégie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Utilisez tf.distribute.Strategy.scope pour spécifier qu'une stratégie doit être utilisée lors de la création de votre modèle. Cela vous place dans le « contexte de répliques croisées » pour cette stratégie, ce qui signifie que la stratégie contrôle des éléments tels que le placement des variables.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Partage automatique de vos données entre les travailleurs

Dans la formation multi-travailleurs, le partage de l'ensemble de données n'est pas nécessairement nécessaire, mais il vous donne une sémantique unique qui rend plus de formation plus reproductible, c'est-à-dire que la formation sur plusieurs travailleurs doit être la même que la formation sur un seul travailleur. Remarque : les performances peuvent être affectées dans certains cas.

Voir : distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Définir une boucle de formation personnalisée et former le modèle

Spécifier un optimiseur

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Définir une étape d'entraînement avec tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Enregistrement et restauration des points de contrôle

L'implémentation de points de contrôle dans une boucle d'entraînement personnalisée nécessite que l'utilisateur le gère au lieu d'utiliser un rappel keras. Il vous permet de sauvegarder les poids du modèle et de les restaurer sans avoir à sauvegarder le modèle entier.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Ici, vous allez créer un tf.train.Checkpoint qui suit le modèle, qui est géré par un tf.train.CheckpointManager afin que seul le dernier point de contrôle soit conservé.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Maintenant, lorsque vous avez besoin de restaurer, vous pouvez trouver le dernier point de contrôle enregistré à l'aide de la fonction pratique tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Après avoir restauré le point de contrôle, vous pouvez continuer à entraîner votre boucle d'entraînement personnalisée.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Configuration complète du code sur les travailleurs

Pour exécuter réellement avec MultiWorkerMirroredStrategy , vous devez exécuter des processus de travail et leur transmettre un TF_CONFIG .

Comme le fichier mnist.py écrit précédemment, voici le main.py qui contient le même code que nous avons parcouru étape par étape précédemment dans ce colab, nous l'écrivons simplement dans un fichier afin que chacun des travailleurs l'exécute :

Fichier: main.py

Writing main.py

Former et évaluer

Le répertoire courant contient maintenant les deux fichiers Python :

ls *.py
main.py
mnist.py

Donc json-sérialisez le TF_CONFIG et ajoutez-le aux variables d'environnement :

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Maintenant, vous pouvez lancer un processus de travail qui exécutera le main.py et utilisera TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Il y a quelques points à noter à propos de la commande ci-dessus :

  1. Il utilise le %%bash qui est un bloc- notes "magique" pour exécuter certaines commandes bash.
  2. Il utilise l'indicateur --bg pour exécuter le processus bash en arrière-plan, car ce travailleur ne se terminera pas. Il attend tous les travailleurs avant de commencer.

Le processus de travail en arrière-plan n'imprime pas la sortie sur ce bloc-notes, donc le &> redirige sa sortie vers un fichier, afin que vous puissiez voir ce qui s'est passé.

Alors, attendez quelques secondes que le processus démarre :

import time
time.sleep(20)

Regardez maintenant ce qui a été généré dans le fichier journal du travailleur jusqu'à présent :

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

La dernière ligne du fichier journal doit indiquer : Started server with target: grpc://localhost:12345 . Le premier travailleur est maintenant prêt et attend que tous les autres travailleurs soient prêts à continuer.

Donc, mettez à jour le tf_config pour que le processus du deuxième travailleur récupère :

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lancez maintenant le deuxième ouvrier. Cela démarrera la formation puisque tous les travailleurs sont actifs (il n'est donc pas nécessaire de mettre ce processus en arrière-plan) :

python main.py > /dev/null 2>&1

Maintenant, si vous revérifiez les journaux écrits par le premier nœud de calcul, vous verrez qu'il a participé à la formation de ce modèle :

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formation multi-travailleurs approfondie

Ce didacticiel a démontré un flux de travail de Custom Training Loop de la configuration multi-travailleur. Une description détaillée d'autres sujets est disponible dans le model.fit's guide de la configuration multi-travailleur et applicable aux CTL.

Voir également

  1. Le guide Distributed Training in TensorFlow fournit un aperçu des stratégies de distribution disponibles.
  2. Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.
  3. La section Performances du guide fournit des informations sur d'autres stratégies et outils que vous pouvez utiliser pour optimiser les performances de vos modèles TensorFlow.