Aide à protéger la Grande barrière de corail avec tensorflow sur Kaggle Rejoignez Défi

Boucle d'entraînement personnalisée avec Keras et MultiWorkerMirroredStrategy

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce tutoriel montre la formation multi-travailleur avec l' API de boucle de formation sur mesure, distribué par MultiWorkerMirroredStrategy, donc un modèle Keras conçu pour fonctionner sur un seul travailleur peut parfaitement travailler sur plusieurs travailleurs avec le changement de code minimal.

Nous utilisons des boucles d'entraînement personnalisées pour entraîner notre modèle, car elles nous offrent une flexibilité et un meilleur contrôle sur l'entraînement. De plus, il est plus facile de déboguer le modèle et la boucle d'entraînement. Des informations plus détaillées sont disponibles dans Ecrire une boucle de formation à partir de zéro .

Si vous cherchez comment utiliser MultiWorkerMirroredStrategy avec keras model.fit , reportez - vous à ce tutoriel à la place.

Distribué dans la tf.distribute.Strategy formation tensorflow guide est disponible pour un aperçu des stratégies de distribution tensorflow supports pour ceux qui souhaitent une meilleure compréhension des tf.distribute.Strategy API.

Installer

Tout d'abord, certaines importations nécessaires.

import json
import os
import sys

Avant d'importer TensorFlow, apportez quelques modifications à l'environnement.

Désactivez tous les GPU. Cela évite les erreurs causées par les travailleurs essayant tous d'utiliser le même GPU. Pour une application réelle, chaque travailleur serait sur une machine différente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Réinitialiser la TF_CONFIG variable d'environnement, vous verrez plus de cela plus tard.

os.environ.pop('TF_CONFIG', None)

Assurez-vous que le répertoire actuel est sur le chemin de python. Cela permet à l'ordinateur portable d'importer les fichiers écrits par %%writefile plus tard.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Importez maintenant TensorFlow.

import tensorflow as tf

Jeu de données et définition du modèle

Ensuite , créez un mnist.py fichier avec un modèle simple et ensemble de données configuration. Ce fichier python sera utilisé par les processus de travail dans ce tutoriel :

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuration multi-travailleurs

Entrons maintenant dans le monde de la formation multi-travailleurs. Dans tensorflow, la TF_CONFIG variable d'environnement est nécessaire pour la formation sur plusieurs machines, dont chacun a peut - être un rôle différent. TF_CONFIG utilisé ci - dessous, est une chaîne JSON utilisée pour spécifier la configuration du cluster sur chaque travailleur qui fait partie du groupe. Ceci est la méthode par défaut pour spécifier un cluster, en utilisant cluster_resolver.TFConfigClusterResolver , mais il y a d' autres options disponibles dans le distribute.cluster_resolver module.

Décrivez votre cluster

Voici un exemple de configuration :

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Voici le même TF_CONFIG sérialisé en tant que chaîne JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Il y a deux composantes de TF_CONFIG : cluster et task .

  • cluster est le même pour tous les travailleurs et fournit des informations sur le pôle de formation, qui est un dict composé de différents types d'emplois tels que worker . Dans la formation multi-travailleur MultiWorkerMirroredStrategy , il y a généralement un worker qui prend un peu plus de responsabilités comme point de contrôle d' épargne et d' écriture du fichier de synthèse pour TensorBoard en plus de ce régulièrement worker fait. Un tel travailleur est désigné comme le chief travailleur, et il est d' usage que le worker avec l' index 0 est nommé chef worker (en fait , c'est de savoir comment tf.distribute.Strategy est mis en œuvre).

  • task fournit des informations de la tâche en cours et est différent sur chaque travailleur. Il spécifie le type et l' index de ce travailleur.

Dans cet exemple, vous définissez la tâche de type à "worker" et la tâche index à 0 . Cette machine est le premier ouvrier et sera désignée comme l'ouvrier en chef et fera plus de travail que les autres. Notez que d' autres machines devront avoir la TF_CONFIG variable d' environnement jeu aussi bien, et il devrait avoir le même cluster dict, mais différentes tâches de type ou tâche index en fonction de ce que les rôles de ces machines sont.

À titre d'illustration, ce tutoriel montre comment on peut définir un TF_CONFIG avec 2 travailleurs sur localhost . Dans la pratique, les utilisateurs de créer plusieurs travailleurs sur les adresses / ports IP externes, et ensemble TF_CONFIG sur chaque travailleur de façon appropriée.

Dans cet exemple , vous utiliserez 2 travailleurs, le premier travailleur TF_CONFIG est indiqué ci - dessus. Pour le deuxième travailleur , vous définissez tf_config['task']['index']=1

Au- dessus, tf_config est juste une variable locale en python. Pour utiliser réellement à la formation de configuration, ce dictionnaire besoins à sérialisés comme JSON, et placés dans la TF_CONFIG variable d'environnement.

Variables d'environnement et sous-processus dans les notebooks

Les sous-processus héritent des variables d'environnement de leur parent. Donc , si vous définissez une variable d'environnement dans ce jupyter notebook - jupyter notebook processus:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Vous pouvez accéder à la variable d'environnement à partir d'un sous-processus :

echo ${GREETINGS}
Hello TensorFlow!

Dans la section suivante, vous allez l' utiliser pour passer le TF_CONFIG aux travailleurs sous - processus. Vous ne lanceriez jamais vraiment vos tâches de cette façon, mais c'est suffisant pour les besoins de ce didacticiel : pour démontrer un exemple multi-worker minimal.

