Mam pytanie? Połącz się ze społecznością na Forum TensorFlow Odwiedź Forum

Szkolenie dla wielu pracowników z Estymatorem

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło w serwisie GitHub Pobierz notatnik

Przegląd

Ten samouczek pokazuje, jak tf.distribute.Strategy można wykorzystać do rozproszonego szkolenia wielu pracowników za pomocą tf.estimator . Jeśli piszesz kod za pomocą tf.estimator i interesuje Cię skalowanie wykraczające poza jedną maszynę o wysokiej wydajności, ten samouczek jest dla Ciebie.

Zanim zaczniesz, przeczytaj przewodnik po strategii dystrybucji . Samouczek szkoleniowy dla wielu procesorów graficznych jest również istotny, ponieważ ten samouczek używa tego samego modelu.

Ustawiać

Najpierw skonfiguruj TensorFlow i niezbędne importy.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Funkcja wejścia

W tym samouczku zastosowano zestaw danych MNIST z zestawów danych TensorFlow . Kod w tym miejscu jest podobny do samouczka szkoleniowego dla wielu procesorów graficznych z jedną kluczową różnicą: podczas korzystania z narzędzia Estimator do szkolenia wielu pracowników konieczne jest podzielenie zestawu danych na fragmenty według liczby pracowników, aby zapewnić zbieżność modelu. Dane wejściowe są podzielone na fragmenty według indeksu pracownika, dzięki czemu każdy pracownik przetwarza 1/num_workers odrębnych części zestawu danych.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Innym rozsądnym podejściem do osiągnięcia konwergencji byłoby przetasowanie zbioru danych z odrębnymi nasionami u każdego pracownika.

Konfiguracja wielu pracowników

Jedną z kluczowych różnic w tym samouczku (w porównaniu z samouczkiem szkoleniowym dla wielu procesorów graficznych ) jest konfiguracja wielu pracowników. TF_CONFIG środowiskowa TF_CONFIG to standardowy sposób określania konfiguracji klastra dla każdego TF_CONFIG który jest częścią klastra.

Istnieją dwa składniki TF_CONFIG : cluster i task . cluster dostarcza informacji o całym klastrze, a mianowicie o pracownikach i serwerach parametrów w klastrze. task zawiera informacje o bieżącym zadaniu. Pierwszy cluster składników jest taki sam dla wszystkich procesów roboczych i serwerów parametrów w klastrze, a drugie task składnika jest inne na każdym serwerze roboczym i serwerze parametrów i określa własny type i index . W tym przykładzie type zadania to worker a index zadania to 0 .

Na potrzeby ilustracji w tym samouczku pokazano, jak ustawić TF_CONFIG z 2 pracownikami na localhost . W praktyce należałoby utworzyć wielu pracowników na zewnętrznym adresie IP i porcie i odpowiednio ustawić TF_CONFIG dla każdego pracownika, tj. Zmodyfikować index zadań.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Zdefiniuj model

Napisz warstwy, optymalizator i funkcję straty do treningu. W tym samouczku zdefiniowano model z warstwami Keras, podobnie jak w samouczku szkoleniowym dotyczącym wielu GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Aby wytrenować model, użyj instancji tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu dla wszystkich pracowników. Wykorzystuje CollectiveOps , opcję TensorFlow do komunikacji zbiorowej, aby agregować gradienty i synchronizować zmienne. Więcej informacji na temat tej strategii zawiera przewodnik tf.distribute.Strategy .

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Trenuj i oceniaj model

