Cette page a été traduite par l'API Cloud Translation.
Switch to English

Formation distribuée avec TensorFlow

Voir sur TensorFlow.org Exécuter dans Google Colab Afficher la source sur GitHub Télécharger le cahier

Aperçu

tf.distribute.Strategy est une API TensorFlow pour distribuer la formation sur plusieurs GPU, plusieurs machines ou TPU. À l'aide de cette API, vous pouvez distribuer vos modèles existants et votre code de formation avec des modifications de code minimales.

tf.distribute.Strategy a été conçu avec ces objectifs clés à l'esprit:

  • Facile à utiliser et prend en charge plusieurs segments d'utilisateurs, y compris les chercheurs, les ingénieurs ML, etc.
  • Fournir de bonnes performances hors de la boîte.
  • Commutation facile entre les stratégies.

tf.distribute.Strategy peut être utilisé avec une API de haut niveau comme Keras , et peut également être utilisé pour distribuer des boucles d'entraînement personnalisées (et, en général, tout calcul utilisant TensorFlow).

Dans TensorFlow 2.x, vous pouvez exécuter vos programmes avec empressement, ou dans un graphe en utilisant tf.function . tf.distribute.Strategy intention de prendre en charge ces deux modes d'exécution, mais fonctionne mieux avec tf.function . Le mode Eager n'est recommandé qu'à des fins de débogage et n'est pas pris en charge pour TPUStrategy . Bien que nous parlions la plupart du temps de la formation dans ce guide, cette API peut également être utilisée pour distribuer l'évaluation et la prédiction sur différentes plates-formes.

Vous pouvez utiliser tf.distribute.Strategy avec très peu de changements dans votre code, car nous avons changé les composants sous-jacents de TensorFlow pour devenir sensibles à la stratégie. Cela inclut les variables, les couches, les modèles, les optimiseurs, les métriques, les résumés et les points de contrôle.

Dans ce guide, nous expliquons différents types de stratégies et comment vous pouvez les utiliser dans différentes situations.

 # Import TensorFlow
import tensorflow as tf
 

Types de stratégies

tf.distribute.Strategy intention de couvrir un certain nombre de cas d'utilisation selon différents axes. Certaines de ces combinaisons sont actuellement prises en charge et d'autres seront ajoutées à l'avenir. Certains de ces axes sont:

  • Entraînement synchrone ou asynchrone: il s'agit de deux méthodes courantes de distribution de l'entraînement avec parallélisme de données. Lors de la formation synchronisée, tous les employés s'entraînent sur différentes tranches de données d'entrée synchronisées et agrégent des gradients à chaque étape. Dans la formation asynchrone, tous les travailleurs s'entraînent indépendamment sur les données d'entrée et mettent à jour les variables de manière asynchrone. En règle générale, la formation à la synchronisation est prise en charge via tout-réduire et asynchrone via l'architecture de serveur de paramètres.
  • Plate-forme matérielle: vous souhaiterez peut-être faire évoluer votre formation sur plusieurs GPU sur une machine, ou sur plusieurs machines dans un réseau (avec 0 GPU ou plus chacune), ou sur Cloud TPU.

Afin de prendre en charge ces cas d'utilisation, six stratégies sont disponibles. Dans la section suivante, nous expliquons lesquels de ceux-ci sont pris en charge dans quels scénarios dans TF 2.2 à ce stade. Voici un bref aperçu:

API de formation MirroredStrategy TPUStratégie MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API Keras Prise en charge Prise en charge Support expérimental Support expérimental Posté planifié 2.3 pris en charge
Boucle d'entraînement personnalisée Prise en charge Prise en charge Support expérimental Support expérimental Posté planifié 2.3 pris en charge
API Estimator Assistance limitée Non supporté Assistance limitée Assistance limitée Assistance limitée

MirroredStrategy

tf.distribute.MirroredStrategy prend en charge la formation distribuée synchrone sur plusieurs GPU sur une machine. Il crée une réplique par périphérique GPU. Chaque variable du modèle est reflétée dans toutes les répliques. Ensemble, ces variables forment une seule variable conceptuelle appelée MirroredVariable . Ces variables sont synchronisées les unes avec les autres en appliquant des mises à jour identiques.

