Formation multi-travailleurs avec Keras

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce didacticiel montre comment effectuer une formation distribuée multi-travailleur avec un modèle Keras et l'API Model.fit à l'aide de l'API tf.distribute.Strategy , en particulier la classe tf.distribute.MultiWorkerMirroredStrategy . Avec l'aide de cette stratégie, un modèle Keras conçu pour s'exécuter sur un seul travailleur peut fonctionner de manière transparente sur plusieurs travailleurs avec un minimum de modifications de code.

Pour ceux qui souhaitent approfondir leur compréhension des API tf.distribute.Strategy , le guide Formation distribuée dans TensorFlow est disponible pour un aperçu des stratégies de distribution prises en charge par TensorFlow.

Pour savoir comment utiliser MultiWorkerMirroredStrategy avec Keras et une boucle de formation personnalisée, reportez-vous à Boucle de formation personnalisée avec Keras et MultiWorkerMirroredStrategy .

Notez que le but de ce didacticiel est de montrer un exemple minimal multi-travailleur avec deux travailleurs.

Installer

Commencez par quelques importations nécessaires :

import json
import os
import sys

Avant d'importer TensorFlow, apportez quelques modifications à l'environnement :

  1. Désactivez tous les GPU. Cela évite les erreurs causées par les travailleurs essayant tous d'utiliser le même GPU. Dans une application réelle, chaque travailleur serait sur une machine différente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Réinitialisez la variable d'environnement TF_CONFIG (vous en apprendrez plus à ce sujet plus tard) :
os.environ.pop('TF_CONFIG', None)
  1. Assurez-vous que le répertoire actuel se trouve sur le chemin de Python. Cela permet au notebook d'importer ultérieurement les fichiers écrits par %%writefile :
if '.' not in sys.path:
  sys.path.insert(0, '.')

Importez maintenant TensorFlow :

import tensorflow as tf

Définition du jeu de données et du modèle

Ensuite, créez un fichier mnist_setup.py avec une configuration simple de modèle et de jeu de données. Ce fichier Python sera utilisé par les processus de travail dans ce tutoriel :

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Formation modèle sur un seul travailleur

Essayez de former le modèle pour un petit nombre d'époques et observez les résultats d' un seul travailleur pour vous assurer que tout fonctionne correctement. Au fur et à mesure que l'entraînement progresse, la perte devrait diminuer et la précision devrait augmenter.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Configuration multi-travailleurs

Entrons maintenant dans le monde de la formation multi-travailleurs.

Un cluster avec des jobs et des tâches

Dans TensorFlow, la formation distribuée implique : un 'cluster' avec plusieurs tâches, et chacune des tâches peut avoir une ou plusieurs 'task' .

Vous aurez besoin de la variable d'environnement de configuration TF_CONFIG pour vous entraîner sur plusieurs machines, chacune ayant éventuellement un rôle différent. TF_CONFIG est une chaîne JSON utilisée pour spécifier la configuration du cluster pour chaque travailleur faisant partie du cluster.

Il existe deux composants d'une variable TF_CONFIG : 'cluster' et 'task' .

  • Un 'cluster' est le même pour tous les travailleurs et fournit des informations sur le cluster de formation, qui est un dict composé de différents types d'emplois, tels que 'worker' ou 'chief' .

    • Dans la formation multi-travailleurs avec tf.distribute.MultiWorkerMirroredStrategy , il y a généralement un 'worker' qui assume des responsabilités, telles que la sauvegarde d'un point de contrôle et l'écriture d'un fichier récapitulatif pour TensorBoard, en plus de ce qu'un 'worker' normal fait. Un tel 'worker' est appelé ouvrier en chef (avec un nom d'emploi 'chief' ).
    • Il est d'usage que le 'chief' nomme 'index' 0 (en fait, c'est ainsi que tf.distribute.Strategy est implémenté).
  • Une 'task' fournit des informations sur la tâche en cours et est différente pour chaque travailleur. Il spécifie le 'type' et 'index' de ce travailleur.

