Réserve cette date! Google I / O revient du 18 au 20 mai S'inscrire maintenant
Cette page a été traduite par l'API Cloud Translation.
Switch to English

Formation sur le serveur de paramètres

Voir sur TensorFlow.org Exécuter dans Google Colab Afficher la source sur GitHub Télécharger le cahier

Aperçu

La formation du serveur de paramètres est une méthode courante de mise en parallèle des données pour mettre à l'échelle la formation de modèle sur plusieurs machines. Un cluster de formation de serveur de paramètres se compose de nœuds de calcul et de serveurs de paramètres. Les variables sont créées sur des serveurs de paramètres et elles sont lues et mises à jour par les travailleurs à chaque étape. Par défaut, les nœuds de calcul lisent et mettent à jour ces variables indépendamment sans se synchroniser les uns avec les autres. C'est pourquoi la formation de type serveur de paramètres est parfois appelée formation asynchrone.

La formation du serveur de paramètres TensorFlow 2 utilise un coordinateur central via la classe tf.distribute.experimental.coordinator.ClusterCoordinator .

Dans cette implémentation, les tâches de parameter server worker et de parameter server exécutent tf.distribute.Server s qui écoutent les demandes du coordinateur. Le coordinateur crée des ressources, distribue des tâches de formation, écrit des points de contrôle et traite les échecs de tâches.

Nous pensons que cette architecture et la nouvelle classe ClusterCoordinator fournissent un modèle de programmation plus flexible et plus simple.

ClusterCoordinator

La classe ClusterCoordinator doit fonctionner en conjonction avec un objet tf.distribute.Strategy . Cet objet tf.distribute.Strategy est nécessaire pour transmettre les informations du cluster et est utilisé pour définir une étape d'entraînement comme nous l'avons vu dans l' entraînement personnalisé avec MirroredStrategy . L'objet ClusterCoordinator distribue ensuite l'exécution de ces étapes de formation aux travailleurs distants. Actuellement, ClusterCoordinator ne fonctionne qu'avec tf.distribute.experimental.ParameterServerStrategy .

L'API la plus importante fournie par l'objet ClusterCoordinator est schedule . L'API de schedule file d'attente une fonction tf.function et renvoie immédiatement une RemoteValue type RemoteValue . Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et leurs RemoteValue seront remplies de manière asynchrone. Étant donné que la schedule ne nécessite pas d'affectation de travailleur, la fonction tf.function transmise peut être exécutée sur n'importe quel travailleur disponible. Si le worker sur lequel elle est exécutée devient indisponible avant son achèvement, la fonction sera retentée sur un autre worker disponible. En raison de ce fait et du fait que l'exécution de la fonction n'est pas atomique, une fonction peut être exécutée plus d'une fois.

En plus de distribuer des fonctions distantes, le ClusterCoordinator permet également de créer des ensembles de données sur tous les ClusterCoordinator et de reconstruire ces ensembles de données lorsqu'un ClusterCoordinator se ClusterCoordinator un échec.

Configuration du didacticiel

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Configuration du cluster

