Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Rozproszone szkolenie z TensorFlow

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

tf.distribute.Strategy to TensorFlow API do dystrybucji szkoleń na wiele procesorów graficznych, wiele maszyn lub TPU. Korzystając z tego interfejsu API, możesz rozpowszechniać istniejące modele i kod szkoleniowy przy minimalnych zmianach kodu.

tf.distribute.Strategy została zaprojektowana z myślą o następujących kluczowych celach:

  • Łatwy w użyciu i obsługuje wiele segmentów użytkowników, w tym badaczy, inżynierów ML itp.
  • Zapewniają dobrą wydajność po wyjęciu z pudełka.
  • Łatwe przełączanie między strategiami.

tf.distribute.Strategy może być używany z interfejsem API wysokiego poziomu, takim jak Keras , a także może być używany do dystrybucji niestandardowych pętli szkoleniowych (i ogólnie wszelkich obliczeń wykorzystujących TensorFlow).

W TensorFlow 2.x możesz uruchamiać programy z zapałem lub na wykresie za pomocą tf.function . tf.distribute.Strategy zamierza obsługiwać oba te tryby wykonywania, ale najlepiej działa z tf.function . Tryb Eager jest zalecany tylko do debugowania i nie jest obsługiwany przez TPUStrategy . Chociaż omawiamy szkolenie przez większość czasu w tym przewodniku, ten interfejs API może być również używany do dystrybucji oceny i prognozowania na różnych platformach.

Możesz używać tf.distribute.Strategy z bardzo niewielkimi zmianami w kodzie, ponieważ zmieniliśmy podstawowe komponenty TensorFlow, aby były świadome strategii. Obejmuje to zmienne, warstwy, modele, optymalizatory, metryki, podsumowania i punkty kontrolne.

W tym przewodniku wyjaśniamy różne rodzaje strategii i wyjaśniamy, jak można ich używać w różnych sytuacjach.

# Import TensorFlow
import tensorflow as tf

Rodzaje strategii

tf.distribute.Strategy zamierza objąć szereg przypadków użycia na różnych osiach. Niektóre z tych kombinacji są obecnie obsługiwane, a inne zostaną dodane w przyszłości. Niektóre z tych osi to:

  • Trenowanie synchroniczne i asynchroniczne: są to dwa typowe sposoby dystrybucji szkolenia z równoległością danych. W przypadku treningu synchronicznego wszyscy pracownicy szkolą się na różnych fragmentach danych wejściowych w synchronizacji i agregując gradienty na każdym kroku. W przypadku szkolenia asynchronicznego wszyscy pracownicy niezależnie uczą się danych wejściowych i asynchronicznie aktualizują zmienne. Zwykle szkolenie w zakresie synchronizacji jest obsługiwane przez redukcję wszystkich elementów i asynchronizację poprzez architekturę serwera parametrów.
  • Platforma sprzętowa: możesz chcieć skalować swoje szkolenie na wiele procesorów graficznych na jednej maszynie lub na wiele maszyn w sieci (z 0 lub więcej procesorami graficznymi w każdym) lub na Cloud TPU.

Aby wspierać te przypadki użycia, dostępnych jest sześć strategii. W następnej sekcji wyjaśnimy, które z nich są obsługiwane w których scenariuszach w TF 2.2 w tym momencie. Oto krótki przegląd:

Szkolenia API MirroredStrategy Strategia TPUS MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne Obsługiwane planowane stanowisko 2.3
Niestandardowa pętla treningowa Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne Obsługiwane planowane stanowisko 2.3
Estimator API Ograniczone wsparcie Nieobsługiwany Ograniczone wsparcie Ograniczone wsparcie Ograniczone wsparcie

MirroredStrategy

tf.distribute.MirroredStrategy obsługuje synchroniczne rozproszone szkolenie na wielu procesorach graficznych na jednym komputerze. Tworzy jedną replikę na każde urządzenie GPU. Każda zmienna w modelu jest odzwierciedlana we wszystkich replikach. Zmienne te razem tworzą pojedynczą zmienną koncepcyjną o nazwie MirroredVariable . Te zmienne są ze sobą zsynchronizowane, stosując identyczne aktualizacje.

