Multi-Worker-Training mit Estimator

Auf TensorFlow.org ansehen In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

Dieses Tutorial zeigt , wie tf.distribute.Strategy kann für verteilte Multi-Arbeiterausbildung mit verwendet werden tf.estimator . Wenn Sie Ihren Code schreiben tf.estimator , und Sie sind in Skalierung über eine einzige Maschine mit hohen Leistung interessiert ist , ist dieses Tutorial für Sie.

Bevor Sie beginnen, lesen Sie bitte die Vertriebsstrategie Führung. Das Multi-GPU - Lernprogramm ist auch relevant, weil dieses Tutorial das gleiche Modell verwendet.

Aufstellen

Richten Sie zunächst TensorFlow und die erforderlichen Importe ein.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Eingabefunktion

Dieses Tutorial verwendet die MNIST - Datensatz aus TensorFlow Datensätze . Der Code ist hier ähnlich wie das Multi-GPU - Training Tutorial mit einer wesentlichen Unterschied: wenn Estimator mit der Mehrarbeiterausbildung ist es notwendig , den Datensatz durch die Anzahl der Arbeiter Scherbe Modell Konvergenz zu gewährleisten. Die Eingangsdaten werden durch Arbeiter sharded Index, so dass jeder Arbeiter verarbeitet 1/num_workers unterschiedliche Abschnitte des Datensatzes.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Ein anderer vernünftiger Ansatz, um Konvergenz zu erreichen, besteht darin, den Datensatz mit unterschiedlichen Seeds bei jedem Worker zu mischen.

Multi-Worker-Konfiguration

Einer der wichtigsten Unterschiede in diesem Tutorial ( im Vergleich zum Training Tutorial Multi-GPU ) ist der Multi-Arbeiter - Setup. Die TF_CONFIG Umgebungsvariable ist der Standard - Weg , um die Cluster - Konfiguration für jeden Arbeiter , um anzugeben, den Teil des Clusters ist.

Es gibt zwei Komponenten von TF_CONFIG : cluster und task . cluster liefert Informationen über den gesamten Cluster, nämlich die Arbeiter und Parameter - Server im Cluster. task gibt Auskunft über die aktuelle Aufgabe. Die erste Komponente cluster ist die gleiche für alle Arbeiter und Parameter - Server in dem Cluster, und die zweite Komponente task unterscheidet sich auf jeden Arbeiter und Parameter - Server und gibt seinen eigenen type und index . In diesem Beispiel ist die Aufgabe type ist worker und der Task - index ist 0 .

Zur Verdeutlichung dieses Tutorial zeigt , wie ein setzen TF_CONFIG mit 2 Arbeitern auf localhost . In der Praxis würden Sie mehrere Arbeiter auf einer externen IP - Adresse und Port und Satz erstellen TF_CONFIG jedem Arbeitnehmer in geeigneter Weise , modifizieren , dh die Aufgabe index .

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Definiere das Modell

Schreiben Sie die Ebenen, den Optimierer und die Verlustfunktion für das Training. Dieses Tutorial definiert das Modell mit Keras Schichten, ähnlich wie das Multi-GPU - Training Tutorial .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Um das Modell zu trainieren, eine Instanz von tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy erstellt Kopien aller Variablen in den Schichten des Modells auf jedem Gerät über alle Arbeitnehmer. Es verwendet CollectiveOps , eine TensorFlow op für gemeinsame Kommunikation, zu aggregieren und halten Gradienten der Variablen synchron. Die tf.distribute.Strategy Führung hat weitere Details zu dieser Strategie.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_1884/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Trainieren und bewerten Sie das Modell

Als nächstes geben Sie die Vertriebsstrategie in der RunConfig für den Schätzer, und trainieren und zu bewerten unter Berufung auf tf.estimator.train_and_evaluate . Dieses Tutorial verteilt nur die Ausbildung von der Strategie über die Angabe train_distribute . Es ist auch möglich , die Auswertung über verteilen eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7fa86c4c8950>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:374: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2021-09-09 01:25:08.941607: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2021-09-09 01:25:08.942715: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.135354.
INFO:tensorflow:Loss for final step: 1.135354.
({'loss': 2.2692595, 'global_step': 938}, [])

Trainingsleistung optimieren

Sie haben nun ein Modell und ein Multi-Arbeiter durch angetrieben fähig Estimator tf.distribute.Strategy . Sie können die folgenden Techniken ausprobieren, um die Leistung des Multi-Worker-Trainings zu optimieren:

  • Erhöhen Sie die Losgröße: Die Chargengröße angegeben hier ist pro-GPU. Generell empfiehlt sich die größte Batchgröße, die in den GPU-Speicher passt.
  • Guss Variablen: Guss die Variablen tf.float , wenn möglich. Das offizielle RESNET Modell enthält ein Beispiel dafür , wie dies erreicht werden kann.
  • Verwenden kollektive Kommunikation: MultiWorkerMirroredStrategy mehrere bietet Implementierungen kollektive Kommunikation .

    • RING Arbeitsgeräte ringbasierte Kollektive Verwendung gRPC wie die Quer Host - Kommunikationsschicht.
    • NCCL nutzt Nvidias NCCL Kollektive zu implementieren.
    • AUTO aufschiebt die Wahl der Laufzeit.

    Die beste Wahl der kollektiven Implementierung hängt von der Anzahl und Art der GPUs und der Netzwerkverbindung im Cluster ab. Um die automatische Auswahl außer Kraft setzen, geben Sie einen gültigen Wert auf die communication von MultiWorkerMirroredStrategy ‚s Konstruktor, zB communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Besuchen Sie den Abschnitt Leistung in der Führung erfahren Sie mehr über andere Strategien und Tools Sie verwenden können , um die Leistung Ihrer TensorFlow Modelle zu optimieren.

Andere Codebeispiele

  1. End - to - End-Beispiel für Multi Arbeiterausbildung in tensorflow / Ökosystem mit Kubernetes Vorlagen. Dieses Beispiel beginnt mit einem Keras Modell und wandelt es in ein Estimator der Verwendung tf.keras.estimator.model_to_estimator API.
  2. Offizielle Modelle , von denen viele so konfiguriert werden , können mehrere Vertriebsstrategien laufen.