Comme mentionné ci-dessus, un cluster de formation de serveur de paramètres nécessite une tâche de coordination qui exécute votre programme de formation, un ou plusieurs tf.distribute.Server et des tâches de serveur de paramètres qui exécutent des serveurs TensorFlow, c'est-à-dire tf.distribute.Server , et éventuellement une tâche d'évaluation supplémentaire qui exécute side-car évaluation (voir la section sur l'évaluation des side-car ci-dessous). Les conditions requises pour les configurer sont les suivantes:

  • La tâche du coordinateur doit connaître les adresses et les ports de tous les autres serveurs TensorFlow à l'exception de l'évaluateur.
  • Les nœuds de calcul et les serveurs de paramètres doivent savoir quel port ils doivent écouter. Par souci de simplicité, nous transmettons généralement les informations complètes du cluster lorsque nous créons des serveurs TensorFlow sur ces tâches.
  • La tâche de l'évaluateur n'a pas besoin de connaître la configuration du cluster de formation. Si tel est le cas, il ne doit pas tenter de se connecter au cluster de formation.
  • Les nœuds de calcul et les serveurs de paramètres doivent avoir des types de tâches respectivement «worker» et «ps». Le coordinateur doit utiliser «chef» comme type de tâche pour des raisons d'héritage.

Dans ce didacticiel, nous allons créer un cluster in-process afin que l'entraînement complet du serveur de paramètres puisse être exécuté dans colab. Nous présenterons comment mettre en place de vrais clusters dans une section ultérieure.

Cluster en cours

Dans ce didacticiel, nous allons démarrer un groupe de serveurs TensorFlow à l'avance et nous y connecter plus tard:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Entraînement avec boucle d'entraînement personnalisée

Boucle d'entraînement personnalisée avec tf.distribute.Strategy offre une grande flexibilité pour définir les boucles d'entraînement. Actuellement, pour la formation du serveur de paramètres dans TensorFlow 2, seule la boucle d'apprentissage personnalisée est prise en charge. Ici, nous utilisons ParameterServerStrategy pour définir une étape de formation, puis nous utilisons ClusterCoordinator pour distribuer l'exécution des étapes de formation aux travailleurs distants.

Créer le ParameterServerStrategy

Pour écrire une étape d'entraînement dans une boucle d'entraînement personnalisée, la première étape consiste à créer une ParameterServerStrategy . Nous expliquerons le variable_partitioner plus tard.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19436', 'localhost:22016'], 'worker': ['localhost:20127', 'localhost:21241', 'localhost:22797']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19436', 'localhost:22016'], 'worker': ['localhost:20127', 'localhost:21241', 'localhost:22797']})

Ensuite, vous allez créer un modèle, définir un ensemble de données et une fonction d'étape comme nous l'avons vu dans la boucle d'apprentissage avec d'autres tf.distribute.Strategy s. Vous pouvez trouver plus de détails dans ce tutoriel .

Afin d'utiliser les GPU pour la formation, allouez des GPU visibles à chaque collaborateur. ParameterServerStrategy utilisera tous les GPU disponibles sur chaque travailleur, avec la restriction que tous les travailleurs doivent avoir le même nombre de GPU disponibles. Pour garantir une prélecture efficace de l'ensemble de données, utilisez les API de création de l'ensemble de données distribuées recommandées mentionnées dans la section Étapes de la formation sur l' envoi aux travailleurs distants ci-dessous. Assurez-vous également d'appeler strategy.run dans worker_fn pour tirer pleinement parti des GPU alloués aux nœuds de calcul. Les autres étapes sont les mêmes pour l'entraînement avec ou sans GPU.

Créons ces composants dans les étapes suivantes:

Configurer les données

Tout d'abord, écrivez une fonction qui crée un ensemble de données qui inclut la logique de prétraitement implémentée par les couches de prétraitement Keras. Nous allons créer ces couches en dehors du dataset_fn mais appliquer la transformation à l'intérieur du dataset_fn puisque vous allez envelopper le dataset_fn dans une tf.function qui ne permet pas de créer des variables à l'intérieur.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Générez des exemples de jouets dans un ensemble de données:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Ensuite, nous créons le jeu de données d'entraînement enveloppé dans un dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construisez le modèle

Deuxièmement, nous créons le modèle et d'autres objets. Assurez-vous de créer toutes les variables sous strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Définir l'étape de formation

Troisièmement, créez l'étape de formation enveloppée dans une fonction tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Dans la fonction step ci-dessus, l'appel de strategy.run et strategy.reduce dans step_fn peut prendre en charge plusieurs GPU par worker. Si les nœuds de calcul ont des GPU alloués, strategy.run distribuera les ensembles de données sur plusieurs répliques.

Distribuer les étapes de formation aux travailleurs à distance

Une fois tous les calculs définis par ParameterServerStrategy , nous utiliserons la classe ClusterCoordinator pour créer des ressources et distribuer les étapes de formation aux travailleurs distants.

Créons d'abord un objet ClusterCoordinator et passons l'objet de stratégie:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Ensuite, nous créons un ensemble de données par travailleur et un itérateur. Dans le per_worker_dataset_fn ci-dessous, il est recommandé d' per_worker_dataset_fn le dataset_fn dans strategy.distribute_datasets_from_function pour permettre une prélecture efficace des GPU de manière transparente.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

