Réserve cette date! Google I / O revient du 18 au 20 mai S'inscrire maintenant
Cette page a été traduite par l'API Cloud Translation.
Switch to English

Formation multi-travailleurs avec Keras

Voir sur TensorFlow.org Exécuter dans Google Colab Afficher la source sur GitHub Télécharger le cahier

Aperçu

Ce didacticiel présente une formation distribuée multi-ouvriers avec le modèle Keras à l'aide de l'API tf.distribute.Strategy , en particulier tf.distribute.MultiWorkerMirroredStrategy . Avec l'aide de cette stratégie, un modèle Keras qui a été conçu pour s'exécuter sur un seul travailleur peut fonctionner de manière transparente sur plusieurs travailleurs avec un changement de code minimal.

Le guide Formation distribuée dans TensorFlow est disponible pour un aperçu des stratégies de distribution prises en charge par TensorFlow pour ceux qui souhaitent une meilleure compréhension des API tf.distribute.Strategy .

Installer

Premièrement, quelques importations nécessaires.

import json
import os
import sys

Avant d'importer TensorFlow, apportez quelques modifications à l'environnement.

Désactivez tous les GPU. Cela évite les erreurs causées par les travailleurs qui essaient tous d'utiliser le même GPU. Pour une application réelle, chaque travailleur serait sur une machine différente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Réinitialisez la variable d'environnement TF_CONFIG , vous en verrez plus tard.

os.environ.pop('TF_CONFIG', None)

Assurez-vous que le répertoire actuel est sur le chemin de python. Cela permet au notebook d'importer ultérieurement les fichiers écrits par %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Importez maintenant TensorFlow.

import tensorflow as tf

Jeu de données et définition du modèle

Créez ensuite un fichier mnist.py avec une configuration simple de modèle et de jeu de données. Ce fichier python sera utilisé par les processus de travail dans ce tutoriel:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Essayez de former le modèle pour un petit nombre d'époques et observez les résultats d'un seul travailleur pour vous assurer que tout fonctionne correctement. Au fur et à mesure que l'entraînement progresse, la perte devrait diminuer et la précision devrait augmenter.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236
<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

Configuration multi-ouvriers

Entrons maintenant dans le monde de la formation multi-travailleurs. Dans TensorFlow, la variable d'environnement TF_CONFIG est requise pour l'entraînement sur plusieurs machines, chacune ayant éventuellement un rôle différent. TF_CONFIG est une chaîne JSON utilisée pour spécifier la configuration du cluster sur chaque worker qui fait partie du cluster.

Voici un exemple de configuration:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Voici le même TF_CONFIG sérialisé en tant que chaîne JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Il existe deux composants de TF_CONFIG : cluster et task .

  • cluster est le même pour tous les travailleurs et fournit des informations sur le cluster de formation, qui est un dict composé de différents types d'emplois tels que worker . Dans la formation multi-ouvriers avec MultiWorkerMirroredStrategy , il y a généralement un worker qui assume un peu plus de responsabilités comme la sauvegarde du point de contrôle et l'écriture d'un fichier récapitulatif pour TensorBoard en plus de ce qu'un worker régulier fait. Un tel travailleur est appelé le travailleur en chief , et il est de coutume que le worker avec l' index 0 soit désigné comme worker chef (en fait, c'est ainsi que tf.distribute.Strategy est implémenté).

  • task fournit des informations sur la tâche en cours et est différente pour chaque travailleur. Il spécifie le type et l' index de ce travailleur.

Dans cet exemple, vous définissez le type tâche sur "worker" et l' index tâche sur 0 . Cette machine est le premier ouvrier et sera désignée comme ouvrier principal et fera plus de travail que les autres. Notez que les autres machines devront également avoir la variable d'environnement TF_CONFIG définie, et qu'elle devrait avoir le même dict de cluster , mais un type tâche ou un index tâche différent selon les rôles de ces machines.

