Formation du serveur de paramètres avec ParameterServerStrategy

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

La formation de serveur de paramètres est une méthode parallèle des données communes à intensifier la formation de modèle sur plusieurs machines.

Un pôle de formation de serveur de paramètres se compose des travailleurs et des serveurs de paramètres. Les variables sont créées sur les serveurs de paramètres et elles sont lues et mises à jour par les travailleurs à chaque étape. Par défaut, les travailleurs lisent et mettent à jour ces variables indépendamment sans se synchroniser les uns avec les autres. C'est pourquoi parfois paramètre de formation de type serveur est appelé formation asynchrone.

Dans tensorflow 2, la formation de serveur de paramètres est alimenté par la tf.distribute.experimental.ParameterServerStrategy classe, qui distribue les étapes de formation à un groupe qui évolue jusqu'à des milliers de travailleurs (accompagné par les serveurs de paramètres).

Méthodes de formation prises en charge

Il existe deux principales méthodes de formation prises en charge :

Un cluster avec des emplois et des tâches

Quelle que soit l'API de choix ( Model.fit ou une boucle de formation sur mesure), la formation distribuée dans tensorflow 2 implique: un 'cluster' avec plusieurs 'jobs' , et chacun des emplois peut avoir un ou plusieurs 'tasks' .