Des algorithmes efficaces de réduction totale sont utilisés pour communiquer les mises à jour des variables sur les appareils. All-reduction agrège les tenseurs sur tous les appareils en les additionnant et les rend disponibles sur chaque appareil. C'est un algorithme fusionné qui est très efficace et peut réduire considérablement la charge de synchronisation. Il existe de nombreux algorithmes et implémentations tout-réduire disponibles, selon le type de communication disponible entre les appareils. Par défaut, il utilise NVIDIA NCCL comme implémentation de réduction totale. Vous pouvez choisir parmi quelques autres options que nous proposons ou écrire la vôtre.

Voici le moyen le plus simple de créer MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Cela créera une instance MirroredStrategy qui utilisera tous les GPU visibles par TensorFlow et utilisera NCCL comme communication entre appareils.

Si vous souhaitez utiliser uniquement certains GPU sur votre machine, vous pouvez le faire comme ceci:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Si vous souhaitez remplacer la communication entre périphériques, vous pouvez le faire en utilisant l'argument cross_device_ops en fournissant une instance de tf.distribute.CrossDeviceOps . Actuellement, tf.distribute.HierarchicalCopyAllReduce et tf.distribute.ReductionToOneDevice sont deux options autres que tf.distribute.NcclAllReduce qui est la valeur par défaut.

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStratégie

tf.distribute.TPUStrategy vous permet d'exécuter votre formation TensorFlow sur des unités de traitement Tensor (TPU). Les TPU sont les ASIC spécialisés de Google conçus pour accélérer considérablement les charges de travail d'apprentissage automatique. Ils sont disponibles sur Google Colab, TensorFlow Research Cloud et Cloud TPU .

En termes d'architecture de formation distribuée, TPUStrategy est la même MirroredStrategy - il implémente la formation distribuée synchrone. Les TPU fournissent leur propre implémentation d'opérations efficaces tout-réduire et autres opérations collectives sur plusieurs cœurs TPU, qui sont utilisées dans TPUStrategy .

Voici comment instancier TPUStrategy :

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

L'instance TPUClusterResolver permet de localiser les TPU. Dans Colab, vous n'avez pas besoin de lui spécifier d'arguments.

Si vous souhaitez l'utiliser pour les Cloud TPU:

  • Vous devez spécifier le nom de votre ressource TPU dans l'argument tpu .
  • Vous devez initialiser le système tpu explicitement au démarrage du programme. Ceci est nécessaire avant que les TPU puissent être utilisés pour le calcul. L'initialisation du système tpu efface également la mémoire TPU, il est donc important de terminer cette étape en premier afin d'éviter de perdre l'état.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy est très similaire à MirroredStrategy . Il met en œuvre une formation distribuée synchrone sur plusieurs travailleurs, chacun avec potentiellement plusieurs GPU. Similaire à MirroredStrategy , il crée des copies de toutes les variables du modèle sur chaque appareil sur tous les travailleurs.

Il utilise CollectiveOps comme méthode de communication multi-ouvriers à réduction totale utilisée pour synchroniser les variables. Une opération collective est une opération unique dans le graphe TensorFlow qui peut automatiquement choisir un algorithme de réduction totale dans l'environnement d'exécution TensorFlow en fonction du matériel, de la topologie du réseau et des tailles de tenseur.

Il implémente également des optimisations de performances supplémentaires. Par exemple, il inclut une optimisation statique qui convertit plusieurs réductions totales sur de petits tenseurs en moins de réductions globales sur des tenseurs plus grands. De plus, nous le concevons pour avoir une architecture de plugins - de sorte qu'à l'avenir, vous serez en mesure de brancher des algorithmes mieux adaptés à votre matériel. Notez que les opérations collectives mettent également en œuvre d'autres opérations collectives telles que la diffusion et le regroupement général.