MultiWorkerMirroredStrategy

Pour former le modèle, utilisez une instance de tf.distribute.MultiWorkerMirroredStrategy , ce qui crée des copies de toutes les variables dans les couches du modèle sur chaque appareil à tous les travailleurs. Le tf.distribute.Strategy Guide a plus de détails sur cette stratégie.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2021-11-23 02:29:16.957442: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:16.957748: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Utilisez tf.distribute.Strategy.scope pour spécifier qu'une stratégie devrait être utilisée lors de la construction de votre modèle. Cela vous met dans le « contexte international réplique » de cette stratégie, ce qui signifie que la stratégie est mise dans le contrôle des choses comme placement variables.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Partage automatique de vos données entre les travailleurs

Dans la formation multi-travailleurs, le partitionnement de l'ensemble de données n'est pas nécessairement nécessaire, mais il vous donne une sémantique unique qui rend plus de formation plus reproductible, c'est-à-dire que la formation sur plusieurs travailleurs doit être la même que la formation sur un seul travailleur. Remarque : les performances peuvent être affectées dans certains cas.

Voir: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step

Définir une boucle d'entraînement personnalisée et entraîner le modèle

Spécifier un optimiseur

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Définir une étape de formation avec tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Enregistrement et restauration des points de contrôle

L'implémentation du point de contrôle dans une boucle d'entraînement personnalisée nécessite que l'utilisateur le gère au lieu d'utiliser un rappel keras. Il vous permet de sauvegarder les poids du modèle et de les restaurer sans avoir à sauvegarder le modèle entier.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Ici, vous allez créer un tf.train.Checkpoint que les pistes du modèle, qui est géré par un tf.train.CheckpointManager de sorte que seul le dernier point de contrôle est conservé.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Maintenant, quand vous devez restaurer, vous pouvez trouver le dernier point de contrôle enregistré à l' aide de la pratique tf.train.latest_checkpoint fonction.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Après avoir restauré le point de contrôle, vous pouvez continuer à entraîner votre boucle d'entraînement personnalisée.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2021-11-23 02:29:18.214294: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.826228, train_loss: 0.540131.
Epoch: 1, accuracy: 0.937946, train_loss: 0.207413.
Epoch: 2, accuracy: 0.960603, train_loss: 0.137420.

Configuration complète du code sur les travailleurs

Pour réellement fonctionner avec MultiWorkerMirroredStrategy vous devez exécuter les processus de travail et passer un TF_CONFIG à eux.

Comme le mnist.py fichier écrit plus tôt, voici le main.py qui contiennent le même code que nous avons traversé pas à pas auparavant dans ce colab, nous l'écrire juste un fichier pour chacun des travailleurs sera l' exécuter:

Fichier: main.py

Writing main.py

Former et évaluer

Le répertoire actuel contient désormais les deux fichiers Python :

ls *.py
main.py
mnist.py

Alors JSON-sérialiser le TF_CONFIG et l' ajouter aux variables d'environnement:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Maintenant, vous pouvez lancer un processus de travail qui exécutera le main.py et utiliser le TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Il y a quelques points à noter à propos de la commande ci-dessus :

  1. Il utilise le %%bash qui est un ordinateur portable « magique » pour exécuter certaines commandes bash.
  2. Il utilise le --bg drapeau pour exécuter le bash processus en arrière - plan, parce que ce travailleur ne se termine pas. Il attend tous les travailleurs avant de commencer.

Le processus de travail backgrounded ne sera pas imprimer sortie à ce bloc - notes, de sorte que le &> redirige sa sortie vers un fichier, afin que vous puissiez voir ce qui est arrivé.

Attendez donc quelques secondes que le processus démarre :

import time
time.sleep(20)

Maintenant, regardez ce qui a été sorti jusqu'à présent dans le fichier journal du travailleur :

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

La dernière ligne du fichier journal doit dire: Started server with target: grpc://localhost:12345 . Le premier travailleur est maintenant prêt et attend que tous les autres travailleurs soient prêts à continuer.

Donc , mettre à jour le tf_config pour le processus du deuxième travailleur à ramasser:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lancez maintenant le deuxième ouvrier. Cela lancera la formation puisque tous les travailleurs sont actifs (il n'est donc pas nécessaire de mettre ce processus en arrière-plan) :

python main.py > /dev/null 2>&1

Maintenant, si vous revérifiez les journaux écrits par le premier travailleur, vous verrez qu'il a participé à la formation de ce modèle :

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration
2021-11-23 02:29:50.709898: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.820424, train_loss: 0.575663.
Epoch: 1, accuracy: 0.927344, train_loss: 0.241324.
Epoch: 2, accuracy: 0.953237, train_loss: 0.154762.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formation multi-travailleurs en profondeur

Ce tutoriel a fait preuve d' une Custom Training Loop de Custom Training Loop flux de travail de la configuration multi-travailleurs. Une description détaillée des autres sujets est disponible dans le model.fit's guide de la configuration multi-travailleurs et applicable à CTLs.

Voir également

  1. La formation distribuée dans tensorflow guide donne un aperçu des stratégies de distribution disponibles.
  2. Modèles officiels , dont beaucoup peuvent être configurés pour exécuter plusieurs stratégies de distribution.
  3. La section Performance dans le guide fournit des informations sur d' autres stratégies et outils que vous pouvez utiliser pour optimiser les performances de vos modèles de tensorflow.