Do przekazywania aktualizacji zmiennych między urządzeniami używane są wydajne algorytmy all-RED. Zmniejsz wszystkie agregaty tensorów na wszystkich urządzeniach, dodając je i udostępniaj na każdym urządzeniu. Jest to połączony algorytm, który jest bardzo wydajny i może znacznie zmniejszyć obciążenie związane z synchronizacją. Dostępnych jest wiele algorytmów i implementacji typu all-RED, w zależności od typu komunikacji dostępnej między urządzeniami. Domyślnie używa NVIDIA NCCL jako implementacji all-RED. Możesz wybrać jedną z kilku innych oferowanych przez nas opcji lub napisać własną.

Oto najprostszy sposób tworzenia MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Spowoduje to utworzenie instancji MirroredStrategy która będzie używać wszystkich procesorów graficznych widocznych dla TensorFlow i używać NCCL jako komunikacji między urządzeniami.

Jeśli chcesz używać tylko niektórych procesorów graficznych na swoim komputerze, możesz to zrobić w następujący sposób:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Jeśli chcesz przesłonić komunikację między urządzeniami, możesz to zrobić za pomocą argumentu cross_device_ops , dostarczając instancję tf.distribute.CrossDeviceOps . Obecnie tf.distribute.HierarchicalCopyAllReduce i tf.distribute.ReductionToOneDevice to dwie opcje inne niż tf.distribute.NcclAllReduce które jest wartością domyślną.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Strategia TPUS

tf.distribute.TPUStrategy umożliwia prowadzenie szkolenia TensorFlow na jednostkach przetwarzania tensorowego (TPU). TPU to wyspecjalizowane układy ASIC firmy Google zaprojektowane w celu radykalnego przyspieszenia obciążeń systemów uczących się. Są dostępne w Google Colab, TensorFlow Research Cloud i Cloud TPU .

Pod względem architektury rozproszonego szkolenia TPUStrategy jest tą samą MirroredStrategy - implementuje synchroniczne rozproszone szkolenie. Jednostki TPU zapewniają własną implementację wydajnych operacji all-REDUCH i innych zbiorczych operacji na wielu rdzeniach TPU, które są wykorzystywane w TPUStrategy .

Oto jak można utworzyć instancję TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

Instancja TPUClusterResolver pomaga zlokalizować jednostki TPU. W Colab nie musisz podawać żadnych argumentów.

Jeśli chcesz użyć tego dla Cloud TPU:

  • Musisz określić nazwę swojego zasobu TPU w argumencie tpu .
  • Musisz jawnie zainicjować system tpu na początku programu. Jest to wymagane, zanim jednostki TPU będą mogły być używane do obliczeń. Inicjalizacja systemu tpu powoduje również wyczyszczenie pamięci TPU, dlatego ważne jest, aby najpierw wykonać ten krok, aby uniknąć utraty stanu.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy jest bardzo podobny do MirroredStrategy . Wdraża synchroniczne rozproszone szkolenie dla wielu pracowników, z których każdy może mieć wiele procesorów graficznych. Podobnie jak MirroredStrategy , tworzy kopie wszystkich zmiennych w modelu na każdym urządzeniu dla wszystkich procesów roboczych.

Używa CollectiveOps jako metody komunikacji obejmującej wiele pracowników, która pozwala na synchronizację zmiennych. Operacja zbiorowa to pojedyncza operacja na wykresie TensorFlow, która może automatycznie wybrać algorytm all-RED w środowisku wykonawczym TensorFlow w zależności od sprzętu, topologii sieci i rozmiarów tensorów.

Implementuje również dodatkowe optymalizacje wydajności. Na przykład obejmuje statyczną optymalizację, która przekształca wiele wszystkich redukcji na małych tensorach na mniejszą liczbę wszystkich redukcji na większych tensorach. Ponadto projektujemy go tak, aby miał architekturę wtyczek - dzięki czemu w przyszłości będziesz mógł podłączać algorytmy lepiej dostosowane do Twojego sprzętu. Należy zauważyć, że operacje zbiorowe realizują również inne operacje zbiorowe, takie jak nadawanie i gromadzenie wszystkich danych.