Voici le moyen le plus simple de créer MultiWorkerMirroredStrategy :

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy vous permet actuellement de choisir entre deux implémentations différentes d'opérations collectives. CollectiveCommunication.RING implémente des CollectiveCommunication.RING en anneau en utilisant gRPC comme couche de communication. CollectiveCommunication.NCCL utilise le NCCL de Nvidia pour mettre en œuvre des collectifs. CollectiveCommunication.AUTO reporte le choix au runtime. Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster. Vous pouvez les spécifier de la manière suivante:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

L'une des principales différences entre la formation multi-ouvriers et la formation multi-GPU est la configuration multi-ouvriers. La variable d'environnement TF_CONFIG est le moyen standard dans TensorFlow de spécifier la configuration du cluster à chaque worker qui fait partie du cluster. En savoir plus sur la configuration de TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy effectue également un entraînement synchrone. Les variables ne sont pas mises en miroir, mais sont placées sur le processeur et les opérations sont répliquées sur tous les GPU locaux. S'il n'y a qu'un seul GPU, toutes les variables et opérations seront placées sur ce GPU.

Créez une instance de CentralStorageStrategy en:

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Cela créera une instance CentralStorageStrategy qui utilisera tous les GPU et CPU visibles. La mise à jour des variables sur les réplicas sera agrégée avant d'être appliquée aux variables.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy prend en charge la formation des serveurs de paramètres sur plusieurs machines. Dans cette configuration, certaines machines sont désignées comme travailleurs et d'autres comme serveurs de paramètres. Chaque variable du modèle est placée sur un serveur de paramètres. Le calcul est répliqué sur tous les GPU de tous les travailleurs.

En termes de code, cela ressemble à d'autres stratégies:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

Pour la formation multi-ouvriers, TF_CONFIG doit spécifier la configuration des serveurs de paramètres et des nœuds de calcul dans votre cluster, dont vous pouvez en savoir plus dans TF_CONFIG ci-dessous .

Autres stratégies

En plus des stratégies ci-dessus, il existe deux autres stratégies qui pourraient être utiles pour le prototypage et le débogage lors de l'utilisation des API tf.distribute .

Stratégie par défaut

La stratégie par défaut est une stratégie de distribution qui est présente lorsqu'aucune stratégie de distribution explicite n'est dans la portée. Il implémente l'interface tf.distribute.Strategy , mais est un pass-through et ne fournit aucune distribution réelle. Par exemple, strategy.run(fn) appellera simplement fn . Le code écrit en utilisant cette stratégie doit se comporter exactement comme le code écrit sans aucune stratégie. Vous pouvez le considérer comme une stratégie «sans opération».

La stratégie par défaut est un singleton - et on ne peut pas en créer plus d'instances. Il peut être obtenu en utilisant tf.distribute.get_strategy() dehors de la portée de toute stratégie explicite (la même API qui peut être utilisée pour obtenir la stratégie actuelle dans la portée d'une stratégie explicite).

 default_strategy = tf.distribute.get_strategy()
 

Cette stratégie a deux objectifs principaux:

  • Il permet d'écrire du code de bibliothèque compatible avec la distribution sans condition. Par exemple, dans l'optimiseur, nous pouvons faire tf.distribute.get_strategy() et utiliser cette stratégie pour réduire les dégradés - il retournera toujours un objet de stratégie sur lequel nous pouvons appeler l'API de réduction.
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • Semblable au code de bibliothèque, il peut être utilisé pour écrire des programmes d'utilisateurs finaux pour travailler avec et sans stratégie de distribution, sans nécessiter de logique conditionnelle. Un exemple d'extrait de code illustrant ceci:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy est une stratégie pour placer toutes les variables et les calculs sur un seul périphérique spécifié.

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

Cette stratégie se distingue de la stratégie par défaut de plusieurs manières. Dans la stratégie par défaut, la logique de placement des variables reste inchangée par rapport à l'exécution de TensorFlow sans aucune stratégie de distribution. Mais lors de l'utilisation de OneDeviceStrategy , toutes les variables créées dans sa portée sont explicitement placées sur l'appareil spécifié. De plus, toutes les fonctions appelées via OneDeviceStrategy.run seront également placées sur l'appareil spécifié.