À des fins d'illustration, ce tutoriel montre comment définir un TF_CONFIG avec 2 workers sur localhost . En pratique, les utilisateurs créeraient plusieurs TF_CONFIG de TF_CONFIG sur des adresses / ports IP externes et définiraient TF_CONFIG sur chaque TF_CONFIG de manière appropriée.

Dans cet exemple, vous utiliserez 2 ouvriers, le TF_CONFIG du premier ouvrier est montré ci-dessus. Pour le deuxième travailleur, vous tf_config['task']['index']=1

Ci-dessus, tf_config n'est qu'une variable locale en python. Pour l'utiliser réellement pour configurer l'entraînement, ce dictionnaire doit être sérialisé en tant que JSON et placé dans la variable d'environnement TF_CONFIG .

Variables d'environnement et sous-processus dans les blocs-notes

Les sous-processus héritent des variables d'environnement de leur parent. Donc, si vous définissez une variable d'environnement dans ce processus de jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Vous pouvez accéder à la variable d'environnement à partir d'un sous-processus:

echo ${GREETINGS}
Hello TensorFlow!

Dans la section suivante, vous l'utiliserez pour passer le TF_CONFIG aux sous-processus de travail. Vous ne lanceriez jamais vraiment vos travaux de cette façon, mais c'est suffisant pour les besoins de ce didacticiel: pour illustrer un exemple minimal de plusieurs travailleurs.

Choisissez la bonne stratégie

Dans TensorFlow, il existe deux formes principales de formation distribuée:

  • Formation synchrone, où les étapes de formation sont synchronisées entre les travailleurs et les répliques, et
  • Formation asynchrone, où les étapes de formation ne sont pas strictement synchronisées.

MultiWorkerMirroredStrategy , qui est la stratégie recommandée pour la formation synchrone multi-ouvriers, sera MultiWorkerMirroredStrategy dans ce guide. Pour entraîner le modèle, utilisez une instance de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crée des copies de toutes les variables dans les couches du modèle sur chaque appareil sur tous les travailleurs. Il utilise CollectiveOps , une opération TensorFlow pour la communication collective, pour agréger les gradients et maintenir les variables synchronisées. Le guide tf.distribute.Strategy contient plus de détails sur cette stratégie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fournit plusieurs implémentations via le paramètre CommunicationOptions . RING implémente des collectifs en anneau en utilisant gRPC comme couche de communication inter-hôte. NCCL utilise NCCL Nvidia pour mettre en œuvre des collectifs. AUTO reporte le choix au runtime. Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster.

Former le modèle

Avec l'intégration de l'API tf.distribute.Strategy dans tf.keras , le seul changement que vous apporterez pour distribuer la formation à plusieurs travailleurs consiste à inclure la création de modèle et l'appel model.compile() dans strategy.scope() . La portée de la stratégie de distribution dicte comment et où les variables sont créées, et dans le cas de MultiWorkerMirroredStrategy , les variables créées sont des MirroredVariable et elles sont répliquées sur chacun des MultiWorkerMirroredStrategy de MultiWorkerMirroredStrategy .

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Pour réellement fonctionner avec MultiWorkerMirroredStrategy vous devrez exécuter des processus de travail et leur passer un TF_CONFIG .

Comme le fichier mnist.py écrit précédemment, voici le main.py que chacun des workers exécutera:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Dans l'extrait de code ci-dessus, notez que global_batch_size , qui est passé à Dataset.batch , est défini sur per_worker_batch_size * num_workers . Cela garantit que chaque travailleur traite des lots d'exemples per_worker_batch_size quel que soit le nombre de travailleurs.

Le répertoire courant contient maintenant les deux fichiers Python:

ls *.py
main.py
mnist.py