Lors de l'utilisation de la formation du serveur de paramètres, il est recommandé d'avoir :

  • Un travail de coordinateur (qui a le nom du travail en chief )
  • Plusieurs emplois des travailleurs (nom d' emploi worker ); et
  • Plusieurs tâches de serveur de paramètres (nom de la tâche ps )

Alors que le coordonnateur crée des ressources, des tâches de formation des dépêches, écrit des points de contrôle, et traite des échecs de travail, les travailleurs et les serveurs de paramètres exécutent tf.distribute.Server qui écoutent les demandes du coordonnateur.

La formation du serveur Paramètre avec Model.fit API

La formation du serveur avec le paramètre Model.fit API nécessite le coordinateur d'utiliser un tf.distribute.experimental.ParameterServerStrategy objet, et un tf.keras.utils.experimental.DatasetCreator comme entrée. Semblable à Model.fit utilisation sans stratégie, ou avec d' autres stratégies, le flux de travail consiste à créer et compiler le modèle, la préparation des callbacks, suivi d'un Model.fit appel.

Entraînement du serveur de paramètres avec une boucle d'entraînement personnalisée

Avec des boucles de formation personnalisés, la tf.distribute.experimental.coordinator.ClusterCoordinator classe est l'élément clé utilisée pour le coordinateur.

La plus importante API fournie par le ClusterCoordinator objet est le schedule :

  • Le schedule API enqueues un tf.function et retourne un avenir semblable à RemoteValue immédiatement.
  • Les fonctions en file d' attente seront envoyés aux travailleurs à distance dans les discussions de fond et leur RemoteValue s seront remplis de manière asynchrone.
  • Étant donné que le schedule ne nécessite pas l' affectation des travailleurs, le tf.function passé en peut être exécuté sur tout travailleur disponible.
  • Si le travailleur sur lequel elle est exécutée devient indisponible avant son achèvement, la fonction sera réessayée sur un autre travailleur disponible.
  • En raison de ce fait et du fait que l'exécution d'une fonction n'est pas atomique, une fonction peut être exécutée plusieurs fois.

En plus de l' envoi des fonctions à distance, le ClusterCoordinator permet également de créer des ensembles de données sur tous les travailleurs et reconstruire ces ensembles de données lorsqu'un travailleur récupère de l' échec.

Configuration du didacticiel

Le tutoriel branche dans Model.fit chemins de boucle de formation et personnalisés, et vous pouvez choisir celui qui correspond à vos besoins. Les sections autres que "Formation avec X" sont applicables aux deux parcours.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Configuration du cluster

Comme mentionné ci - dessus, un pôle de formation de serveur de paramètres nécessite une tâche de coordination qui exécute votre programme de formation, un ou plusieurs travailleurs et les tâches de serveur de paramètres qui exécutent tensorflow Serveurs- tf.distribute.Server -et peut - être une tâche d'évaluation supplémentaire qui exécute l' évaluation side-car (voir la section d'évaluation du side-car ci-dessous). Les conditions pour les mettre en place sont :

  • La tâche de coordinateur doit connaître les adresses et les ports de tous les autres serveurs TensorFlow, à l'exception de l'évaluateur.
  • Les travailleurs et les serveurs de paramètres doivent savoir sur quel port ils doivent écouter. Par souci de simplicité, vous pouvez généralement transmettre les informations complètes du cluster lors de la création de serveurs TensorFlow sur ces tâches.
  • La tâche de l'évaluateur n'a pas besoin de connaître la configuration du cluster de formation. Si tel est le cas, il ne doit pas tenter de se connecter au cluster d'entraînement.
  • Les travailleurs et les serveurs de paramètres doivent avoir des types de tâches comme "worker" et "ps" , respectivement. Le coordonnateur devrait utiliser "chief" comme type de tâche pour des raisons historiques.

Dans ce didacticiel, vous allez créer un cluster in-process afin que l'ensemble de la formation du serveur de paramètres puisse être exécuté dans Colab. Vous apprendrez comment mettre en place des clusters réels dans une autre section.

Cluster en cours

Vous commencerez par créer plusieurs serveurs TensorFlow à l'avance et vous y connecterez plus tard. Notez que cela est uniquement dans le but de la démonstration de ce tutoriel, et dans la formation réelle des serveurs sera lancé sur "worker" et "ps" machines.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

La configuration du cluster en cours est fréquemment utilisé dans les tests unitaires, comme ici .

Une autre option pour le test local est de lancer des processus sur la machine locale vérification à la formation multi-travailleur Keras pour un exemple de cette approche.

Instancier une ParameterServerStrategy

Avant de plonger dans le code de formation, nous allons instancier un ParameterServerStrategy objet. Notez que cela est nécessaire indépendamment du fait que vous procédez avec Model.fit ou une boucle de formation sur mesure. Le variable_partitioner argument sera expliqué dans la section variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

Afin d'utiliser les GPU pour la formation, allouez des GPU visibles à chaque travailleur. ParameterServerStrategy utilisera tous les processeurs graphiques disponibles sur chaque travailleur, avec la restriction que tous les travailleurs devraient avoir le même nombre de processeurs graphiques disponibles.

Partage variable

Sharding variable se rapporte au fractionnement d' une variable dans plusieurs variables plus petites, qui sont appelés fragments. Le partitionnement variable peut être utile pour répartir la charge du réseau lors de l'accès à ces partitions. Il est également utile de répartir le calcul et le stockage d'une variable normale sur plusieurs serveurs de paramètres.

Pour activer sharding variable, vous pouvez passer dans un variable_partitioner lors de la construction d' un ParameterServerStrategy objet. Le variable_partitioner sera appelé à chaque fois que lorsqu'une variable est créée et il devrait retourner le nombre de tessons le long de chaque dimension de la variable. Certains hors de la boîte variable_partitioner s sont fournis tels que tf.distribute.experimental.partitioners.MinSizePartitioner . Il est recommandé d'utiliser partitionneurs basés sur la taille comme tf.distribute.experimental.partitioners.MinSizePartitioner pour éviter le partitionnement de petites variables, qui pourraient avoir un impact négatif sur la vitesse de formation du modèle.

Lorsqu'un variable_partitioner est passé et si vous créez une variable directement sous strategy.scope() , il deviendra un type de conteneur avec des variables propriété qui donne accès à la liste des tessons. Dans la plupart des cas, ce conteneur sera automatiquement converti en un Tensor en concaténant tous les fragments. En conséquence, il peut être utilisé comme une variable normale. D'autre part, certaines méthodes de tensorflow telles que tf.nn.embedding_lookup fournissent la mise en œuvre efficace de ce type de récipient et dans ces méthodes concaténation automatique est proscrite.

S'il vous plaît voir l'API docs de tf.distribute.experimental.ParameterServerStrategy pour plus de détails.

Formation avec Model.fit

Keras fournit une API de formation facile à utiliser via Model.fit qui gère la boucle de formation sous le capot, avec la flexibilité de Overridable train_step et callbacks, qui offrent des fonctionnalités telles que l' enregistrement de point de contrôle ou d'un résumé d' économie pour TensorBoard. Avec Model.fit , le même code de formation peut être utilisé pour d' autres stratégies avec un simple échange de l'objet de stratégie.

Des données d'entrée

Model.fit avec la formation de serveur de paramètres exige que les données d'entrée sont fournies dans un appelable qui prend un seul argument de type tf.distribute.InputContext , et retourne un tf.data.Dataset . Ensuite, créez un tf.keras.utils.experimental.DatasetCreator objet qui prend une telle callable , et une option tf.distribute.InputOptions objet via input_options argument.

Notez qu'il est recommandé de mélanger et répéter les données avec la formation de serveur de paramètres et spécifiez steps_per_epoch en fit appel afin que la bibliothèque connaît les limites d'époque.

S'il vous plaît voir le entrée distribué tutoriel pour plus d' informations sur le InputContext argument.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Le code dataset_fn sera invoqué sur le dispositif d'entrée, qui est généralement le CPU, sur chacune des machines de travail.

Construction et compilation de modèles

Maintenant, vous allez créer un tf.keras.Model trivial de tf.keras.models.Sequential modèle pour suivi des fins de démonstration par un- Model.compile appel à incorporer des composants, comme un optimiseur, des mesures ou des paramètres tels que steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Rappels et formation

Avant d'appeler model.fit pour la formation proprement dite, nous allons préparer les callbacks nécessaires pour les tâches courantes, telles que:

  • ModelCheckpoint : pour enregistrer les poids du modèle.
  • BackupAndRestore : pour vous assurer que les progrès de la formation est automatiquement sauvegardé, et récupéré si l'expérience de cluster indisponibilité (comme abort ou préemption); ou alors
  • TensorBoard : pour enregistrer les rapports d'étape dans les fichiers de synthèse, qui se visualisables dans l' outil TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

Utilisation directe avec ClusterCoordinator ( en option)

Même si vous choisissez le Model.fit parcours de formation, vous pouvez éventuellement instancier un tf.distribute.experimental.coordinator.ClusterCoordinator objet pour programmer d' autres fonctions que vous souhaitez exécuter sur les travailleurs. Voir la formation avec une boucle de formation personnalisée section pour plus de détails et d' exemples.

Entraînement avec une boucle d'entraînement personnalisée

En utilisant des boucles de formation sur mesure avec tf.distribute.Strategy offre une grande flexibilité pour définir les boucles de formation. Avec le ParameterServerStrategy défini ci - dessus (comme strategy ), vous utiliserez un tf.distribute.experimental.coordinator.ClusterCoordinator pour envoyer l'exécution des étapes de formation aux travailleurs à distance.

Ensuite, vous allez créer un modèle, définir un ensemble de données et une fonction étape, comme vous l' avez fait dans la boucle de formation avec d' autres tf.distribute.Strategy s. Vous pouvez trouver plus de détails dans la formation personnalisée avec tf.distribute.Strategy tutoriel.

Pour assurer le préchargement du jeu de données efficace, utilisez le jeu de données API recommandé la création distribuée mentionnées dans les étapes de formation Dispatch aux travailleurs à distance section ci - dessous. Aussi, assurez - vous d'appeler Strategy.run l' intérieur worker_fn pour tirer le meilleur parti des processeurs graphiques attribués aux travailleurs. Le reste des étapes est le même pour l'entraînement avec ou sans GPU.

Créons ces composants dans les étapes suivantes :

Configurer les données

Tout d' abord, écrire une fonction qui crée un ensemble de données qui comprend la logique pré - traitement mis en œuvre par des couches de pré - traitement KERAS .

Vous allez créer ces couches en dehors du dataset_fn mais appliquer la transformation à l' intérieur du dataset_fn , puisque vous envelopper le dataset_fn dans un tf.function , ce qui ne permet pas les variables à créer à l' intérieur.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Générez des exemples de jouets dans un ensemble de données :

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Ensuite, créez l'ensemble de données de formation enveloppé dans un dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construire le modèle

Ensuite, créez le modèle et d'autres objets. Assurez - vous de créer toutes les variables sous strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

La confirmation de Let que l'utilisation de FixedShardsPartitioner divisé toutes les variables en deux tessons et chaque tesson a été affecté à des serveurs de paramètres différents:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Définir l'étape de formation

Troisièmement, créer l'étape de formation enveloppé dans un tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Dans la fonction étape de formation ci - dessus, appelant Strategy.run et Strategy.reduce dans le step_fn peut prendre en charge plusieurs processeurs graphiques par travailleur. Si les travailleurs ont alloué GPUs, Strategy.run distribuera les jeux de données sur plusieurs répliques.

Envoi des étapes de formation aux télétravailleurs

Après sont définis tous les calculs par ParameterServerStrategy , vous utiliserez la tf.distribute.experimental.coordinator.ClusterCoordinator classe pour créer des ressources et distribuer les étapes de formation aux travailleurs à distance.

Nous allons d' abord créer un ClusterCoordinator objet et passer dans l'objet de stratégie:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Ensuite, créez un ensemble de données par travailleur et un itérateur. Dans les per_worker_dataset_fn ci - dessous, enveloppant le dataset_fn dans strategy.distribute_datasets_from_function est recommandé pour permettre préchargement efficace de processeurs graphiques de façon transparente.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

La dernière étape consiste à distribuer le calcul aux travailleurs distants à l' aide ClusterCoordinator.schedule :

  • Le schedule méthode enqueues un tf.function et retourne un avenir semblable à RemoteValue immédiatement. Les fonctions en file d' attente seront envoyés aux travailleurs à distance dans les discussions de fond et le RemoteValue seront remplis de manière asynchrone.
  • La join méthode ( ClusterCoordinator.join ) peut être utilisé pour attendre jusqu'à ce que toutes les fonctions programmées sont exécutées.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Voici comment vous pouvez chercher le résultat d'une RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternativement, vous pouvez lancer toutes les étapes et faire quelque chose en attendant la fin :

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Pour la formation complète et le service workflow pour cet exemple particulier, s'il vous plaît vérifier ce critère .

En savoir plus sur la création de jeux de données

Le jeu de données dans le code ci - dessus est créée en utilisant l' ClusterCoordinator.create_per_worker_dataset API). Il crée un ensemble de données par travailleur et renvoie un objet conteneur. Vous pouvez appeler la iter méthode sur pour créer un itérateur par travailleur. Le per-travailleur itérateur contient un itérateur par travailleur et la tranche correspondante d'un travailleur sera substitué dans le paramètre d'entrée de la fonction passée à la ClusterCoordinator.schedule procédé avant que la fonction est exécutée sur un travailleur particulier.

À l' heure actuelle, la ClusterCoordinator.schedule méthode suppose les travailleurs sont équivalents et assume ainsi les ensembles de données sur les différents travailleurs sont les mêmes si elles contiennent , sauf qu'ils peuvent être mélangées différemment une Dataset.shuffle opération. À cause de cela, il est également recommandé que les ensembles de données à répéter indéfiniment et vous programmez un nombre fini d'étapes au lieu de compter sur la OutOfRangeError d'un ensemble de données.

Une autre remarque importante est que tf.data ensembles de données ne prennent pas en charge la sérialisation et la désérialisation implicite à travers les frontières de la tâche. Il est donc important de créer l'ensemble des données à l' intérieur de la fonction passée à ClusterCoordinator.create_per_worker_dataset .

Évaluation

Il existe plusieurs façons de définir et d'exécuter une boucle d'évaluation dans la formation distribuée. Chacun a ses propres avantages et inconvénients, comme décrit ci-dessous. La méthode d'évaluation en ligne est recommandée si vous n'avez pas de préférence.

Évaluation en ligne

Dans cette méthode, alterne coordination entre la formation et l' évaluation et il est donc appelé l' évaluation en ligne.

L'évaluation en ligne présente plusieurs avantages. Par exemple:

  • Il peut prendre en charge de grands modèles d'évaluation et des ensembles de données d'évaluation qu'une seule tâche ne peut pas contenir.
  • Les résultats de l'évaluation peuvent être utilisés pour prendre des décisions pour la formation de l'époque suivante.

Il existe deux manières de mettre en œuvre l'évaluation en ligne : l'évaluation directe et l'évaluation distribuée.

  • L' évaluation directe:, le coordonnateur peut exécuter l' évaluation directement sur le modèle distribué Pour les petits modèles et des ensembles de données d'évaluation avec l'ensemble de données d'évaluation du coordinateur:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Distribué évaluation: Pour les grands modèles ou ensembles de données qui sont infaisables pour exécuter directement sur le coordinateur, la tâche de coordination peut répartir les tâches d'évaluation aux travailleurs via les ClusterCoordinator.schedule / ClusterCoordinator.join méthodes:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Évaluation du side-car

Une autre méthode est appelée évaluation side-car où vous créez une tâche d'évaluateur dédié qui lit à plusieurs reprises des points de contrôle et exécute l' évaluation sur un dernier point de contrôle. Cela permet à votre programme d'entraînement de se terminer plus tôt si vous n'avez pas besoin de modifier votre boucle d'entraînement en fonction des résultats de l'évaluation. Cependant, cela nécessite une tâche d'évaluation supplémentaire et des points de contrôle périodiques pour déclencher l'évaluation. Voici une boucle d'évaluation side-car possible :

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Les clusters dans le monde réel

Dans un environnement de production réel, vous exécuterez toutes les tâches dans différents processus sur différentes machines. La façon la plus simple de configurer les informations de cluster sur chaque tâche consiste à définir "TF_CONFIG" variables d'environnement et d' utiliser un tf.distribute.cluster_resolver.TFConfigClusterResolver pour analyser "TF_CONFIG" .

Pour une description générale sur "TF_CONFIG" variables d'environnement, reportez - vous à la formation distribuée guide.

Si vous commencez vos tâches de formation à l' aide Kubernetes ou d' autres modèles de configuration, il est très probable que ces modèles ont déjà mis en “TF_CONFIG" pour vous.

Réglez le "TF_CONFIG" variable d'environnement

Supposons que vous avez 3 travailleurs et 2 serveurs de paramètres, le "TF_CONFIG" de travailleurs 1 peut être:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

Le "TF_CONFIG" de l'évaluateur peut être:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Le "cluster" part au - dessus "TF_CONFIG" chaîne pour l'évaluateur est facultative.

Si vous utilisez le même binaire pour toutes les tâches

Si vous préférez exécuter toutes ces tâches à l'aide d'un seul binaire, vous devrez laisser votre programme se brancher sur différents rôles au tout début :

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Le code suivant démarre un serveur TensorFlow et attend :

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Gestion de l'échec de la tâche

Échec du travailleur

tf.distribute.experimental.coordinator.ClusterCoordinator ou Model.fit fournissent intégré dans la tolérance aux pannes pour l' échec des travailleurs. Lors de la récupération des travailleurs, la fonction de jeu de données fourni précédemment (soit à ClusterCoordinator.create_per_worker_dataset pour une boucle de formation sur mesure, ou tf.keras.utils.experimental.DatasetCreator pour Model.fit ) seront appelées les travailleurs à recréer les ensembles de données.

Échec du serveur de paramètres ou du coordinateur

Toutefois, lorsque le coordinateur voit une erreur de serveur de paramètres, il déclenche une UnavailableError ou AbortedError immédiatement. Vous pouvez redémarrer le coordinateur dans ce cas. Le coordinateur lui-même peut également devenir indisponible. Par conséquent, certains outillages sont recommandés afin de ne pas perdre la progression de l'entraînement :

  • Pour Model.fit , vous devez utiliser un BackupAndRestore rappel, qui gère l'économie d'avancement et de restauration automatique. Voir Callbacks et la formation section ci - dessus pour un exemple.

  • Pour une boucle d'entraînement personnalisée, vous devez vérifier périodiquement les variables de modèle et charger les variables de modèle à partir d'un point de contrôle, le cas échéant, avant le début de l'entraînement. Les progrès de la formation peut être déduit approximativement à partir optimizer.iterations si un optimiseur est un point de reprise:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Récupérer un RemoteValue

Récupérer un RemoteValue est garanti pour réussir si une fonction est exécutée avec succès. En effet, actuellement, la valeur de retour est immédiatement copiée dans le coordinateur après l'exécution d'une fonction. S'il y a un échec de travailleur pendant la copie, la fonction sera réessayée sur un autre travailleur disponible. Par conséquent, si vous souhaitez optimiser les performances, vous pouvez planifier des fonctions sans valeur de retour.

Rapport d'erreur

Une fois que le coordinateur voit une erreur comme UnavailableError à partir des serveurs de paramètres ou d' autres erreurs d'application comme un InvalidArgument de tf.debugging.check_numerics , il annulera toutes les fonctions en attente et en attente avant de soulever l'erreur. Récupérer leur correspondant RemoteValue s soulèvera un CancelledError .

Une fois qu'une erreur est déclenchée, le coordinateur ne déclenchera pas la même erreur ni aucune erreur provenant de fonctions annulées.

Amélioration des performances

Il y a plusieurs raisons possibles si vous voyez des problèmes de performance lorsque vous vous entraînez avec ParameterServerStrategy et ClusterResolver .

Une raison courante est que les serveurs de paramètres ont une charge déséquilibrée et que certains serveurs de paramètres fortement chargés ont atteint leur capacité maximale. Il peut également y avoir plusieurs causes profondes. Voici quelques méthodes simples pour atténuer ce problème :

  1. Shard vos grandes variables du modèle via la spécification d' un variable_partitioner lors de la construction d' un ParameterServerStrategy .
  2. Évitez de créer une variable de point d'accès qui est requise par tous les serveurs de paramètres en une seule étape si possible. Par exemple, utiliser un taux d'apprentissage constant ou sous - classe tf.keras.optimizers.schedules.LearningRateSchedule dans optimiseurs puisque le comportement par défaut est que le taux d'apprentissage deviendra une variable placée sur un serveur de paramètres particulier et demandé par tous les autres serveurs de paramètres à chaque étape .
  3. Mélangez vos grands vocabulaires avant de les transmettre aux couches de prétraitement Keras.

Une autre raison possible des problèmes de performance est le coordinateur. Votre première mise en œuvre du schedule / join est Python-base et peut donc avoir le filetage en tête. De plus, la latence entre le coordinateur et les travailleurs peut être importante. Si c'est le cas,

  • Pour Model.fit , vous pouvez définir steps_per_execution l' argument fourni à Model.compile à une valeur supérieure à 1.

  • Pour une boucle de formation sur mesure, vous pouvez emballer plusieurs étapes en un seul tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Comme la bibliothèque est optimisée davantage, nous espérons que la plupart des utilisateurs n'auront pas à emballer manuellement les étapes à l'avenir.

De plus, une petite astuce pour améliorer les performances consiste à planifier des fonctions sans valeur de retour, comme expliqué dans la section de gestion des échecs de tâche ci-dessus.

Limites connues

La plupart des limitations connues sont déjà couvertes dans les sections ci-dessus. Cette section fournit un résumé.

ParameterServerStrategy générale

  • os.environment["grpc_fail_fast"]="use_caller" est nécessaire sur toutes les tâches , y compris le coordonnateur, pour rendre le travail de la tolérance aux pannes correctement.
  • L'apprentissage synchrone du serveur de paramètres n'est pas pris en charge.
  • Il est généralement nécessaire de regrouper plusieurs étapes dans une seule fonction pour obtenir des performances optimales.
  • Il est pas pris en charge pour charger un saved_model via tf.saved_model.load contenant des variables fragmentées. Notez que le chargement d'un tel save_model à l'aide de TensorFlow Serving devrait fonctionner.
  • Il n'est pas pris en charge de charger un point de contrôle contenant des variables d'emplacement d'optimiseur fragmentées dans un nombre différent de fragments.
  • Il n'est pas possible de récupérer après une défaillance du serveur de paramètres sans redémarrer la tâche de coordinateur.
  • L' utilisation de tf.lookup.StaticHashTable (qui est couramment employé par certains tf.keras.layers.experimental.preprocessing couches, comme IntegerLookup , StringLookup et TextVectorization ) résultats en matière de ressources mis sur le coordonnateur à ce moment une formation de serveur de paramètres. Cela a des implications en termes de performances pour les RPC de recherche des travailleurs vers le coordinateur. Il s'agit actuellement d'une priorité élevée à traiter.

Model.fit détails

  • steps_per_epoch argument est nécessaire Model.fit . Vous pouvez sélectionner une valeur qui fournit des intervalles appropriés dans une époque.
  • ParameterServerStrategy ne prend pas en charge pour les rappels personnalisés qui ont des appels au niveau des lots pour des raisons de performance. Vous devez convertir ces appels en appels de niveau époque avec convenablement choisi steps_per_epoch , de sorte qu'ils sont appelés chaque steps_per_epoch nombre d'étapes. Les rappels intégrés ne sont pas affectés : leurs appels au niveau du lot ont été modifiés pour être performants. Soutenir les appels au niveau des lots pour ParameterServerStrategy est prévu.
  • Pour la même raison, contrairement à d'autres stratégies, la barre de progression et les métriques ne sont enregistrées qu'aux limites des époques.
  • run_eagerly n'est pas pris en charge.

Spécificités de la boucle d'entraînement personnalisée