L'entrée distribuée via cette stratégie sera pré-extraite vers le périphérique spécifié. Dans la stratégie par défaut, il n'y a pas de distribution d'entrée.

Semblable à la stratégie par défaut, cette stratégie peut également être utilisée pour tester votre code avant de passer à d'autres stratégies qui se distribuent réellement à plusieurs appareils / machines. Cela exercera un peu plus la machinerie de la stratégie de distribution que la stratégie par défaut, mais pas dans toute la mesure de l'utilisation de MirroredStrategy ou TPUStrategy etc. Si vous voulez du code qui se comporte comme si aucune stratégie, utilisez la stratégie par défaut.

Jusqu'à présent, nous avons parlé des différentes stratégies disponibles et de la manière dont vous pouvez les instancier. Dans les prochaines sections, nous parlerons des différentes manières dont vous pouvez les utiliser pour diffuser votre formation. Nous montrerons des extraits de code courts dans ce guide et un lien vers des didacticiels complets que vous pouvez exécuter de bout en bout.

Utilisation de tf.distribute.Strategy avec tf.keras.Model.fit

Nous avons intégré tf.distribute.Strategy dans tf.keras qui est l'implémentation par TensorFlow de la spécification de l'API Keras . tf.keras est une API de haut niveau pour créer et entraîner des modèles. En intégrant le backend tf.keras , nous avons rendu la distribution transparente de votre formation écrite dans le cadre de formation Keras à l'aide de model.fit .

Voici ce que vous devez modifier dans votre code:

  1. Créez une instance de la tf.distribute.Strategy appropriée.
  2. Déplacez la création du modèle, de l'optimiseur et des métriques Keras dans strategy.scope .

Nous prenons en charge tous les types de modèles Keras - séquentiels, fonctionnels et sous-classés.

Voici un extrait de code pour le faire pour un modèle Keras très simple avec une couche dense:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Dans cet exemple, nous avons utilisé MirroredStrategy afin de pouvoir l'exécuter sur une machine avec plusieurs GPU. strategy.scope() indique à Keras la stratégie à utiliser pour distribuer la formation. La création de modèles / optimiseurs / métriques à l'intérieur de cette portée nous permet de créer des variables distribuées au lieu de variables régulières. Une fois que cela est configuré, vous pouvez adapter votre modèle comme vous le feriez normalement. MirroredStrategy se charge de répliquer l'entraînement du modèle sur les GPU disponibles, d'agréger les gradients, etc.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

Ici, nous avons utilisé un tf.data.Dataset de données tf.data.Dataset pour fournir la formation et l'évaluation. Vous pouvez également utiliser des tableaux numpy:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

Dans les deux cas (jeu de données ou numpy), chaque lot de l'entrée donnée est divisé également entre les multiples répliques. Par exemple, si vous utilisez MirroredStrategy avec 2 GPU, chaque lot de taille 10 sera divisé entre les 2 GPU, chacun recevant 5 exemples d'entrée à chaque étape. Chaque époque s'entraînera ensuite plus rapidement à mesure que vous ajoutez plus de GPU. En règle générale, vous souhaiterez augmenter la taille de votre lot au fur et à mesure que vous ajoutez des accélérateurs afin d'utiliser efficacement la puissance de calcul supplémentaire. Vous devrez également réajuster votre taux d'apprentissage, en fonction du modèle. Vous pouvez utiliser strategy.num_replicas_in_sync pour obtenir le nombre de répliques.

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

Qu'est-ce qui est pris en charge maintenant?

API de formation MirroredStrategy TPUStratégie MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API Keras Prise en charge Prise en charge Support expérimental Support expérimental Support prévu post 2.3

Exemples et tutoriels

Voici une liste de didacticiels et d'exemples qui illustrent l'intégration ci-dessus de bout en bout avec Keras:

  1. Tutoriel pour former MNIST avec MirroredStrategy .
  2. Tutoriel pour former MNIST à l'aide de MultiWorkerMirroredStrategy .
  3. Guide de formation MNIST à l'aide de TPUStrategy .
  4. Référentiel TensorFlow Model Garden contenant des collections de modèles de pointe implémentés à l'aide de diverses stratégies.

