Formation multi-travailleurs avec Estimateur

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce didacticiel montre comment tf.distribute.Strategy peut être utilisé pour la formation distribuée multi-travailleurs avec tf.estimator . Si vous écrivez votre code à l'aide tf.estimator et que vous souhaitez évoluer au-delà d'une seule machine avec des performances élevées, ce didacticiel est fait pour vous.

Avant de commencer, veuillez lire le guide de stratégie de distribution . Le tutoriel de formation multi-GPU est également pertinent, car ce tutoriel utilise le même modèle.

Installer

Tout d'abord, configurez TensorFlow et les importations nécessaires.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Fonction d'entrée

Ce didacticiel utilise l'ensemble de données MNIST de TensorFlow Datasets . Le code ici est similaire au didacticiel de formation multi-GPU avec une différence clé : lors de l'utilisation d'Estimator pour la formation multi-travailleurs, il est nécessaire de diviser l'ensemble de données par le nombre de travailleurs pour assurer la convergence du modèle. Les données d'entrée sont partitionnées par index de travail, de sorte que chaque travail traite 1/num_workers des parties distinctes de l'ensemble de données.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Une autre approche raisonnable pour parvenir à la convergence consisterait à mélanger l'ensemble de données avec des graines distinctes pour chaque travailleur.

Configuration multi-travailleurs

L'une des principales différences dans ce didacticiel (par rapport au didacticiel de formation multi-GPU ) est la configuration multi-travailleur. La variable d'environnement TF_CONFIG est le moyen standard de spécifier la configuration du cluster à chaque travailleur faisant partie du cluster.

Il existe deux composants de TF_CONFIG : cluster et task . cluster fournit des informations sur l'ensemble du cluster, à savoir les nœuds de calcul et les serveurs de paramètres du cluster. task fournit des informations sur la tâche en cours. Le premier cluster composants est le même pour tous les nœuds de calcul et serveurs de paramètres du cluster, et la deuxième task de composant est différente sur chaque nœud de calcul et serveur de paramètres et spécifie ses propres type et index . Dans cet exemple, le type de tâche est worker et l' index tâche est 0 .

À des fins d'illustration, ce tutoriel montre comment définir un TF_CONFIG avec 2 workers sur localhost . En pratique, vous créeriez plusieurs travailleurs sur une adresse IP et un port externes, et TF_CONFIG sur chaque travailleur de manière appropriée, c'est-à-dire modifier l' index la tâche.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Définir le modèle

Écrivez les couches, l'optimiseur et la fonction de perte pour l'entraînement. Ce didacticiel définit le modèle avec des couches Keras, similaire au didacticiel de formation multi-GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Pour entraîner le modèle, utilisez une instance de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy crée des copies de toutes les variables dans les couches du modèle sur chaque appareil sur tous les travailleurs. Il utilise CollectiveOps , une opération TensorFlow pour la communication collective, pour agréger les gradients et synchroniser les variables. Le guide tf.distribute.Strategy contient plus de détails sur cette stratégie.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Former et évaluer le modèle

Ensuite, spécifiez la stratégie de distribution dans RunConfig pour l'estimateur, et entraînez-vous et évaluez en tf.estimator.train_and_evaluate . Ce tutoriel distribue uniquement la formation en spécifiant la stratégie via train_distribute . Il est également possible de distribuer l'évaluation via eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.10881.
INFO:tensorflow:Loss for final step: 1.10881.
({'loss': 2.234131, 'global_step': 938}, [])

Optimiser les performances d'entraînement

Vous disposez maintenant d'un modèle et d'un estimateur multi-travailleurs optimisés par tf.distribute.Strategy . Vous pouvez essayer les techniques suivantes pour optimiser les performances de la formation multi-travailleur :

  • Augmenter la taille du lot : la taille du lot spécifiée ici est par GPU. En général, la plus grande taille de lot adaptée à la mémoire GPU est recommandée.
  • Variables de cast : Castez les variables en tf.float si possible. Le modèle officiel ResNet comprend un exemple de la façon dont cela peut être fait.
  • Utiliser la communication collective : MultiWorkerMirroredStrategy fournit plusieurs implémentations de communication collective .

    • RING implémente des collectifs basés sur des anneaux en utilisant gRPC comme couche de communication entre hôtes.
    • NCCL utilise le NCCL de Nvidia pour implémenter des collectifs.
    • AUTO reporte le choix à l'exécution.

    Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster. Pour remplacer le choix automatique, spécifiez une valeur valide pour le paramètre de communication du constructeur de MultiWorkerMirroredStrategy , par exemple communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Consultez la section Performances du guide pour en savoir plus sur les autres stratégies et outils que vous pouvez utiliser pour optimiser les performances de vos modèles TensorFlow.

Autres exemples de code

  1. Exemple de bout en bout pour la formation multi-travailleurs dans tensorflow/écosystème à l'aide de modèles Kubernetes. Cet exemple commence par un modèle Keras et le convertit en estimateur à l'aide de l'API tf.keras.estimator.model_to_estimator .
  2. Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.