Oto najprostszy sposób tworzenia MultiWorkerMirroredStrategy :

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

Obecnie MultiWorkerMirroredStrategy pozwala na wybór pomiędzy dwoma różnymi implementacjami operacji zbiorowych. CollectiveCommunication.RING implementuje CollectiveCommunication.RING oparte na pierścieniu, używając gRPC jako warstwy komunikacyjnej. CollectiveCommunication.NCCL wykorzystuje NCCL firmy Nvidia do implementacji kolektywów. CollectiveCommunication.AUTO odracza wybór do środowiska wykonawczego. Najlepszy wybór zbiorczej implementacji zależy od liczby i rodzaju procesorów graficznych oraz połączeń sieciowych w klastrze. Możesz je określić w następujący sposób:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Jedną z kluczowych różnic między szkoleniem dla wielu pracowników w porównaniu ze szkoleniem z wieloma GPU jest konfiguracja dla wielu pracowników. TF_CONFIG środowiskowa TF_CONFIG jest standardowym sposobem w TensorFlow do określania konfiguracji klastra dla każdego procesu roboczego, który jest częścią klastra. Dowiedz się więcej o konfigurowaniu TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy również przeprowadza trening synchroniczny. Zmienne nie są dublowane, zamiast tego są umieszczane na CPU, a operacje są replikowane na wszystkich lokalnych GPU. Jeśli jest tylko jeden GPU, wszystkie zmienne i operacje zostaną umieszczone na tym GPU.

Utwórz instancję CentralStorageStrategy przez:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Spowoduje to utworzenie instancji CentralStorageStrategy która będzie wykorzystywać wszystkie widoczne GPU i CPU. Aktualizacja zmiennych w replikach zostanie zagregowana przed zastosowaniem do zmiennych.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy obsługuje szkolenie serwerów parametrów na wielu maszynach. W tej konfiguracji niektóre maszyny są wyznaczane jako pracownicy, a inne jako serwery parametrów. Każda zmienna modelu jest umieszczona na jednym serwerze parametrów. Obliczenia są replikowane na wszystkich procesorach graficznych wszystkich pracowników.

Pod względem kodu wygląda podobnie do innych strategii:

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

W przypadku szkolenia wielu pracowników TF_CONFIG musi określić konfigurację serwerów parametrów i pracowników w klastrze, o których można przeczytać więcej w TF_CONFIG poniżej .

Inne strategie

Oprócz powyższych strategii istnieją dwie inne strategie, które mogą być przydatne do tworzenia prototypów i debugowania podczas korzystania z interfejsów API tf.distribute .

Strategia domyślna

Strategia domyślna to strategia dystrybucji, która jest obecna, gdy w zakresie nie ma jawnej strategii dystrybucji. Implementuje interfejs tf.distribute.Strategy , ale jest tranzytem i nie zapewnia rzeczywistej dystrybucji. Na przykład strategy.run(fn) wywoła po prostu fn . Kod napisany przy użyciu tej strategii powinien zachowywać się dokładnie tak, jak kod napisany bez żadnej strategii. Możesz myśleć o tym jako o strategii „no-op”.

Strategia domyślna jest singletonem - i nie można tworzyć jej więcej. Można go uzyskać za pomocą tf.distribute.get_strategy() poza zakresem dowolnej jawnej strategii (ten sam interfejs API, którego można użyć do pobrania bieżącej strategii w zakresie jawnej strategii).

default_strategy = tf.distribute.get_strategy()