Ci-dessous un exemple de configuration :

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Voici le même TF_CONFIG sérialisé en tant que chaîne JSON :

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Notez que tf_config n'est qu'une variable locale en Python. Pour pouvoir l'utiliser pour une configuration de formation, ce dict doit être sérialisé en tant que JSON et placé dans une variable d'environnement TF_CONFIG .

Dans l'exemple de configuration ci-dessus, vous définissez la tâche 'type' sur 'worker' et la tâche 'index' sur 0 . Par conséquent, cette machine est le premier travailleur. Il sera nommé travailleur 'chief' et effectuera plus de travail que les autres.

À des fins d'illustration, ce didacticiel montre comment vous pouvez configurer une variable TF_CONFIG avec deux travailleurs sur un localhost .

En pratique, vous créeriez plusieurs nœuds de calcul sur des adresses IP/ports externes et définiriez une variable TF_CONFIG sur chaque nœud de calcul en conséquence.

Dans ce didacticiel, vous utiliserez deux travailleurs :

  • Le TF_CONFIG du premier ( 'chief' ) travailleur est montré ci-dessus.
  • Pour le deuxième travailleur, vous tf_config['task']['index']=1

Variables d'environnement et sous-processus dans les notebooks

Les sous-processus héritent des variables d'environnement de leur parent.

Par exemple, vous pouvez définir une variable d'environnement dans ce processus Jupyter Notebook comme suit :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Ensuite, vous pouvez accéder à la variable d'environnement à partir d'un sous-processus :

echo ${GREETINGS}
Hello TensorFlow!

Dans la section suivante, vous utiliserez une méthode similaire pour transmettre le TF_CONFIG aux sous-processus de travail. Dans un scénario réel, vous ne lanceriez pas vos travaux de cette façon, mais c'est suffisant dans cet exemple.

Choisissez la bonne stratégie

Dans TensorFlow, il existe deux formes principales d'entraînement distribué :

  • Formation synchrone , où les étapes de la formation sont synchronisées entre les nœuds de calcul et les répliques, et
  • Formation asynchrone , où les étapes de formation ne sont pas strictement synchronisées (par exemple, formation du serveur de paramètres ).

Ce didacticiel montre comment effectuer une formation multi-travailleur synchrone à l'aide d'une instance de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crée des copies de toutes les variables dans les couches du modèle sur chaque appareil sur tous les travailleurs. Il utilise CollectiveOps , une opération TensorFlow pour la communication collective, pour agréger les gradients et synchroniser les variables. Le guide tf.distribute.Strategy contient plus de détails sur cette stratégie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fournit plusieurs implémentations via le paramètre tf.distribute.experimental.CommunicationOptions : 1) RING implémente des collectifs basés sur des anneaux en utilisant gRPC comme couche de communication entre hôtes ; 2) NCCL utilise la bibliothèque de communication collective NVIDIA pour implémenter des collectifs ; et 3) AUTO reporte le choix au temps d'exécution. Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster.

Former le modèle

Avec l'intégration de l'API tf.distribute.Strategy dans tf.keras , le seul changement que vous ferez pour distribuer la formation à plusieurs travailleurs est d'inclure la construction du modèle et l'appel model.compile() dans strategy.scope() . La portée de la stratégie de distribution dicte comment et où les variables sont créées, et dans le cas de MultiWorkerMirroredStrategy , les variables créées sont MirroredVariable s et elles sont répliquées sur chacun des travailleurs.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Pour exécuter réellement avec MultiWorkerMirroredStrategy , vous devez exécuter des processus de travail et leur transmettre un TF_CONFIG .

Comme le fichier mnist_setup.py écrit précédemment, voici le main.py que chacun des workers exécutera :

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Dans l'extrait de code ci-dessus, notez que global_batch_size , qui est transmis à Dataset.batch , est défini sur per_worker_batch_size * num_workers . Cela garantit que chaque travailleur traite des lots d'exemples per_worker_batch_size quel que soit le nombre de travailleurs.

Le répertoire courant contient maintenant les deux fichiers Python :

ls *.py
main.py
mnist_setup.py