La dernière étape consiste à distribuer le calcul aux travailleurs distants à l'aide de la schedule . La méthode de schedule file d'attente un tf.function et renvoie immédiatement une RemoteValue type RemoteValue . Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et RemoteValue sera rempli de manière asynchrone. La méthode de join peut être utilisée pour attendre que toutes les fonctions planifiées soient exécutées.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.306250.
Finished epoch 1, accuracy is 0.518750.
Finished epoch 2, accuracy is 0.893750.
Finished epoch 3, accuracy is 1.000000.

Voici comment vous pouvez récupérer le résultat d'un RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.032416

Alternativement, vous pouvez lancer toutes les étapes et faire quelque chose en attendant la fin:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Pour le flux de travail complet de formation et de diffusion de cet exemple particulier, veuillez consulter ce test .

En savoir plus sur la création de jeux de données

L'ensemble de données dans le code ci-dessus est créé à l'aide de l'API create_per_worker_dataset . Il crée un ensemble de données par worker et renvoie un objet conteneur. Vous pouvez appeler la méthode iter dessus pour créer un itérateur par travailleur. L'itérateur par travailleur contient un itérateur par travailleur et la tranche correspondante d'un travailleur sera substituée dans l'argument d'entrée de la fonction passée à la méthode de schedule avant que la fonction ne soit exécutée sur un travailleur particulier.

Actuellement, la méthode de schedule suppose que les nœuds de calcul sont équivalents et suppose donc que les ensembles de données des différents nœuds de calcul sont identiques, sauf qu'ils peuvent être mélangés différemment s'ils contiennent une opération dataset.shuffle . Pour cette raison, nous vous recommandons également de répéter indéfiniment les ensembles de données et de planifier un nombre fini d'étapes au lieu de compter sur l' OutOfRangeError d'un ensemble de données.

Une autre note importante est que les tf.data données tf.data ne prennent pas en charge la sérialisation et la désérialisation implicites au- tf.data limites des tâches. Il est donc important de créer l'ensemble de données dans la fonction passée à create_per_worker_dataset .

Partage variable

Le partage de variable fait référence à la division d'une variable en plusieurs variables plus petites. Nous appelons ces variables plus petites shard s. Le partage variable peut être utile pour répartir la charge du réseau lors de l'accès à ces fragments. Il est également utile de répartir le calcul et le stockage d'une variable normale sur plusieurs serveurs de paramètres.

Pour activer le partage de variables, vous pouvez passer un variable_partitioner lors de la construction d'un objet ParameterServerStrategy . Le variable_partitioner sera appelé à chaque fois qu'une variable est créée et il devrait renvoyer le nombre de fragments le long de chaque dimension de la variable. Certains variable_partitioner tf.distribute.experimental.partitioners.FixedShardsPartitioner à l' tf.distribute.experimental.partitioners.FixedShardsPartitioner sont fournis, tels que tf.distribute.experimental.partitioners.FixedShardsPartitioner .

Dans l'exemple ci-dessus, nous utilisons le FixedShardsPartitioner qui divisera toutes les variables en deux fragments et chaque fragment sera affecté à différents serveurs de paramètres:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Lorsqu'un variable_partitioner est passé et si vous créez une variable directement sous strategy.scope() , il deviendra un type de conteneur avec une propriété de variables qui donne accès à la liste des fragments. Dans la plupart des cas, ce conteneur sera automatiquement converti en Tensor en concaténant tous les fragments. En conséquence, il peut être utilisé comme une variable normale. D'autre part, certaines méthodes TensorFlow telles que tf.nn.embedding_lookup fournissent une implémentation efficace pour ce type de conteneur et dans ces méthodes, la concaténation automatique sera évitée.

Veuillez consulter la docstring API de ParameterServerStrategy pour plus de détails.

Évaluation

Il existe plusieurs façons de définir et d'exécuter une boucle d'évaluation dans la formation distribuée. Chacun a ses propres avantages et inconvénients, comme décrit ci-dessous. La méthode d'évaluation en ligne est recommandée si vous n'avez pas de préférence.