Ta strategia służy dwóm głównym celom:

  • Pozwala bezwarunkowo pisać kod biblioteki obsługującej dystrybucję. Na przykład w optymalizatorze możemy zrobić tf.distribute.get_strategy() i użyć tej strategii do redukcji gradientów - zawsze zwróci ona obiekt strategii, na którym możemy wywołać redukuj API.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Podobnie jak kod biblioteki, może być używany do pisania programów użytkowników końcowych do pracy ze strategią dystrybucji i bez niej, bez konieczności stosowania logiki warunkowej. Przykładowy fragment kodu ilustrujący to:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy to strategia umieszczania wszystkich zmiennych i obliczeń na jednym określonym urządzeniu.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Strategia ta różni się od strategii domyślnej pod wieloma względami. W strategii domyślnej logika umieszczania zmiennych pozostaje niezmieniona w porównaniu z uruchomionym TensorFlow bez żadnej strategii dystrybucji. Ale podczas korzystania z OneDeviceStrategy wszystkie zmienne utworzone w jego zakresie są jawnie umieszczane na określonym urządzeniu. Ponadto wszelkie funkcje wywoływane przez OneDeviceStrategy.run również zostaną umieszczone na określonym urządzeniu.

Dane wejściowe dystrybuowane za pomocą tej strategii zostaną wstępnie pobrane do określonego urządzenia. W domyślnej strategii nie ma dystrybucji danych wejściowych.

Podobnie jak w przypadku strategii domyślnej, strategia ta może być również używana do testowania kodu przed przejściem na inne strategie, które faktycznie są dystrybuowane do wielu urządzeń / maszyn. Spowoduje to ćwiczenie mechanizmu strategii dystrybucji w nieco większym stopniu niż strategia domyślna, ale nie w pełnym zakresie, jak przy użyciu MirroredStrategy lub TPUStrategy itp. Jeśli chcesz, aby kod zachowywał się tak, jakby nie miał strategii, użyj strategii domyślnej.

Do tej pory rozmawialiśmy o tym, jakie są dostępne różne strategie i jak można je zastosować. W kilku następnych sekcjach omówimy różne sposoby ich wykorzystania do dystrybucji treningu. W tym przewodniku pokażemy krótkie fragmenty kodu i link do pełnych samouczków, które możesz uruchomić od końca do końca.

Korzystanie tf.distribute.Strategy z tf.keras.Model.fit

Zintegrowaliśmy tf.distribute.Strategy z tf.keras które jest implementacją specyfikacji API Keras przez TensorFlow . tf.keras to interfejs API wysokiego poziomu do tworzenia i trenowania modeli. Poprzez integrację z tf.keras sprawiliśmy, że bezproblemowo rozpowszechniamy szkolenie napisane w ramach szkoleniowych Keras przy użyciu model.fit .

Oto, co musisz zmienić w swoim kodzie:

  1. Utwórz instancję odpowiedniego tf.distribute.Strategy .
  2. Przenieś tworzenie modelu Keras, optymalizatora i metryk w ramach strategy.scope . strategy.scope .

Obsługujemy wszystkie typy modeli Keras - sekwencyjne, funkcjonalne i podklasowe.

Oto fragment kodu, który to robi dla bardzo prostego modelu Keras z jedną gęstą warstwą:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

W tym przykładzie użyliśmy MirroredStrategy więc możemy uruchomić to na maszynie z wieloma GPU. strategy.scope() wskazuje Kerasowi, której strategii użyć do dystrybucji szkolenia. Tworzenie modeli / optymalizatorów / metryk w tym zakresie pozwala nam tworzyć zmienne rozproszone zamiast zwykłych zmiennych. Po skonfigurowaniu możesz dopasować model tak, jak zwykle. MirroredStrategy zajmuje się replikacją treningu modelu na dostępnych GPU, agregacją gradientów i nie tylko.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 3.4059
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 1.5054
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 0.9349

0.9348920583724976

Tutaj użyliśmy tf.data.Dataset aby zapewnić dane wejściowe dotyczące szkolenia i oceny. Możesz także użyć tablic numpy:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.6654
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.2941

<tensorflow.python.keras.callbacks.History at 0x7f8dd86cbcc0>