Donc json-sérialisez le TF_CONFIG et ajoutez-le aux variables d'environnement :

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Maintenant, vous pouvez lancer un processus de travail qui exécutera le main.py et utilisera TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Il y a quelques points à noter à propos de la commande ci-dessus :

  1. Il utilise le %%bash qui est un bloc- notes "magique" pour exécuter certaines commandes bash.
  2. Il utilise l'indicateur --bg pour exécuter le processus bash en arrière-plan, car ce travailleur ne se terminera pas. Il attend tous les travailleurs avant de commencer.

Le processus de travail en arrière-plan n'imprime pas la sortie sur ce bloc-notes, donc le &> redirige sa sortie vers un fichier afin que vous puissiez inspecter ultérieurement ce qui s'est passé dans un fichier journal.

Alors, attendez quelques secondes que le processus démarre :

import time
time.sleep(10)

Maintenant, inspectez ce qui a été généré dans le fichier journal du travailleur jusqu'à présent :

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

La dernière ligne du fichier journal doit indiquer : Started server with target: grpc://localhost:12345 . Le premier travailleur est maintenant prêt et attend que tous les autres travailleurs soient prêts à continuer.

Donc, mettez à jour le tf_config pour que le processus du deuxième travailleur récupère :

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lancez le deuxième ouvrier. Cela démarrera la formation puisque tous les travailleurs sont actifs (il n'est donc pas nécessaire de mettre ce processus en arrière-plan) :

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Si vous revérifiez les journaux écrits par le premier nœud de calcul, vous apprendrez qu'il a participé à l'entraînement de ce modèle :

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Sans surprise, cela s'est déroulé plus lentement que le test exécuté au début de ce didacticiel.

Exécuter plusieurs travailleurs sur une seule machine ne fait qu'ajouter des frais généraux.

Le but ici n'était pas d'améliorer le temps de formation, mais seulement de donner un exemple de formation multi-travailleurs.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formation multi-travailleurs approfondie

Jusqu'à présent, vous avez appris à effectuer une configuration multi-travailleur de base.

Pendant le reste du didacticiel, vous découvrirez en détail d'autres facteurs, qui peuvent être utiles ou importants pour des cas d'utilisation réels.

Partage de l'ensemble de données

Dans la formation multi-travailleurs, le partage des ensembles de données est nécessaire pour assurer la convergence et les performances.

L'exemple de la section précédente s'appuie sur le partitionnement automatique par défaut fourni par l'API tf.distribute.Strategy . Vous pouvez contrôler le partitionnement en définissant tf.data.experimental.AutoShardPolicy de tf.data.experimental.DistributeOptions .

Pour en savoir plus sur le partitionnement automatique , consultez le Guide de saisie distribuée .

Voici un exemple rapide de la façon de désactiver le partitionnement automatique, afin que chaque réplica traite chaque exemple ( non recommandé ) :

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Évaluation

Si vous transmettez le validation_data dans Model.fit , il alternera entre la formation et l'évaluation pour chaque époque. L'évaluation prenant validation_data est répartie sur le même ensemble de travailleurs et les résultats de l'évaluation sont agrégés et disponibles pour tous les travailleurs.

Semblable à la formation, l'ensemble de données de validation est automatiquement partitionné au niveau du fichier. Vous devez définir une taille de lot globale dans le jeu de données de validation et définir les validation_steps .

Un jeu de données répété est également recommandé pour l'évaluation.

Vous pouvez également créer une autre tâche qui lit périodiquement les points de contrôle et exécute l'évaluation. C'est ce que fait l'estimateur. Mais ce n'est pas une méthode recommandée pour effectuer l'évaluation et ses détails sont donc omis.

Performance

Vous disposez maintenant d'un modèle Keras configuré pour s'exécuter sur plusieurs nœuds de calcul avec MultiWorkerMirroredStrategy .

Pour ajuster les performances de la formation multi-travailleurs, vous pouvez essayer ce qui suit :

  • tf.distribute.MultiWorkerMirroredStrategy fournit plusieurs implémentations de communication collective :

    • RING implémente des collectifs basés sur des anneaux en utilisant gRPC comme couche de communication entre hôtes.
    • NCCL utilise la bibliothèque de communication collective NVIDIA pour implémenter des collectifs.
    • AUTO reporte le choix à l'exécution.

    Le meilleur choix d'implémentation collective dépend du nombre de GPU, du type de GPU et de l'interconnexion réseau dans le cluster. Pour remplacer le choix automatique, spécifiez le paramètre communication_options du constructeur de MultiWorkerMirroredStrategy . Par example:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Convertissez les variables en tf.float si possible :

    • Le modèle officiel ResNet comprend un exemple de la façon dont cela peut être fait.

Tolérance aux pannes

Dans la formation synchrone, le cluster échouerait si l'un des nœuds de calcul tombe en panne et qu'aucun mécanisme de récupération en cas de panne n'existe.

L'utilisation de Keras avec tf.distribute.Strategy présente l'avantage de la tolérance aux pannes dans les cas où les travailleurs meurent ou sont autrement instables. Pour ce faire, vous pouvez conserver l'état d'apprentissage dans le système de fichiers distribué de votre choix, de sorte qu'au redémarrage de l'instance précédemment défaillante ou préemptée, l'état d'apprentissage est récupéré.

Lorsqu'un nœud de calcul devient indisponible, les autres nœuds de calcul échouent (éventuellement après un délai d'attente). Dans de tels cas, le travailleur indisponible doit être redémarré, ainsi que les autres travailleurs qui ont échoué.