Évaluation en ligne

Dans cette méthode, le coordinateur alterne entre la formation et l'évaluation et nous l'appelons donc évaluation en ligne. L'évaluation en ligne présente plusieurs avantages. Par exemple, il peut prendre en charge de grands modèles d'évaluation et des ensembles de données d'évaluation qu'une seule tâche ne peut pas contenir. Pour un autre exemple, les résultats de l'évaluation peuvent être utilisés pour prendre des décisions pour la formation à l'époque suivante.

Il existe deux façons de mettre en œuvre l'évaluation en ligne:

  • Évaluation directe - Pour les petits modèles et les ensembles de données d'évaluation, le coordinateur peut exécuter l'évaluation directement sur le modèle distribué avec l'ensemble de données d'évaluation sur le coordinateur:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Évaluation distribuée - Pour les modèles volumineux ou les ensembles de données qui ne peuvent pas être exécutés directement sur le coordinateur, la tâche du coordinateur peut distribuer les tâches d'évaluation aux travailleurs via les méthodes de schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Évaluation du side-car

Une autre méthode est appelée évaluation side-car qui consiste à créer une tâche d'évaluateur dédiée qui lit à plusieurs reprises les points de contrôle et exécute l'évaluation sur un dernier point de contrôle. Cela permet à votre programme d'entraînement de se terminer tôt si vous n'avez pas besoin de modifier votre boucle d'entraînement en fonction des résultats de l'évaluation. Cependant, cela nécessite une tâche d'évaluateur supplémentaire et des points de contrôle périodiques pour déclencher l'évaluation. Voici une boucle d'évaluation possible du side-car:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clusters dans le monde réel

Dans un environnement de production réel, vous exécuterez toutes les tâches dans différents processus sur différentes machines. La manière la plus simple de configurer les informations de cluster sur chaque tâche est de définir les variables d'environnement "TF_CONFIG" et d'utiliser TFConfigClusterResolver pour analyser "TF_CONFIG". Pour une description générale des variables d'environnement "TF_CONFIG", veuillez consulter le guide de formation distribué .

Si vous démarrez vos tâches d'entraînement à l'aide de Kubernetes ou d'autres modèles de configuration, il est très probable que ces modèles aient déjà défini «TF_CONFIG» pour vous.

Définir la variable d'environnement «TF_CONFIG»

Supposons que vous ayez 3 nœuds de calcul et 2 serveurs de paramètres, le «TF_CONFIG» du nœud de calcul 1 peut être:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Le «TF_CONFIG» de l'évaluateur peut être:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

La partie «cluster» dans la chaîne «TF_CONFIG» ci-dessus pour l'évaluateur est facultative.

Si vous utilisez le même binaire pour toutes les tâches

Si vous préférez exécuter toutes ces tâches en utilisant un seul binaire, vous devrez laisser votre programme se diviser en différents rôles au tout début:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Le code suivant démarre un serveur TensorFlow et attend:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Gestion de l'échec de la tâche

Échec du travailleur

Comme mentionné ci-dessus, ClusterCoordinator a une tolérance de pannes intégrée pour l'échec des travailleurs. Lors de la récupération du worker, la tranche correspondante des ensembles de données créés par create_per_worker_dataset qui sont toujours dans la portée sera recréée en appelant son dataset_fn origine passé à create_per_worker_dataset .

Échec du serveur de paramètres ou du coordinateur

Cependant, lorsque le coordinateur détecte une erreur de serveur de paramètres, il AbortedError immédiatement une erreur UnavailableError ou AbortedError . Vous pouvez redémarrer le coordinateur dans ce cas. Le coordinateur lui-même peut également devenir indisponible. Par conséquent, afin de ne pas perdre une grande partie de la progression de la formation, il est important de vérifier périodiquement les variables du modèle et de charger les variables du modèle à partir d'un point de contrôle, le cas échéant, avant le début de la formation. La progression de la formation peut être déduite approximativement de optimizer.iterations si un optimiseur est contrôlé.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue une RemoteValue