Utilisation de tf.distribute.Strategy avec des boucles d'entraînement personnalisées

Comme vous l'avez vu, l'utilisation de tf.distribute.Strategy avec Keras model.fit nécessite de modifier que quelques lignes de votre code. Avec un peu plus d'effort, vous pouvez également utiliser tf.distribute.Strategy avec des boucles d'entraînement personnalisées.

Si vous avez besoin de plus de flexibilité et de contrôle sur vos boucles d'entraînement qu'avec Estimator ou Keras, vous pouvez écrire des boucles d'entraînement personnalisées. Par exemple, lorsque vous utilisez un GAN, vous voudrez peut-être effectuer un nombre différent de pas de générateur ou de discriminateur à chaque tour. De même, les cadres de haut niveau ne sont pas très adaptés à la formation d'apprentissage par renforcement.

Pour prendre en charge les boucles d'entraînement personnalisées, nous fournissons un ensemble de méthodes de base via les classes tf.distribute.Strategy . Leur utilisation peut nécessiter une restructuration mineure du code au départ, mais une fois que cela est fait, vous devriez pouvoir basculer entre les GPU, les TPU et plusieurs machines simplement en modifiant l'instance de stratégie.

Ici, nous allons montrer un bref extrait illustrant ce cas d'utilisation pour un exemple de formation simple utilisant le même modèle Keras qu'auparavant.

Tout d'abord, nous créons le modèle et l'optimiseur dans le cadre de la stratégie. Cela garantit que toutes les variables créées avec le modèle et l'optimiseur sont des variables en miroir.

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

Ensuite, nous créons l'ensemble de données d'entrée et appelons tf.distribute.Strategy.experimental_distribute_dataset pour distribuer l'ensemble de données en fonction de la stratégie.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

Ensuite, nous définissons une étape de la formation. Nous utiliserons tf.GradientTape pour calculer les gradients et l'optimiseur pour appliquer ces gradients pour mettre à jour les variables de notre modèle. Pour distribuer cette étape de formation, nous mettons dans une fonction train_step et la passons à tf.distrbute.Strategy.run avec les entrées de l'ensemble de données que nous obtenons de dist_dataset créé avant:

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

Quelques autres choses à noter dans le code ci-dessus:

  1. Nous avons utilisé tf.nn.compute_average_loss pour calculer la perte. tf.nn.compute_average_loss la perte par exemple et divise la somme par global_batch_size. Ceci est important car plus tard, une fois les gradients calculés sur chaque réplique, ils sont agrégés sur les répliques en les additionnant .
  2. Nous avons utilisé l'API tf.distribute.Strategy.reduce pour agréger les résultats renvoyés par tf.distribute.Strategy.run . tf.distribute.Strategy.run renvoie les résultats de chaque réplica local de la stratégie, et il existe plusieurs façons de consommer ce résultat. Vous pouvez les reduce pour obtenir une valeur agrégée. Vous pouvez également faire tf.distribute.Strategy.experimental_local_results pour obtenir la liste des valeurs contenues dans le résultat, une par réplique locale.
  3. Lorsque apply_gradients est appelé dans une portée de stratégie de distribution, son comportement est modifié. Plus précisément, avant d'appliquer des dégradés sur chaque instance parallèle pendant l'entraînement synchrone, il effectue une somme sur tous les répliques des dégradés.

Enfin, une fois que nous avons défini l'étape de formation, nous pouvons parcourir dist_dataset et exécuter la formation en boucle:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

Dans l'exemple ci-dessus, nous avons itéré sur dist_dataset pour fournir une entrée à votre entraînement. Nous fournissons également le tf.distribute.Strategy.make_experimental_numpy_dataset pour prendre en charge les entrées numpy. Vous pouvez utiliser cette API pour créer un ensemble de données avant d'appeler tf.distribute.Strategy.experimental_distribute_dataset .