W obu przypadkach (zbiór danych lub numpy) każda partia danych wejściowych jest równo dzielona między wiele replik. Na przykład, jeśli używasz MirroredStrategy z 2 GPU, każda partia o rozmiarze 10 zostanie podzielona między 2 GPU, a każdy z nich otrzyma 5 przykładów danych wejściowych w każdym kroku. Każda epoka będzie trenować szybciej, gdy dodasz więcej GPU. Zwykle chciałbyś zwiększyć rozmiar partii, dodając więcej akceleratorów, aby efektywnie wykorzystać dodatkową moc obliczeniową. Będziesz także musiał ponownie dostroić współczynnik uczenia się, w zależności od modelu. Możesz użyć strategy.num_replicas_in_sync aby uzyskać liczbę replik.

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Co jest teraz obsługiwane?

Szkolenia API MirroredStrategy Strategia TPUS MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Interfejsy API Keras Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne Wsparcie planowane stanowisko 2.3

Przykłady i samouczki

Oto lista samouczków i przykładów, które ilustrują powyższą integrację od końca do końca z Kerasem:

  1. Samouczek do trenowania MNIST z MirroredStrategy .
  2. Samouczek do trenowania MNIST przy użyciu MultiWorkerMirroredStrategy .
  3. Przewodnik po szkoleniu MNIST z wykorzystaniem TPUStrategy .
  4. TensorFlow model Garden repozytorium zawierającego zbiory state-of-the-art modeli realizowanych przy użyciu różnych strategii.

Korzystanie z tf.distribute.Strategy z niestandardowymi pętlami szkoleniowymi

Jak widzieliśmy, użycie tf.distribute.Strategy z Keras model.fit wymaga zmiany tylko kilku wierszy kodu. Przy odrobinie wysiłku możesz również użyć tf.distribute.Strategy z niestandardowymi pętlami treningowymi.

Jeśli potrzebujesz większej elastyczności i kontroli nad pętlami treningowymi niż jest to możliwe w przypadku Estimatora lub Keras, możesz napisać własne pętle treningowe. Na przykład, korzystając z GAN, możesz chcieć wykonać inną liczbę kroków generatora lub dyskryminatora w każdej rundzie. Podobnie, ramy wysokiego poziomu nie nadają się zbytnio do szkolenia ze wzmocnieniem.

Aby obsługiwać niestandardowe pętle szkoleniowe, udostępniamy podstawowy zestaw metod za tf.distribute.Strategy klas tf.distribute.Strategy . Korzystanie z nich może początkowo wymagać niewielkiej restrukturyzacji kodu, ale gdy to zrobisz, powinieneś być w stanie przełączać się między GPU, TPU i wieloma maszynami, po prostu zmieniając instancję strategii.

Tutaj pokażemy krótki fragment ilustrujący ten przypadek użycia dla prostego przykładu szkoleniowego przy użyciu tego samego modelu Keras jak poprzednio.

Najpierw tworzymy model i optymalizator w zakresie strategii. Dzięki temu wszelkie zmienne utworzone za pomocą modelu i optymalizatora są zmiennymi lustrzanymi.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Następnie tworzymy wejściowy zbiór danych i wywołujemy tf.distribute.Strategy.experimental_distribute_dataset celu dystrybucji zbioru danych w oparciu o strategię.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

Następnie określamy jeden etap szkolenia. Będziemy używać tf.GradientTape do obliczania gradientów i optymalizatora do stosowania tych gradientów do aktualizacji zmiennych naszego modelu. Aby rozpowszechnić ten krok szkoleniowy, train_step funkcję train_step i przekazujemy ją do tf.distrbute.Strategy.run wraz z danymi wejściowymi zestawu danych, które otrzymujemy z utworzonego wcześniej dist_dataset :

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Kilka innych rzeczy, o których należy pamiętać w powyższym kodzie:

  1. Do obliczenia straty użyliśmy tf.nn.compute_average_loss . tf.nn.compute_average_loss sumuje straty na przykład i dzieli sumę przez global_batch_size. Jest to ważne, ponieważ później, po obliczeniu gradientów na każdej replice, są one agregowane w replikach poprzez zsumowanie .
  2. tf.distribute.Strategy.reduce interfejsu API tf.distribute.Strategy.reduce do agregacji wyników zwróconych przez tf.distribute.Strategy.run . tf.distribute.Strategy.run zwraca wyniki z każdej lokalnej repliki w strategii i istnieje wiele sposobów wykorzystania tego wyniku. Możesz je reduce , aby uzyskać zagregowaną wartość. Możesz również wykonać tf.distribute.Strategy.experimental_local_results aby uzyskać listę wartości zawartych w wyniku, po jednej na lokalną replikę.
  3. Gdy apply_gradients jest wywoływana w zakresie strategii dystrybucji, jego zachowanie jest modyfikowane. W szczególności przed zastosowaniem gradientów w każdej równoległej instancji podczas treningu synchronicznego wykonuje sumaryczne repliki wszystkich gradientów.

