La journée communautaire ML est le 9 novembre ! Rejoignez - nous pour les mises à jour de tensorflow, JAX et plus En savoir plus

Formation multi-travailleurs avec Estimateur

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce tutoriel montre comment tf.distribute.Strategy peut être utilisé pour la formation distribuée à plusieurs travailleurs avec tf.estimator . Si vous écrivez votre code à l' aide tf.estimator , et vous vous intéressez à l' échelle au - delà d' une seule machine à haute performance, ce tutoriel est pour vous.

Avant de commencer, s'il vous plaît lire la stratégie de distribution guide. Le didacticiel de formation multi-GPU est également pertinente, parce que ce tutoriel utilise le même modèle.

Installer

Tout d'abord, configurez TensorFlow et les importations nécessaires.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Fonction d'entrée

Ce tutoriel utilise l'ensemble de données MNIST de tensorflow datasets . Le code est ici similaire au didacticiel de formation multi-GPU avec une différence clé: lors de l' utilisation de prévision pour la formation multi-travailleurs, il est nécessaire de shard l'ensemble de données par le nombre de travailleurs pour assurer la convergence des modèles. Les données d'entrée sont fragmentées par l' indice de travail, de sorte que chaque processus de travail 1/num_workers portions distinctes de l'ensemble de données.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Une autre approche raisonnable pour atteindre la convergence serait de mélanger l'ensemble de données avec des graines distinctes pour chaque travailleur.

Configuration multi-travailleurs

L' une des principales différences dans ce tutoriel ( par rapport au didacticiel de formation multi-GPU ) est la configuration multi-travailleurs. La TF_CONFIG variable d'environnement est le moyen standard pour spécifier la configuration du cluster à chaque travailleur qui fait partie du groupe.

Il y a deux composantes de TF_CONFIG : cluster et task . cluster fournit des informations sur l'ensemble du cluster, à savoir les travailleurs et les serveurs de paramètres dans le cluster. task fournit des informations sur la tâche en cours. Le premier volet cluster est le même pour tous les travailleurs et les serveurs de paramètres du cluster, et le deuxième composant task est différent sur chaque travailleur et le serveur paramètres et spécifie son propre type et l' index . Dans cet exemple, la tâche de type est worker et la tâche index est 0 .

À titre d'illustration, ce tutoriel montre comment définir un TF_CONFIG avec 2 travailleurs sur localhost . Dans la pratique, vous devez créer plusieurs travailleurs sur une adresse IP externe et le port, et ensemble TF_CONFIG de chaque travailleur de manière appropriée, par exemple modifier la tâche index .

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Définir le modèle

Écrivez les couches, l'optimiseur et la fonction de perte pour l'entraînement. Ce tutoriel définit le modèle avec des couches KERAS, similaires au didacticiel de formation multi-GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Pour former le modèle, utilisez une instance de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy crée des copies de toutes les variables dans les couches du modèle sur chaque appareil à tous les travailleurs. Il utilise CollectiveOps , une séance de tensorflow pour la communication collective, à des gradients globaux et garder les variables de synchronisation. Le tf.distribute.Strategy Guide a plus de détails sur cette stratégie.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_1884/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Former et évaluer le modèle

Ensuite, préciser la stratégie de distribution dans le RunConfig pour l'estimateur, et former et d' évaluer en invoquant tf.estimator.train_and_evaluate . Ce tutoriel ne distribue que la formation en spécifiant la stratégie via train_distribute . Il est également possible de répartir l'évaluation par eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7fa86c4c8950>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:374: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2021-09-09 01:25:08.941607: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2021-09-09 01:25:08.942715: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.135354.
INFO:tensorflow:Loss for final step: 1.135354.
({'loss': 2.2692595, 'global_step': 938}, [])

Optimiser les performances d'entraînement

Vous avez maintenant un modèle et un estimateur capable multi-travailleur alimenté par tf.distribute.Strategy . Vous pouvez essayer les techniques suivantes pour optimiser les performances de la formation multi-travailleurs :

  • Augmenter la taille du lot: La taille du lot spécifié ici est par-GPU. En général, la plus grande taille de lot adaptée à la mémoire du GPU est recommandée.
  • Les variables fonte: Monter les variables tf.float si possible. Le modèle de ResNet officiel comprend un exemple de la façon dont cela peut être fait.
  • Utiliser la communication collective: MultiWorkerMirroredStrategy fournit de multiples implémentations de communication collective .

    • RING instruments collectifs à base d' anneau à l' aide de grpc comme couche de communication inter-hôte.
    • NCCL utilise NCCL de Nvidia pour mettre en œuvre des collectifs.
    • AUTO reporte le choix à l'exécution.

    Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster. Pour passer outre le choix automatique, spécifiez une valeur valide à la communication paramètre de MultiWorkerMirroredStrategy constructeur de, par exemple , la communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Visitez la section Performance dans le guide pour en savoir plus sur d' autres stratégies et des outils que vous pouvez utiliser pour optimiser les performances de vos modèles de tensorflow.

Autres exemples de code

  1. Mettre fin à l' exemple de fin pour la formation des travailleurs dans plusieurs tensorflow / écosystème en utilisant des modèles Kubernetes. Cet exemple commence par un modèle Keras et le convertit en un estimateur en utilisant l' tf.keras.estimator.model_to_estimator API.
  2. Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.