Une autre façon d'itérer vos données consiste à utiliser explicitement des itérateurs. Vous souhaiterez peut-être effectuer cette opération lorsque vous souhaitez exécuter un nombre d'étapes donné plutôt que d'itérer sur l'ensemble de données. L'itération ci-dessus serait maintenant modifiée pour créer d'abord un itérateur, puis appeler explicitement le next pour obtenir les données d'entrée.

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

Cela couvre le cas le plus simple d'utilisation de l'API tf.distribute.Strategy pour distribuer des boucles d'entraînement personnalisées. Nous sommes en train d'améliorer ces API. Étant donné que ce cas d'utilisation nécessite plus de travail pour adapter votre code, nous publierons un guide détaillé séparé à l'avenir.

Qu'est-ce qui est pris en charge maintenant?

API de formation MirroredStrategy TPUStratégie MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Boucle d'entraînement personnalisée Prise en charge Prise en charge Support expérimental Support expérimental Support prévu post 2.3

Exemples et tutoriels

Voici quelques exemples d'utilisation de la stratégie de distribution avec des boucles d'entraînement personnalisées:

  1. Tutoriel pour former MNIST à l'aide de MirroredStrategy .
  2. Guide de formation MNIST à l'aide de TPUStrategy .
  3. Référentiel TensorFlow Model Garden contenant des collections de modèles de pointe implémentés à l'aide de diverses stratégies.

Utilisation de tf.distribute.Strategy avec Estimator (prise en charge limitée)

tf.estimator est une API TensorFlow de formation distribuée qui prenait à l'origine en charge l'approche de serveur de paramètres async. Comme avec Keras, nous avons intégré tf.distribute.Strategy dans tf.Estimator . Si vous utilisez Estimator pour votre formation, vous pouvez facilement passer à la formation distribuée avec très peu de modifications de votre code. Grâce à cela, les utilisateurs d'Estimator peuvent désormais effectuer une formation distribuée synchrone sur plusieurs GPU et plusieurs travailleurs, ainsi qu'utiliser des TPU. Ce support dans Estimator est cependant limité. Voir la section Ce qui est pris en charge maintenant ci-dessous pour plus de détails.

L'utilisation de tf.distribute.Strategy avec Estimator est légèrement différente de celle du cas Keras. Au lieu d'utiliser strategy.scope , nous passons maintenant l'objet de stratégie dans le RunConfig pour l'estimateur.

Voici un extrait de code qui montre cela avec un Estimator LinearRegressor et MirroredStrategy LinearRegressor :

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Nous utilisons ici un Estimator prédéfini, mais le même code fonctionne également avec un Estimator personnalisé. train_distribute détermine comment la formation sera distribuée, et eval_distribute détermine comment l'évaluation sera distribuée. C'est une autre différence par rapport à Keras où nous utilisons la même stratégie pour l'entraînement et l'évaluation.

Nous pouvons maintenant former et évaluer cet Estimator avec une fonction d'entrée:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Une autre différence à souligner ici entre Estimator et Keras est la gestion des entrées. Dans Keras, nous avons mentionné que chaque lot de l'ensemble de données est divisé automatiquement entre les multiples réplicas. Dans Estimator, cependant, nous n'effectuons pas de fractionnement automatique du lot, ni de partage automatique des données entre différents travailleurs. Vous avez un contrôle total sur la façon dont vous souhaitez que vos données soient distribuées entre les travailleurs et les périphériques, et vous devez fournir un input_fn pour spécifier comment distribuer vos données.

Votre input_fn est appelé une fois par travailleur, donnant ainsi un ensemble de données par travailleur. Ensuite, un lot de cet ensemble de données est transmis à un réplica sur ce worker, consommant ainsi N lots pour N réplicas sur 1 worker. En d'autres termes, l'ensemble de données renvoyé par input_fn doit fournir des lots de taille PER_REPLICA_BATCH_SIZE . Et la taille globale du lot pour une étape peut être obtenue sous la forme PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Lorsque vous effectuez une formation multi-ouvriers, vous devez soit diviser vos données entre les travailleurs, soit mélanger avec une graine aléatoire sur chacun. Vous pouvez voir un exemple de la façon de procéder dans la formation multi-ouvriers avec Estimator .