Na koniec, po zdefiniowaniu kroku szkoleniowego, możemy wykonać iterację po dist_dataset i uruchomić szkolenie w pętli:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.3262986, shape=(), dtype=float32)
tf.Tensor(0.32475147, shape=(), dtype=float32)
tf.Tensor(0.3232167, shape=(), dtype=float32)
tf.Tensor(0.32169423, shape=(), dtype=float32)
tf.Tensor(0.32018384, shape=(), dtype=float32)
tf.Tensor(0.3186855, shape=(), dtype=float32)
tf.Tensor(0.317199, shape=(), dtype=float32)
tf.Tensor(0.31572425, shape=(), dtype=float32)
tf.Tensor(0.31426117, shape=(), dtype=float32)
tf.Tensor(0.31280956, shape=(), dtype=float32)
tf.Tensor(0.3113694, shape=(), dtype=float32)
tf.Tensor(0.30994043, shape=(), dtype=float32)
tf.Tensor(0.30852267, shape=(), dtype=float32)
tf.Tensor(0.30711594, shape=(), dtype=float32)
tf.Tensor(0.30572012, shape=(), dtype=float32)
tf.Tensor(0.30433518, shape=(), dtype=float32)
tf.Tensor(0.3029609, shape=(), dtype=float32)
tf.Tensor(0.30159724, shape=(), dtype=float32)
tf.Tensor(0.30024406, shape=(), dtype=float32)
tf.Tensor(0.29890123, shape=(), dtype=float32)

W powyższym przykładzie dist_dataset po dist_dataset aby zapewnić dane wejściowe do treningu. tf.distribute.Strategy.make_experimental_numpy_dataset również tf.distribute.Strategy.make_experimental_numpy_dataset danych tf.distribute.Strategy.make_experimental_numpy_dataset do obsługi danych wejściowych numpy. Możesz użyć tego interfejsu API do utworzenia zestawu danych przed wywołaniem tf.distribute.Strategy.experimental_distribute_dataset .

Innym sposobem iteracji danych jest jawne użycie iteratorów. Możesz to zrobić, jeśli chcesz wykonać określoną liczbę kroków, a nie iterować po całym zestawie danych. Powyższy iteracji będzie teraz być modyfikowana najpierw utworzyć iterator a następnie jawnie wywołać next na to, aby uzyskać dane wejściowe.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.2975687, shape=(), dtype=float32)
tf.Tensor(0.2962464, shape=(), dtype=float32)
tf.Tensor(0.29493415, shape=(), dtype=float32)
tf.Tensor(0.29363185, shape=(), dtype=float32)
tf.Tensor(0.2923394, shape=(), dtype=float32)
tf.Tensor(0.29105672, shape=(), dtype=float32)
tf.Tensor(0.28978375, shape=(), dtype=float32)
tf.Tensor(0.28852034, shape=(), dtype=float32)
tf.Tensor(0.2872664, shape=(), dtype=float32)
tf.Tensor(0.28602186, shape=(), dtype=float32)

Obejmuje to najprostszy przypadek użycia interfejsu API tf.distribute.Strategy do dystrybucji niestandardowych pętli szkoleniowych. Jesteśmy w trakcie ulepszania tych interfejsów API. Ponieważ ten przypadek użycia wymaga więcej pracy, aby dostosować kod, w przyszłości opublikujemy osobny szczegółowy przewodnik.