Następnie określ strategię dystrybucji w RunConfig dla estymatora oraz tf.estimator.train_and_evaluate i oceń, wywołując tf.estimator.train_and_evaluate . Ten samouczek rozprowadza tylko szkolenie, określając strategię za pośrednictwem train_distribute . Możliwe jest również rozpowszechnianie oceny za pośrednictwem eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f262c6db3d0>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.2978928, step = 0
INFO:tensorflow:loss = 2.2978928, step = 0
INFO:tensorflow:global_step/sec: 262.574
INFO:tensorflow:global_step/sec: 262.574
INFO:tensorflow:loss = 2.2917323, step = 100 (0.383 sec)
INFO:tensorflow:loss = 2.2917323, step = 100 (0.383 sec)
INFO:tensorflow:global_step/sec: 294.878
INFO:tensorflow:global_step/sec: 294.878
INFO:tensorflow:loss = 2.3222418, step = 200 (0.339 sec)
INFO:tensorflow:loss = 2.3222418, step = 200 (0.339 sec)
INFO:tensorflow:global_step/sec: 293.405
INFO:tensorflow:global_step/sec: 293.405
INFO:tensorflow:loss = 2.2923741, step = 300 (0.341 sec)
INFO:tensorflow:loss = 2.2923741, step = 300 (0.341 sec)
INFO:tensorflow:global_step/sec: 292.863
INFO:tensorflow:global_step/sec: 292.863
INFO:tensorflow:loss = 2.3176103, step = 400 (0.344 sec)
INFO:tensorflow:loss = 2.3176103, step = 400 (0.344 sec)
INFO:tensorflow:global_step/sec: 322.762
INFO:tensorflow:global_step/sec: 322.762
INFO:tensorflow:loss = 2.281468, step = 500 (0.307 sec)
INFO:tensorflow:loss = 2.281468, step = 500 (0.307 sec)
INFO:tensorflow:global_step/sec: 404.54
INFO:tensorflow:global_step/sec: 404.54
INFO:tensorflow:loss = 2.2791018, step = 600 (0.247 sec)
INFO:tensorflow:loss = 2.2791018, step = 600 (0.247 sec)
INFO:tensorflow:global_step/sec: 404.645
INFO:tensorflow:global_step/sec: 404.645
INFO:tensorflow:loss = 2.2914894, step = 700 (0.248 sec)
INFO:tensorflow:loss = 2.2914894, step = 700 (0.248 sec)
INFO:tensorflow:global_step/sec: 394.163
INFO:tensorflow:global_step/sec: 394.163
INFO:tensorflow:loss = 2.292805, step = 800 (0.253 sec)
INFO:tensorflow:loss = 2.292805, step = 800 (0.253 sec)
INFO:tensorflow:global_step/sec: 623.428
INFO:tensorflow:global_step/sec: 623.428
INFO:tensorflow:loss = 2.2710214, step = 900 (0.160 sec)
INFO:tensorflow:loss = 2.2710214, step = 900 (0.160 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-05-19T11:21:05
INFO:tensorflow:Starting evaluation at 2021-05-19T11:21:05
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 0.86388s
INFO:tensorflow:Inference Time : 0.86388s
INFO:tensorflow:Finished evaluation at 2021-05-19-11:21:06
INFO:tensorflow:Finished evaluation at 2021-05-19-11:21:06
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2815592
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2815592
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.1483853.
INFO:tensorflow:Loss for final step: 1.1483853.
({'loss': 2.2815592, 'global_step': 938}, [])

Zoptymalizuj wydajność treningu

Masz teraz model i estymator z tf.distribute.Strategy wielu pracowników, obsługiwany przez tf.distribute.Strategy . Możesz wypróbować następujące techniki, aby zoptymalizować wydajność szkolenia wielu pracowników:

  • Zwiększ rozmiar wsadu: podany tutaj rozmiar wsadu to na-GPU. Ogólnie rzecz biorąc, zalecany jest największy rozmiar wsadu, który mieści się w pamięci GPU.
  • tf.float zmienne: tf.float zmienne do tf.float jeśli to możliwe. Oficjalny model ResNet zawiera przykład, jak można to zrobić.
  • Korzystaj z komunikacji zbiorowej: MultiWorkerMirroredStrategy zapewnia wiele implementacji komunikacji zbiorowej .

    • RING implementuje kolekcje oparte na pierścieniu, używając gRPC jako warstwy komunikacji między hostami.
    • NCCL wykorzystuje NCCL Nvidii do implementacji kolektywów.
    • AUTO odracza wybór do czasu wykonania.

    Najlepszy wybór zbiorczej implementacji zależy od liczby i rodzaju procesorów graficznych oraz połączeń sieciowych w klastrze. Aby nadpisać automatyczny wybór, podaj poprawną wartość parametru communication konstruktora MultiWorkerMirroredStrategy , np. communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Odwiedź sekcję Wydajność w przewodniku, aby dowiedzieć się więcej o innych strategiach i narzędziach, których możesz użyć do optymalizacji wydajności modeli TensorFlow.

Inne przykłady kodu

  1. Kompletny przykład szkolenia wielu pracowników w tensorflow / ekosystemie przy użyciu szablonów Kubernetes. Ten przykład rozpoczyna się od modelu Keras i konwertuje go na estymator przy użyciu interfejsu API tf.keras.estimator.model_to_estimator .
  2. Oficjalne modele , z których wiele można skonfigurować do obsługi wielu strategii dystrybucji.