Szkolenie dla wielu pracowników z Estimatorem

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Ten samouczek pokazuje, jak tf.distribute.Strategy może być używany do rozproszonego szkolenia dla wielu pracowników za pomocą tf.estimator . Jeśli piszesz kod za pomocą tf.estimator i interesuje Cię skalowanie poza pojedynczą maszynę z wysoką wydajnością, ten samouczek jest dla Ciebie.

Zanim zaczniesz, przeczytaj przewodnik po strategii dystrybucji . Samouczek dotyczący obsługi wielu procesorów graficznych jest również istotny, ponieważ w tym samouczku zastosowano ten sam model.

Ustawiać

Najpierw skonfiguruj TensorFlow i niezbędne importy.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Funkcja wejścia

Ten samouczek używa zestawu danych MNIST z TensorFlow Datasets . Kod w tym miejscu jest podobny do samouczka szkoleniowego dotyczącego wielu procesorów graficznych z jedną kluczową różnicą: w przypadku korzystania z narzędzia Estimator do szkolenia z wieloma pracownikami konieczne jest podzielenie zestawu danych według liczby procesów roboczych w celu zapewnienia zbieżności modelu. Dane wejściowe są dzielone na fragmenty według indeksu procesu roboczego, dzięki czemu każdy proces roboczy przetwarza 1/num_workers różne części zestawu danych.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Innym rozsądnym podejściem do osiągnięcia konwergencji byłoby przetasowanie zestawu danych z odrębnymi nasionami u każdego pracownika.

Konfiguracja wielostanowiskowa

Jedną z kluczowych różnic w tym samouczku (w porównaniu z samouczkiem dotyczącym obsługi wielu procesorów graficznych ) jest konfiguracja wielu pracowników. Zmienna środowiskowa TF_CONFIG to standardowy sposób określania konfiguracji klastra dla każdego procesu roboczego, który jest częścią klastra.

Istnieją dwa składniki TF_CONFIG : cluster i task . cluster udostępnia informacje o całym klastrze, a mianowicie o serwerach roboczych i serwerach parametrów w klastrze. task dostarcza informacji o bieżącym zadaniu. Pierwszy cluster komponentów jest taki sam dla wszystkich procesów roboczych i serwerów parametrów w klastrze, a drugie task komponentu jest inne na każdym serwerze roboczym i serwerze parametrów oraz określa własny type i index . W tym przykładzie type zadania to worker a index zadania to 0 .

W celach ilustracyjnych ten samouczek pokazuje, jak ustawić TF_CONFIG z 2 procesami roboczymi na localhost . W praktyce należałoby utworzyć wielu pracowników na zewnętrznym adresie IP i porcie oraz odpowiednio ustawić TF_CONFIG na każdym pracowniku, tj. zmodyfikować index zadań.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Zdefiniuj model

Napisz warstwy, optymalizator i funkcję straty dla treningu. Ten samouczek definiuje model z warstwami Keras, podobnie jak samouczek dotyczący obsługi wielu procesorów graficznych .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Aby wytrenować model, użyj wystąpienia tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu dla wszystkich pracowników. Wykorzystuje CollectiveOps , operację TensorFlow do komunikacji zbiorowej, aby agregować gradienty i synchronizować zmienne. Przewodnik tf.distribute.Strategy zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Trenuj i oceniaj model

Następnie określ strategię dystrybucji w RunConfig dla estymatora oraz trenuj i oceniaj, wywołując tf.estimator.train_and_evaluate . Ten samouczek dystrybuuje tylko szkolenie, określając strategię za pośrednictwem train_distribute . Możliwe jest również rozpowszechnianie oceny poprzez eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.10881.
INFO:tensorflow:Loss for final step: 1.10881.
({'loss': 2.234131, 'global_step': 938}, [])

Zoptymalizuj wydajność treningu

Masz teraz model i estymator obsługujący wielu pracowników obsługiwany przez tf.distribute.Strategy . Możesz wypróbować następujące techniki, aby zoptymalizować wydajność szkolenia wieloosobowego:

  • Zwiększ rozmiar wsadu: Określony tutaj rozmiar wsadu dotyczy jednostki GPU. Ogólnie zalecany jest największy rozmiar partii, który mieści się w pamięci GPU.
  • Rzutuj zmienne: rzuć zmienne na tf.float , jeśli to możliwe. Oficjalny model ResNet zawiera przykład tego, jak można to zrobić.
  • Korzystaj z komunikacji zbiorowej: MultiWorkerMirroredStrategy zapewnia wiele implementacji komunikacji zbiorowej .

    • RING implementuje kolektywy oparte na pierścieniu, wykorzystując gRPC jako warstwę komunikacji między hostami.
    • NCCL używa NCCL Nvidii do wdrażania kolektywów.
    • AUTO odracza wybór do czasu pracy.

    Najlepszy wybór implementacji zbiorowej zależy od liczby i rodzaju procesorów graficznych oraz połączenia sieciowego w klastrze. Aby pominąć wybór automatyczny, należy podać poprawną wartość parametru communication konstruktora MultiWorkerMirroredStrategy , np. communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Odwiedź sekcję Wydajność w przewodniku, aby dowiedzieć się więcej o innych strategiach i narzędziach , których możesz użyć do optymalizacji wydajności modeli TensorFlow.

Inne przykłady kodu

  1. Kompleksowy przykład szkolenia wielu pracowników w tensorflow/ekosystemie przy użyciu szablonów Kubernetes. Ten przykład zaczyna się od modelu Keras i konwertuje go na estymator przy użyciu interfejsu API tf.keras.estimator.model_to_estimator .
  2. Oficjalne modele , z których wiele można skonfigurować do obsługi wielu strategii dystrybucji.