Et de même, vous pouvez également utiliser des stratégies de serveur de paramètres et de travail multiples. Le code reste le même, mais vous devez utiliser tf.estimator.train_and_evaluate et définir les variables d'environnement TF_CONFIG pour chaque binaire exécuté dans votre cluster.

Qu'est-ce qui est pris en charge maintenant?

La prise en charge de la formation avec Estimator en utilisant toutes les stratégies à l'exception de TPUStrategy . La formation et l'évaluation de base devraient fonctionner, mais un certain nombre de fonctionnalités avancées telles que l'échafaudage ne fonctionnent pas encore. Il peut également y avoir un certain nombre de bogues dans cette intégration. Pour le moment, nous ne prévoyons pas d'améliorer activement cette prise en charge, mais nous nous concentrons plutôt sur Keras et la prise en charge de la boucle d'entraînement personnalisée. Dans la mesure du possible, vous devriez préférer utiliser tf.distribute avec ces API à la place.

API de formation MirroredStrategy TPUStratégie MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API Estimator Assistance limitée Non supporté Assistance limitée Assistance limitée Assistance limitée

Exemples et tutoriels

Voici quelques exemples qui montrent l'utilisation de bout en bout de diverses stratégies avec Estimator:

  1. Formation multi-travailleurs avec Estimator pour former MNIST avec plusieurs travailleurs à l'aide de MultiWorkerMirroredStrategy .
  2. Exemple de bout en bout pour une formation multi-ouvriers dans tensorflow / écosystème à l'aide de modèles Kubernetes. Cet exemple commence par un modèle Keras et le convertit en Estimator à l'aide de l'API tf.keras.estimator.model_to_estimator .
  3. Modèle officiel ResNet50 , qui peut être formé à l'aide de MirroredStrategy ou MultiWorkerMirroredStrategy .

Autres sujets

Dans cette section, nous aborderons certains sujets pertinents pour plusieurs cas d'utilisation.

Configuration de la variable d'environnement TF_CONFIG

Pour la formation multi-ouvriers, comme mentionné précédemment, vous devez définir la variable d'environnement TF_CONFIG pour chaque binaire exécuté dans votre cluster. La variable d'environnement TF_CONFIG est une chaîne JSON qui spécifie quelles tâches constituent un cluster, leurs adresses et le rôle de chaque tâche dans le cluster. Nous fournissons un modèle Kubernetes dans le référentiel tensorflow / écosystème qui définit TF_CONFIG pour vos tâches de formation.

Il existe deux composants de TF_CONFIG: cluster et tâche. cluster fournit des informations sur le cluster de formation, qui est un dict composé de différents types d'emplois tels que worker. Dans la formation multi-ouvriers, il y a généralement un ouvrier qui assume un peu plus de responsabilités comme la sauvegarde du point de contrôle et l'écriture d'un fichier récapitulatif pour TensorBoard en plus de ce qu'un ouvrier régulier fait. Un tel travailleur est appelé le «chef», et il est de coutume que le travailleur d'indice 0 soit désigné comme ouvrier en chef (en fait, c'est ainsi que tf.distribute.Strategy est mis en œuvre). tâche, d'autre part, fournit des informations sur la tâche en cours. Le premier cluster de composants est le même pour tous les nœuds de calcul et la deuxième tâche de composant est différente pour chaque opérateur et spécifie le type et l'index de ce nœud de calcul.

Un exemple de TF_CONFIG est:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

Ce TF_CONFIG spécifie qu'il existe trois TF_CONFIG et deux tâches ps dans le cluster avec leurs hôtes et leurs ports. La partie "tâche" spécifie que le rôle de la tâche en cours dans le cluster, worker 1 (le second worker). Les rôles valides dans un cluster sont «chef», «travailleur», «ps» et «évaluateur». Il ne devrait y avoir aucun travail "ps" sauf lors de l'utilisation de tf.distribute.experimental.ParameterServerStrategy .

Et après?

tf.distribute.Strategy est en cours de développement. Nous vous invitons à l'essayer et à fournir vos commentaires en utilisant les problèmes GitHub .