Co jest teraz obsługiwane?

Szkoleniowy interfejs API MirroredStrategy Strategia TPUS MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Niestandardowa pętla treningowa Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne Wsparcie planowane stanowisko 2.3

Przykłady i samouczki

Oto kilka przykładów użycia strategii dystrybucji z niestandardowymi pętlami szkoleniowymi:

  1. Samouczek do trenowania MNIST przy użyciu MirroredStrategy .
  2. Przewodnik po szkoleniu MNIST z wykorzystaniem TPUStrategy .
  3. TensorFlow model Garden repozytorium zawierającego zbiory state-of-the-art modeli realizowanych przy użyciu różnych strategii.

Korzystanie z tf.distribute.Strategy z estymatorem (ograniczone wsparcie)

tf.estimator to rozproszony, szkoleniowy interfejs API TensorFlow, który pierwotnie obsługiwał podejście serwera parametrów asynchronicznych. Podobnie jak w przypadku Keras, zintegrowaliśmy tf.distribute.Strategy z tf.Estimator . Jeśli używasz Estymatora do szkolenia, możesz łatwo przejść na szkolenie rozproszone, wprowadzając bardzo niewiele zmian w kodzie. Dzięki temu użytkownicy Estimatora mogą teraz przeprowadzać synchroniczne rozproszone szkolenie na wielu procesorach graficznych i wielu pracownikach, a także korzystać z TPU. To wsparcie w Estymatorze jest jednak ograniczone. Aby uzyskać więcej informacji, zobacz sekcję Co jest teraz obsługiwane poniżej.

Użycie tf.distribute.Strategy z tf.distribute.Strategy jest nieco inne niż w przypadku Keras. Zamiast używać strategy.scope , teraz przekazujemy obiekt strategii do RunConfig dla Estymatora.

Oto fragment kodu, który pokazuje to z LinearRegressor estymatorem LinearRegressor i MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpbnyibny6
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpbnyibny6', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Używamy tutaj gotowego Estymatora, ale ten sam kod działa również z niestandardowym Estimatorem. train_distribute określa sposób dystrybucji szkolenia, a eval_distribute określa sposób dystrybucji ewaluacji. To kolejna różnica w stosunku do Keras, gdzie używamy tej samej strategii zarówno do treningu, jak i do ewaluacji.

Teraz możemy wytrenować i ocenić ten estymator za pomocą funkcji wejściowej:

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-09-10T01:28:07Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpbnyibny6/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.24097s
INFO:tensorflow:Finished evaluation at 2020-09-10-01:28:07
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpbnyibny6/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Kolejną różnicą, którą należy tutaj podkreślić między Estimatorem i Kerasem, jest obsługa danych wejściowych. W Keras wspomnieliśmy, że każda partia zbioru danych jest automatycznie dzielona na wiele replik. Jednak w Estymatorze nie dokonujemy automatycznego dzielenia partii ani nie dzielimy automatycznie danych na różnych pracowników. Masz pełną kontrolę nad sposobem dystrybucji danych między pracownikami i urządzeniami i musisz podać input_fn aby określić sposób dystrybucji danych.

Twoje input_fn jest wywoływane raz na pracownika, co daje jeden zestaw danych na pracownika. Następnie jedna partia z tego zbioru danych jest przekazywana do jednej repliki tego pracownika, zużywając w ten sposób N partii na N replik na 1 pracownika. Innymi słowy, zbiór danych zwrócony przez input_fn powinien zawierać partie o rozmiarze PER_REPLICA_BATCH_SIZE . Globalny rozmiar partii dla kroku można uzyskać jako PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Podczas szkolenia dla wielu pracowników należy podzielić dane między pracowników lub wymieszać z losowym ziarnem dla każdego z nich. Przykład, jak to zrobić, znajdziesz w Szkoleniu dla wielu pracowników z estymatorem .

W podobny sposób można również stosować strategie wielu pracowników i serwerów parametrów. Kod pozostaje ten sam, ale należy użyć tf.estimator.train_and_evaluate i ustawić TF_CONFIG środowiskowe TF_CONFIG dla każdego TF_CONFIG binarnego działającego w klastrze.