Alors json-sérialisez le TF_CONFIG et ajoutez-le aux variables d'environnement:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Maintenant, vous pouvez lancer un processus de travail qui exécutera le main.py et utilisera le TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Il y a quelques choses à noter à propos de la commande ci-dessus:

  1. Il utilise le %%bash qui est un notebook "magique" pour exécuter certaines commandes bash.
  2. Il utilise l'indicateur --bg pour exécuter le processus bash en arrière-plan, car ce worker ne s'arrêtera pas. Il attend tous les ouvriers avant de démarrer.

Le processus de travail en arrière-plan n'imprimera pas la sortie sur ce bloc-notes, donc le &> redirige sa sortie vers un fichier, afin que vous puissiez voir ce qui s'est passé.

Alors, attendez quelques secondes que le processus démarre:

import time
time.sleep(10)

Maintenant, regardez ce qui a été sorti dans le fichier journal du travailleur jusqu'à présent:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

La dernière ligne du fichier journal devrait indiquer: Started server with target: grpc://localhost:12345 . Le premier travailleur est maintenant prêt et attend que tous les autres travailleurs soient prêts à continuer.

tf_config donc à jour le tf_config pour que le processus du deuxième travailleur tf_config charge:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lancez maintenant le deuxième worker. Cela démarrera la formation puisque tous les travailleurs sont actifs (il n'est donc pas nécessaire de faire l'historique de ce processus):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511
2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

Maintenant, si vous revérifiez les journaux écrits par le premier travailleur, vous verrez qu'il a participé à la formation de ce modèle:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

Sans surprise, cela s'est déroulé plus lentement que le test exécuté au début de ce didacticiel. L'exécution de plusieurs travailleurs sur une seule machine ne fait qu'ajouter des frais généraux. Le but ici n'était pas d'améliorer le temps de formation, mais seulement de donner un exemple de formation multi-ouvriers.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formation multi-ouvriers en profondeur

Jusqu'à présent, ce didacticiel a présenté une configuration de base multi-ouvriers. Le reste de ce document examine en détail d'autres facteurs qui peuvent être utiles ou importants pour des cas d'utilisation réels.

Partage de jeu de données

Dans la formation multi-ouvriers, le partitionnement de l'ensemble de données est nécessaire pour garantir la convergence et les performances.

L'exemple de la section précédente repose sur le partage automatique par défaut fourni par l'API tf.distribute.Strategy . Vous pouvez contrôler le partitionnement en définissant tf.data.experimental.AutoShardPolicy de tf.data.experimental.DistributeOptions . Pour en savoir plus sur le partage automatique, consultez le guide de saisie distribuée .

Voici un exemple rapide de la façon de désactiver le partitionnement automatique, afin que chaque réplica traite chaque exemple (non recommandé):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Évaluation

Si vous passez validation_data dans model.fit , il alternera entre la formation et l'évaluation pour chaque époque. L'évaluation prenant validation_data est distribuée sur le même ensemble de travailleurs et les résultats de l'évaluation sont agrégés et disponibles pour tous les travailleurs. Semblable à la formation, l'ensemble de données de validation est automatiquement partitionné au niveau du fichier. Vous devez définir une taille de lot globale dans l'ensemble de données de validation et définir validation_steps . Un ensemble de données répété est également recommandé pour l'évaluation.

Vous pouvez également créer une autre tâche qui lit périodiquement les points de contrôle et exécute l'évaluation. C'est ce que fait Estimator. Mais ce n'est pas une méthode recommandée pour effectuer une évaluation et ses détails sont donc omis.

Performance

Vous disposez maintenant d'un modèle Keras qui est configuré pour s'exécuter sur plusieurs MultiWorkerMirroredStrategy de MultiWorkerMirroredStrategy avec MultiWorkerMirroredStrategy . Vous pouvez essayer les techniques suivantes pour ajuster les performances de la formation multi-ouvriers avec MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy fournit plusieurs implémentations de communication collective . RING implémente des collectifs en anneau en utilisant gRPC comme couche de communication inter-hôte. NCCL utilise NCCL Nvidia pour mettre en œuvre des collectifs. AUTO reporte le choix au runtime. Le meilleur choix d'implémentation collective dépend du nombre et du type de GPU, ainsi que de l'interconnexion réseau dans le cluster. Pour remplacer le choix automatique, spécifiez le paramètre communication_options du MultiWorkerMirroredStrategy de MultiWorkerMirroredStrategy , par exemple communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • Transformez les variables en tf.float si possible. Le modèle officiel ResNet comprend un exemple de la façon dont cela peut être fait.