Rappel ModelCheckpoint

Le rappel ModelCheckpoint ne fournit plus de fonctionnalité de tolérance aux pannes, veuillez utiliser le rappel BackupAndRestore à la place.

Le rappel ModelCheckpoint peut toujours être utilisé pour enregistrer des points de contrôle. Mais avec cela, si la formation a été interrompue ou terminée avec succès, afin de continuer la formation à partir du point de contrôle, l'utilisateur est responsable de charger le modèle manuellement.

En option, l'utilisateur peut choisir d'enregistrer et de restaurer le modèle/les poids en dehors du rappel ModelCheckpoint .

Enregistrement et chargement du modèle

Pour enregistrer votre modèle à l'aide model.save ou tf.saved_model.save , la destination d'enregistrement doit être différente pour chaque travailleur.

  • Pour les travailleurs non chefs, vous devrez enregistrer le modèle dans un répertoire temporaire.
  • Pour le chef, vous devrez enregistrer dans le répertoire de modèle fourni.

Les répertoires temporaires du nœud de calcul doivent être uniques pour éviter les erreurs résultant de plusieurs nœuds de calcul essayant d'écrire au même emplacement.

Le modèle enregistré dans tous les répertoires est identique, et généralement seul le modèle enregistré par le chef doit être référencé pour la restauration ou le service.

Vous devriez avoir une logique de nettoyage qui supprime les répertoires temporaires créés par les travailleurs une fois votre formation terminée.

La raison d'économiser sur le chef et les ouvriers en même temps est que vous pourriez agréger des variables pendant le point de contrôle, ce qui oblige le chef et les ouvriers à participer au protocole de communication allreduce. D'un autre côté, laisser le chef et les travailleurs enregistrer dans le même répertoire de modèles entraînera des erreurs dues à des conflits.

À l'aide de MultiWorkerMirroredStrategy , le programme est exécuté sur chaque travailleur, et afin de savoir si le travailleur actuel est chef, il tire parti de l'objet résolveur de cluster qui a les attributs task_type et task_id :

  • task_type vous indique quel est le travail actuel (par exemple 'worker' ).
  • task_id vous indique l'identifiant du travailleur.
  • Le travailleur avec task_id == 0 est désigné comme travailleur principal.

Dans l'extrait de code ci-dessous, la fonction write_filepath fournit le chemin du fichier à écrire, qui dépend du task_id du travailleur :

  • Pour le travailleur principal (avec task_id == 0 ), il écrit dans le chemin du fichier d'origine.
  • Pour les autres travailleurs, il crée un répertoire temporaire — temp_dir — avec le task_id dans le chemin du répertoire pour y écrire :
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Avec cela, vous êtes maintenant prêt à enregistrer :

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Comme décrit ci-dessus, plus tard, le modèle ne devrait être chargé qu'à partir du chemin sur lequel le chef a été enregistré, supprimons donc les temporaires que les travailleurs non chefs ont enregistrés :

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Maintenant, quand il est temps de charger, utilisons l'API pratique tf.keras.models.load_model et continuons avec d'autres travaux.