Co jest teraz obsługiwane?

Istnieje ograniczone wsparcie dla szkolenia z TPUStrategy przy użyciu wszystkich strategii z wyjątkiem TPUStrategy . Podstawowe szkolenie i ocena powinny działać, ale wiele zaawansowanych funkcji, takich jak rusztowanie, jeszcze nie działa. Ta integracja może również zawierać wiele błędów. W tej chwili nie planujemy aktywnie ulepszać tego wsparcia, a zamiast tego koncentrujemy się na Keras i niestandardowej obsłudze pętli treningowej. Jeśli to możliwe, powinieneś zamiast tego używać tf.distribute z tymi interfejsami API.

Szkolenia API MirroredStrategy Strategia TPUS MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Estimator API Ograniczone wsparcie Nieobsługiwany Ograniczone wsparcie Ograniczone wsparcie Ograniczone wsparcie

Przykłady i samouczki

Oto kilka przykładów, które pokazują od końca do końca użycie różnych strategii z Estymatorem:

  1. Szkolenie dla wielu pracowników z Estymatorem w celu szkolenia MNIST z wieloma pracownikami przy użyciu MultiWorkerMirroredStrategy .
  2. Kompletny przykład szkolenia wielu pracowników w tensorflow / ekosystemie przy użyciu szablonów Kubernetes. Ten przykład rozpoczyna się od modelu Keras i konwertuje go na tf.keras.estimator.model_to_estimator przy użyciu interfejsu API tf.keras.estimator.model_to_estimator .
  3. Oficjalny model ResNet50 , który można trenować za pomocą MirroredStrategy lub MultiWorkerMirroredStrategy .

Inne tematy

W tej sekcji omówimy kilka tematów, które dotyczą wielu przypadków użycia.

Konfigurowanie zmiennej środowiskowej TF_CONFIG

W przypadku szkolenia wielu pracowników, jak wspomniano wcześniej, należy ustawić TF_CONFIG środowiskową TF_CONFIG dla każdego TF_CONFIG binarnego działającego w klastrze. TF_CONFIG środowiskowa TF_CONFIG to ciąg JSON, który określa, jakie zadania tworzą klaster, ich adresy i rolę każdego zadania w klastrze. Udostępniamy szablon Kubernetes w repozytorium tensorflow / ekosystem, który ustawia TF_CONFIG dla Twoich zadań szkoleniowych.

Istnieją dwa składniki TF_CONFIG: klaster i zadanie. klaster dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jak pracownik. W przypadku szkolenia dla wielu pracowników zwykle jeden pracownik przejmuje nieco większą odpowiedzialność, taką jak zapisywanie punktu kontrolnego i pisanie pliku podsumowania dla TensorBoard, oprócz tego, co robi zwykły pracownik. Taki pracownik jest określany mianem „szefa” i jest zwyczajem, że pracownik o indeksie 0 jest mianowany jako główny pracownik (w rzeczywistości w ten sposób wdrażana jest strategia tf.distribute.Strategy). zadanie z drugiej strony dostarcza informacji o bieżącym zadaniu. Pierwszy klaster składników jest taki sam dla wszystkich pracowników, a drugie zadanie składnika jest różne dla każdego pracownika i określa typ i indeks tego pracownika.

Jednym z przykładów TF_CONFIG jest:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Ten TF_CONFIG określa, że ​​w TF_CONFIG istnieją trzy TF_CONFIG i dwa zadania ps wraz z ich hostami i portami. Część „zadanie” określa rolę aktualnego zadania w klastrze, pracownik 1 (drugi pracownik). Prawidłowe role w klastrze to „szef”, „pracownik”, „ps” i „oceniający”. Zadanie "ps" nie powinno istnieć, z wyjątkiem użycia tf.distribute.experimental.ParameterServerStrategy .

Co dalej?

tf.distribute.Strategy jest aktywnie rozwijana. Zapraszamy do wypróbowania go i przekazania opinii za pomocą problemów z usługą GitHub .