La RemoteValue un RemoteValue est garantie de réussir si une fonction est exécutée avec succès. En effet, actuellement, la valeur de retour est immédiatement copiée dans le coordinateur après l'exécution d'une fonction. En cas d'échec du worker pendant la copie, la fonction sera retentée sur un autre worker disponible. Par conséquent, si vous souhaitez optimiser les performances, vous pouvez planifier des fonctions sans valeur de retour.

Rapport d'erreurs

Une fois que le coordinateur voit une erreur telle que UnavailableError des serveurs de paramètres ou d'autres erreurs d'application telles qu'un InvalidArgument de tf.debugging.check_numerics , il annulera toutes les fonctions en attente et en file d'attente avant de tf.debugging.check_numerics l'erreur. La RemoteValue leurs RemoteValue correspondants RemoteValue une CancelledError .

Après qu'une erreur est levée, le coordinateur ne lèvera pas la même erreur ou aucune erreur des fonctions annulées.

Amélioration des performances

Il existe plusieurs raisons possibles si vous rencontrez des problèmes de performances lorsque vous vous entraînez avec ParameterServerStrategy et ClusterResolver .

Une raison courante est que les serveurs de paramètres ont une charge déséquilibrée et que certains serveurs de paramètres fortement chargés ont atteint leur capacité. Il peut également y avoir plusieurs causes profondes. Certaines méthodes simples pour atténuer ce problème consistent à

  1. Fragmentez vos grandes variables de modèle en spécifiant un variable_partitioner lors de la construction d'un ParameterServerStrategy .
  2. évitez de créer une variable de point d'accès requise par tous les serveurs de paramètres en une seule étape si possible. Par exemple, utilisez un taux d'apprentissage constant ou une sous-classe tf.keras.optimizers.schedules.LearningRateSchedule dans les optimiseurs car le comportement par défaut est que le taux d'apprentissage deviendra une variable placée sur un serveur de paramètres particulier et demandée par tous les autres serveurs de paramètres à chaque étape .
  3. mélangez vos grands vocabulaires avant de les transmettre aux couches de prétraitement Keras.

Une autre raison possible pour les problèmes de performance est le coordinateur. Notre première implémentation de schedule / join est basée sur Python et peut donc avoir une surcharge de thread. La latence entre le coordinateur et les ouvriers peut également être importante. Si tel est le cas, vous pouvez tf.function plusieurs étapes dans une seule tf.function :

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Nous continuerons d'optimiser le coordinateur et, espérons-le, la plupart des utilisateurs n'auront plus à emballer manuellement les étapes à l'avenir.

En outre, une petite astuce pour améliorer les performances consiste à planifier des fonctions sans valeur de retour, comme expliqué dans la section sur l'échec de la tâche de gestion ci-dessus.

Limitations connues

La plupart des limitations connues sont couvertes dans les sections ci-dessus. Voici un résumé:

  • os.environment["grpc_fail_fast"]="use_caller" est nécessaire sur chaque tâche, y compris le coordinateur, pour que la tolérance aux pannes fonctionne correctement.
  • La formation du serveur de paramètres synchrones n'est pas prise en charge.
  • ParameterServerStrategy ne fonctionne pas avec les API de compile et d' fit Keras.
  • ClusterCoordinator.schedule ne prend pas en charge les garanties de visite pour un ensemble de données.
  • Lorsque ClusterCoordinator.create_per_worker_dataset est utilisé, l'ensemble de données doit être créé dans la fonction qui lui est transmise.
  • Il est généralement nécessaire de regrouper plusieurs étapes dans une seule fonction pour obtenir des performances optimales.
  • Le chargement d'un saved_model via tf.saved_model.load contenant des variables tf.saved_model.load n'est pas pris en charge. Notez que le chargement d'un tel modèle enregistré à l'aide de TensorFlow Serving devrait fonctionner.
  • Le chargement d'un point de contrôle contenant des variables d'emplacements d'optimisation fragmentées n'est pas pris en charge dans un nombre différent de fragments.
  • Il n'est pas pris en charge pour récupérer après une défaillance du serveur de paramètres sans redémarrer la tâche du coordinateur.