Tolérance aux pannes

Dans la formation synchrone, le cluster échouait si l'un des nœuds de calcul échoue et qu'aucun mécanisme de reprise après incident n'existe. L'utilisation de Keras avec tf.distribute.Strategy présente l'avantage de la tolérance aux pannes dans les cas où les travailleurs meurent ou sont instables. Pour ce faire, vous préservez l'état de l'entraînement dans le système de fichiers distribué de votre choix, de sorte qu'au redémarrage de l'instance qui avait précédemment échoué ou préempté, l'état de l'entraînement est récupéré.

Lorsqu'un travailleur devient indisponible, d'autres travailleurs échouent (éventuellement après un délai d'expiration). Dans de tels cas, le travailleur indisponible doit être redémarré, ainsi que les autres travailleurs qui ont échoué.

Rappel de ModelCheckpoint

ModelCheckpoint rappel ModelCheckpoint ne fournit plus la fonctionnalité de tolérance aux pannes, veuillez utiliser le rappel BackupAndRestore place.

Le rappel ModelCheckpoint peut toujours être utilisé pour enregistrer les points de contrôle. Mais avec cela, si la formation a été interrompue ou terminée avec succès, afin de continuer la formation à partir du point de contrôle, l'utilisateur est responsable de charger le modèle manuellement.

En option, l'utilisateur peut choisir d'enregistrer et de restaurer le modèle / les poids en dehors ModelCheckpoint rappel ModelCheckpoint .

Sauvegarde et chargement du modèle

Pour enregistrer votre modèle à l'aide de model.save ou tf.saved_model.save , la destination d'enregistrement doit être différente pour chaque travailleur. Sur les ouvriers non en chef, vous devrez enregistrer le modèle dans un répertoire temporaire, et sur le chef, vous devrez enregistrer dans le répertoire de modèle fourni. Les répertoires temporaires sur le travailleur doivent être uniques pour éviter les erreurs résultant de plusieurs travailleurs essayant d'écrire au même emplacement. Le modèle enregistré dans tous les répertoires est identique et, en général, seul le modèle enregistré par le chef doit être référencé pour restauration ou service. Vous devriez avoir une logique de nettoyage qui supprime les répertoires temporaires créés par les travailleurs une fois votre formation terminée.

La raison pour laquelle vous devez économiser sur le chef et les employés en même temps est que vous pourriez agréger des variables pendant le point de contrôle, ce qui oblige le chef et les employés à participer au protocole de communication allreduce. D'un autre côté, laisser le chef et les ouvriers enregistrer dans le même répertoire de modèle entraînera des erreurs dues à des conflits.

Avec MultiWorkerMirroredStrategy , le programme est exécuté sur chaque worker, et afin de savoir si le worker actuel est le chef, il tire parti de l'objet de résolution de cluster qui a les attributs task_type et task_id . task_type vous indique quel est le travail en cours (par exemple 'worker'), et task_id vous indique l'identifiant du worker. Le travailleur avec l'ID 0 est désigné comme travailleur en chef.

Dans l'extrait de code ci-dessous, write_filepath fournit le chemin du fichier à écrire, qui dépend de l'ID du worker. Dans le cas du chef (worker avec id 0), il écrit dans le chemin du fichier d'origine; pour les autres, il crée un répertoire temporaire (avec id dans le chemin du répertoire) dans lequel écrire:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Avec cela, vous êtes maintenant prêt à enregistrer:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Comme décrit ci-dessus, plus tard, le modèle ne devrait être chargé qu'à partir du chemin enregistré dans le chef, supprimons donc ceux temporaires que les ouvriers non-chefs ont sauvegardés:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Maintenant, quand il est temps de charger, tf.keras.models.load_model API tf.keras.models.load_model pratique et continuons le travail. Ici, supposons que vous n'utilisiez qu'un seul worker pour charger et continuer la formation, auquel cas vous n'appelez pas tf.keras.models.load_model dans une autre strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023
<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

Enregistrement et restauration des points de contrôle

D'autre part, le point de contrôle vous permet d'enregistrer les poids du modèle et de les restaurer sans avoir à enregistrer le modèle entier. Ici, vous allez créer un tf.train.Checkpoint qui suit le modèle, qui est géré par un tf.train.CheckpointManager afin que seul le dernier point de contrôle soit conservé.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Une fois le CheckpointManager configuré, vous êtes maintenant prêt à enregistrer et à supprimer les points de contrôle non-chefs de file enregistrés.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Désormais, lorsque vous avez besoin de restaurer, vous pouvez trouver le dernier point de contrôle enregistré à l'aide de la fonction pratique tf.train.latest_checkpoint . Après avoir restauré le point de contrôle, vous pouvez continuer la formation.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896
<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

Rappel de BackupAndRestore

Rappel BackupAndRestore fournit des fonctionnalités de tolérance aux pannes, en sauvegardant le modèle et le numéro de l' époque actuelle dans un fichier de point de contrôle temporaire en vertu backup_dir argument BackupAndRestore . Cela se fait à la fin de chaque époque.

Une fois que les travaux sont interrompus et redémarrés, le rappel restaure le dernier point de contrôle et la formation continue à partir du début de l'époque interrompue. Tout entraînement partiel déjà effectué à l'époque inachevée avant l'interruption sera jeté, de sorte qu'il n'affecte pas l'état final du modèle.

Pour l'utiliser, fournissez une instance de tf.keras.callbacks.experimental.BackupAndRestore lors de l' tf.keras.Model.fit() à tf.keras.Model.fit() .

Avec MultiWorkerMirroredStrategy, si un worker est interrompu, l'ensemble du cluster s'arrête jusqu'à ce que le worker interrompu soit redémarré. Les autres nœuds de calcul redémarreront également et le nœud de calcul interrompu rejoindra le cluster. Ensuite, chaque worker lit le fichier de point de contrôle qui a été précédemment enregistré et récupère son ancien état, permettant ainsi au cluster de se synchroniser. Ensuite, la formation continue.

BackupAndRestore rappel BackupAndRestore utilise CheckpointManager pour enregistrer et restaurer l'état d'entraînement, ce qui génère un fichier appelé point de contrôle qui suit les points de contrôle existants avec le dernier. Pour cette raison, backup_dir ne doit pas être réutilisé pour stocker d'autres points de contrôle afin d'éviter une collision de noms.

Actuellement, le rappel BackupAndRestore prend en charge un seul travailleur sans stratégie, MirroredStrategy et multi-worker avec MultiWorkerMirroredStrategy. Vous trouverez ci-dessous deux exemples de formation multi-travailleurs et de formation mono-travailleur.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645
<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

Si vous inspectez le répertoire de backup_dir vous avez spécifié dans BackupAndRestore , vous remarquerez peut-être des fichiers de point de contrôle générés temporairement. Ces fichiers sont nécessaires pour récupérer les instances précédemment perdues, et ils seront supprimés par la bibliothèque à la fin de tf.keras.Model.fit() lors de la sortie réussie de votre formation.

Voir également

  1. Le guide Formation distribuée dans TensorFlow fournit un aperçu des stratégies de distribution disponibles.
  2. Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.
  3. La section Performances du guide fournit des informations sur les autres stratégies et outils que vous pouvez utiliser pour optimiser les performances de vos modèles TensorFlow.