Ici, supposez que vous n'utilisez qu'un seul travailleur pour charger et continuer la formation, auquel cas vous n'appelez pas tf.keras.models.load_model dans un autre strategy.scope() (notez que strategy = tf.distribute.MultiWorkerMirroredStrategy() , comme défini précédemment ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Enregistrement et restauration des points de contrôle

D'autre part, les points de contrôle vous permettent de sauvegarder les poids de votre modèle et de les restaurer sans avoir à sauvegarder le modèle entier.

Ici, vous allez créer un tf.train.Checkpoint qui suit le modèle, qui est géré par le tf.train.CheckpointManager , de sorte que seul le dernier point de contrôle soit conservé :

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Une fois le CheckpointManager configuré, vous êtes maintenant prêt à enregistrer et à supprimer les points de contrôle que les travailleurs non chefs avaient enregistrés :

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Désormais, lorsque vous devez restaurer le modèle, vous pouvez trouver le dernier point de contrôle enregistré à l'aide de la fonction pratique tf.train.latest_checkpoint . Après avoir restauré le point de contrôle, vous pouvez continuer l'entraînement.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Rappel BackupAndRestore

Le rappel tf.keras.callbacks.BackupAndRestore fournit la fonctionnalité de tolérance aux pannes en sauvegardant le modèle et le numéro d'époque actuel dans un fichier de point de contrôle temporaire sous l'argument backup_dir de BackupAndRestore . Ceci est fait à la fin de chaque époque.

Une fois les tâches interrompues et redémarrées, le rappel restaure le dernier point de contrôle et la formation continue depuis le début de l'époque interrompue. Tout entraînement partiel déjà effectué dans l'époque inachevée avant l'interruption sera jeté, de sorte qu'il n'affecte pas l'état final du modèle.

Pour l'utiliser, fournissez une instance de tf.keras.callbacks.BackupAndRestore à l'appel Model.fit .

Avec MultiWorkerMirroredStrategy , si un travailleur est interrompu, l'ensemble du cluster s'interrompt jusqu'à ce que le travailleur interrompu soit redémarré. Les autres nœuds de calcul redémarrent également et le nœud de calcul interrompu rejoint le cluster. Ensuite, chaque travailleur lit le fichier de point de contrôle qui a été précédemment enregistré et récupère son ancien état, permettant ainsi au cluster de se resynchroniser. Ensuite, la formation se poursuit.

Le rappel BackupAndRestore utilise le CheckpointManager pour enregistrer et restaurer l'état de formation, ce qui génère un fichier appelé point de contrôle qui suit les points de contrôle existants avec le dernier. Pour cette raison, backup_dir ne doit pas être réutilisé pour stocker d'autres points de contrôle afin d'éviter une collision de noms.

Actuellement, le rappel BackupAndRestore prend en charge la formation d'un seul travailleur sans stratégie - MirroredStrategy - et la formation multi-travailleurs avec MultiWorkerMirroredStrategy .

Vous trouverez ci-dessous deux exemples de formation à plusieurs travailleurs et de formation à un seul travailleur :

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Si vous inspectez le répertoire de backup_dir que vous avez spécifié dans BackupAndRestore , vous remarquerez peut-être des fichiers de point de contrôle générés temporairement. Ces fichiers sont nécessaires pour récupérer les instances précédemment perdues, et ils seront supprimés par la bibliothèque à la fin de Model.fit lors de la sortie réussie de votre formation.

Ressources additionnelles

  1. Le guide Formation distribuée dans TensorFlow fournit une vue d'ensemble des stratégies de distribution disponibles.
  2. Le didacticiel Boucle de formation personnalisée avec Keras et MultiWorkerMirroredStrategy montre comment utiliser MultiWorkerMirroredStrategy avec Keras et une boucle de formation personnalisée.
  3. Découvrez les modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.
  4. Le guide Meilleures performances avec tf.function fournit des informations sur d'autres stratégies et outils, tels que le profileur TensorFlow , que vous pouvez utiliser pour optimiser les performances de vos modèles TensorFlow.