Odpowiedz już dziś na lokalne wydarzenie TensorFlow Everywhere!
Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Szkolenie serwera parametrów

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Uczenie serwera parametrów jest popularną metodą równoległą do danych w celu skalowania szkolenia modelu na wielu komputerach. Klaster szkoleniowy serwera parametrów składa się z procesów roboczych i serwerów parametrów. Zmienne są tworzone na serwerach parametrów i są odczytywane i aktualizowane przez pracowników na każdym kroku. Domyślnie pracownicy odczytują i aktualizują te zmienne niezależnie, bez synchronizacji między sobą. Dlatego czasami trening parametryczny w stylu serwera nazywany jest treningiem asynchronicznym.

Trening parametrów serwera TensorFlow 2 wykorzystuje centralny koordynator za pośrednictwem klasy tf.distribute.experimental.coordinator.ClusterCoordinator .

W tej implementacji, że worker i parameter server zadania uruchamiane tf.distribute.Server jakoby nasłuchiwać żądań od koordynatora. Koordynator tworzy zasoby, wysyła zadania szkoleniowe, pisze punkty kontrolne i radzi sobie z niepowodzeniami zadań.

Uważamy, że ta architektura i nowa klasa ClusterCoordinator zapewniają bardziej elastyczny i prostszy model programowania.

ClusterCoordinator

Klasa ClusterCoordinator musi działać w połączeniu z obiektem tf.distribute.Strategy . Ten obiekt tf.distribute.Strategy jest potrzebny do przekazywania informacji o klastrze i służy do definiowania kroku szkoleniowego, jak widzieliśmy w niestandardowym szkoleniu z MirroredStrategy . Obiekt ClusterCoordinator następnie wysyła wykonanie tych kroków szkoleniowych do pracowników zdalnych. Obecnie ClusterCoordinator działa tylko z tf.distribute.experimental.ParameterServerStrategy .

Najważniejszym API udostępnianym przez obiekt ClusterCoordinator jest schedule . schedule API kolejkuje funkcję tf.function i tf.function zwraca przyszłą wartość RemoteValue . Funkcje w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a ich wartości RemoteValue zostaną wypełnione asynchronicznie. Ponieważ schedule nie wymaga przypisania pracownika, przekazana tf.function może zostać wykonana na dowolnym dostępnym pracowniku. Jeśli pracownik, na którym jest wykonywana, stanie się niedostępny przed jej zakończeniem, funkcja zostanie ponowiona na innym dostępnym pracowniku. Z tego powodu oraz faktu, że wykonanie funkcji nie jest atomowe, funkcja może być wykonywana więcej niż raz.

Oprócz wysyłania funkcji zdalnych ClusterCoordinator pomaga również w tworzeniu zestawów danych dla wszystkich pracowników i odbudowywaniu tych zestawów danych, gdy pracownik odzyskuje sprawność po awarii.

Konfiguracja samouczka

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Konfiguracja klastra

Jak wspomniano powyżej, klaster szkoleniowy serwera parametrów wymaga zadania koordynatora, które uruchamia program szkoleniowy, jednego lub kilku zadań roboczych i zadań serwera parametrów, które obsługują serwery TensorFlow, tj. tf.distribute.Server , i ewentualnie dodatkowego zadania oceny, które uruchamia side-car ocena (patrz sekcja oceny samochodu bocznego poniżej). Wymagania do ich skonfigurowania to:

  • Zadanie koordynatora musi znać adresy i porty wszystkich innych serwerów TensorFlow z wyjątkiem osoby oceniającej.
  • Pracownicy i serwery parametrów muszą wiedzieć, którego portu muszą nasłuchiwać. Dla uproszczenia zwykle przekazujemy pełne informacje o klastrze podczas tworzenia serwerów TensorFlow na tych zadaniach.
  • Zadanie oceniającego nie musi znać konfiguracji klastra szkoleniowego. Jeśli tak, nie powinien próbować łączyć się z klastrem szkoleniowym.
  • Pliki robocze i serwery parametrów powinny mieć typy zadań odpowiednio „workery” i „ps”. Koordynator powinien użyć określenia „szef” jako typ zadania z powodów historycznych.

W tym samouczku utworzymy klaster w procesie, aby całe szkolenie serwera parametrów można było uruchomić w colab. W dalszej części przedstawimy, jak skonfigurować prawdziwe klastry .

Klaster w toku

W tym samouczku wcześniej uruchomimy kilka serwerów TensorFlow i połączymy się z nimi później:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Trening z niestandardową pętlą treningową

Niestandardowa pętla treningowa z tf.distribute.Strategy zapewnia dużą elastyczność w definiowaniu pętli szkoleniowych. Obecnie w przypadku uczenia serwera parametrów w TensorFlow 2 obsługiwana jest tylko niestandardowa pętla szkoleniowa. Tutaj używamy ParameterServerStrategy aby zdefiniować krok szkoleniowy, a następnie używamy ClusterCoordinator do wysyłania wykonania kroków szkoleniowych do pracowników zdalnych.

Utwórz ParameterServerStrategy

Aby napisać krok szkoleniowy w niestandardowej pętli szkoleniowej, pierwszym krokiem jest utworzenie strategii ParameterServerStrategy . Później wyjaśnimy variable_partitioner .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

Następnie utworzysz model, zdefiniujesz zbiór danych i funkcję krokową, jak widzieliśmy w pętli treningowej z innymi tf.distribute.Strategy s. Więcej szczegółów znajdziesz w tym samouczku . Utwórzmy te komponenty w następujących krokach:

Skonfiguruj dane

Najpierw napisz funkcję, która tworzy zbiór danych zawierający logikę przetwarzania wstępnego zaimplementowaną przez warstwy przetwarzania wstępnego Keras. Będziemy tworzyć te warstwy poza dataset_fn jednak zastosować transformację wewnątrz dataset_fn ponieważ będzie zawijać dataset_fn w tf.function która nie pozwala na zmienne tworzone wewnątrz niego.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Wygeneruj przykłady zabawek w zbiorze danych:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Następnie tworzymy zbiór danych szkoleniowych opakowany w dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Zbuduj model

Po drugie, tworzymy model i inne obiekty. Upewnij się, że wszystkie zmienne zostały utworzone w sekcji strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Zdefiniuj etap szkolenia

Po trzecie, utwórz krok treningowy opakowany w funkcję tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

W powyższym kroku wywołanie strategy.run i strategy.reduce w step_fn jest przydatne do obsługi procesorów graficznych lub wielu replik w przyszłości, chociaż mają one w tej chwili trywialne implementacje.

Wysyłaj kroki szkoleniowe do pracowników zdalnych

Po zdefiniowaniu wszystkich obliczeń przez ParameterServerStrategy , będziemy używać klasy ClusterCoordinator do tworzenia zasobów i rozprowadzania kroków szkoleniowych do pracowników zdalnych.

Najpierw ClusterCoordinator obiekt ClusterCoordinator i ClusterCoordinator w nim obiekt strategii:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Następnie tworzymy zbiór danych na pracownika i iterator. W per_worker_dataset_fn poniżej, owijając dataset_fn do strategy.distribute_datasets_from_function jest opcjonalny, ale pozwoli to wspieranie efektywnego prefetching do GPU płynnie w przyszłości, gdy GPU są obsługiwane przez ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Ostatnim krokiem jest dystrybucja obliczeń do pracowników zdalnych przy użyciu schedule . Metoda schedule kolejkuje funkcję tf.function i tf.function zwraca przyszłą wartość RemoteValue . Funkcje w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a wartość RemoteValue zostanie wypełniona asynchronicznie. Metoda join może służyć do czekania, aż wszystkie zaplanowane funkcje zostaną wykonane.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Oto jak możesz pobrać wynik RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

Alternatywnie możesz uruchomić wszystkie kroki i zrobić coś, czekając na zakończenie:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Aby zapoznać się z pełnym przebiegiem szkolenia i obsługi dla tego konkretnego przykładu, zapoznaj się z tym testem .

Więcej o tworzeniu zbioru danych

Zestaw danych w powyższym kodzie jest tworzony za pomocą interfejsu API create_per_worker_dataset . Tworzy jeden zestaw danych na pracownika i zwraca obiekt kontenera. Możesz wywołać na nim metodę iter , aby utworzyć iterator dla każdego pracownika. Iterator na pracownika zawiera jeden iterator na pracownika, a odpowiadający mu wycinek pracownika zostanie zastąpiony w argumencie wejściowym funkcji przekazanej do metody schedule , zanim funkcja zostanie wykonana na określonym procesie roboczym.

Obecnie metoda schedule zakłada, że ​​pracownicy są równoważni, a zatem zakłada, że ​​zestawy danych dotyczące różnych pracowników są takie same, z wyjątkiem tego, że mogą być one tasowane inaczej, jeśli zawierają operację zestawu danych . Shuffle . Z tego powodu zalecamy również powtarzanie zestawów danych w nieskończoność i planowanie skończonej liczby kroków zamiast polegania na OutOfRangeError ze zbioru danych.

Inną ważną uwagą jest to, że zestawy danych tf.data nie obsługują niejawnej serializacji i deserializacji między granicami zadań. Dlatego ważne jest, aby utworzyć cały zestaw danych wewnątrz funkcji przekazanej do create_per_worker_dataset .

Variable sharding

Fragmentowanie zmiennych odnosi się do dzielenia zmiennej na wiele mniejszych zmiennych. Nazywamy te mniejsze zmienne shard s. Fragmenty zmiennych mogą być przydatne do dystrybucji obciążenia sieci podczas uzyskiwania dostępu do tych fragmentów. Przydaje się również do dystrybucji obliczeń i przechowywania normalnej zmiennej na wiele serwerów parametrów.

Aby włączyć fragmentowanie variable_partitioner podczas konstruowania obiektu ParameterServerStrategy można przekazać zmienną_partycjoner. variable_partitioner będzie wywoływana za każdym razem, gdy tworzona jest zmienna i oczekuje się, że zwróci liczbę fragmentów wzdłuż każdego wymiaru zmiennej. Niektóre out-of-box variable_partitioner s są takie jak tf.distribute.experimental.partitioners.FixedShardsPartitioner .

W powyższym przykładzie używamy FixedShardsPartitioner który podzieli wszystkie zmienne na dwa shardy, a każdy fragment zostanie przypisany do różnych serwerów parametrów:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Kiedy variable_partitioner zostanie przekazana i jeśli utworzysz zmienną bezpośrednio w strategy.scope() , stanie się ona typem kontenera z właściwością variables która zapewnia dostęp do listy fragmentów. W większości przypadków ten kontener zostanie automatycznie przekonwertowany na Tensor poprzez połączenie wszystkich fragmentów. W rezultacie może być używana jako normalna zmienna. Z drugiej strony, niektóre metody TensorFlow, takie jak tf.nn.embedding_lookup zapewniają wydajną implementację dla tego typu kontenera i w tych metodach uniknie się automatycznego łączenia.

Więcej informacji można znaleźć w dokumentacji API ParameterServerStrategy .

Ocena

Istnieje więcej niż jeden sposób definiowania i uruchamiania pętli ocen w szkoleniu rozproszonym. Każdy ma swoje zalety i wady, jak opisano poniżej. Metoda oceny inline jest zalecana, jeśli nie masz preferencji.

Ocena bezpośrednia

W tej metodzie koordynator przełącza się między szkoleniem a oceną, dlatego nazywamy ją ewaluacją inline. Ewaluacja inline ma kilka zalet. Na przykład może obsługiwać duże modele oceny i zestawy danych oceny, których nie może przechowywać pojedyncze zadanie. Na przykład wyniki ewaluacji można wykorzystać do podjęcia decyzji o szkoleniu w następnej epoce.

Istnieją dwa sposoby wdrożenia oceny inline:

  • Ocena bezpośrednia - w przypadku małych modeli i zestawów danych do oceny koordynator może przeprowadzić ocenę bezpośrednio na modelu rozproszonym ze zbiorem danych do oceny u koordynatora:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • Ocena rozproszona - w przypadku dużych modeli lub zbiorów danych, których nie można uruchomić bezpośrednio na koordynatorze, zadanie koordynatora może rozdzielić zadania oceny do pracowników za pomocą metod schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Ocena samochodu bocznego

Inną metodą jest ocena bocznego samochodu, która polega na utworzeniu dedykowanego zadania ewaluatora, które wielokrotnie odczytuje punkty kontrolne i przeprowadza ocenę w ostatnim punkcie kontrolnym. Pozwala to na wcześniejsze zakończenie programu treningowego, jeśli nie musisz zmieniać pętli treningowej na podstawie wyników oceny. Wymaga to jednak dodatkowego zadania ewaluatora i okresowych punktów kontrolnych w celu uruchomienia oceny. Poniżej znajduje się możliwa pętla oceny samochodu bocznego:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Klastry w świecie rzeczywistym

W prawdziwym środowisku produkcyjnym wykonasz wszystkie zadania w różnych procesach na różnych maszynach. Najprostszym sposobem skonfigurowania informacji o klastrze dla każdego zadania jest ustawienie zmiennych środowiskowych „TF_CONFIG” i użycie narzędzia TFConfigClusterResolver do przeanalizowania „TF_CONFIG”. Ogólny opis zmiennych środowiskowych „TF_CONFIG” można znaleźć w rozproszonym przewodniku szkoleniowym .

Jeśli zaczynasz swoje zadania szkoleniowe za pomocą Kubernetes lub innych szablonów konfiguracji, jest bardzo prawdopodobne, że te szablony już ustawiły dla Ciebie „TF_CONFIG”.

Ustaw zmienną środowiskową „TF_CONFIG”

Załóżmy, że masz 3 pracowników i 2 serwery parametrów, „TF_CONFIG” pracownika 1 może wynosić:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

„TF_CONFIG” ewaluatora może być:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

Część „klastra” w powyższym ciągu „TF_CONFIG” dla oceniającego jest opcjonalna.

Jeśli używasz tego samego pliku binarnego do wszystkich zadań

Jeśli wolisz uruchamiać wszystkie te zadania za pomocą jednego pliku binarnego, musisz na samym początku pozwolić programowi rozgałęzić się na różne role:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Poniższy kod uruchamia serwer TensorFlow i czeka:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Obsługa niepowodzenia zadania

Awaria pracownika

Jak wspomniano powyżej, ClusterCoordinator ma wbudowaną odporność na błędy w przypadku awarii procesu ClusterCoordinator . Po przywróceniu pracownika, odpowiedni kawałek zbiorów danych stworzony przez create_per_worker_dataset że są nadal w zakresie zostaną odtworzone powołując swój pierwotny dataset_fn przekazany do create_per_worker_dataset .

Błąd serwera parametrów lub koordynatora

Jednak gdy koordynator zauważy błąd serwera parametrów, natychmiast AbortedError błąd UnavailableError lub AbortedError . W takim przypadku możesz ponownie uruchomić koordynatora. Sam koordynator również może stać się niedostępny. Dlatego, aby nie stracić wiele z postępów uczenia, ważne jest okresowe sprawdzanie zmiennych modelu i ładowanie zmiennych modelu z punktu kontrolnego, jeśli taki istnieje, przed rozpoczęciem uczenia. Postęp szkolenia można wywnioskować w przybliżeniu z optimizer.iterations jeśli optymalizator jest w punkcie kontrolnym.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Pobieranie RemoteValue

Pobieranie wartości RemoteValue jest gwarantowane, jeśli funkcja zostanie wykonana pomyślnie. Dzieje się tak, ponieważ obecnie zwracana wartość jest natychmiast kopiowana do koordynatora po wykonaniu funkcji. Jeśli wystąpi jakikolwiek błąd pracownika podczas kopiowania, funkcja zostanie ponowiona na innym dostępnym pracowniku. Dlatego jeśli chcesz zoptymalizować wydajność, możesz zaplanować funkcje bez wartości zwracanej.

Zgłaszanie błędów

Gdy koordynator zauważy błąd, taki jak UnavailableError z serwerów parametrów lub inne błędy aplikacji, takie jak InvalidArgument z tf.debugging.check_numerics , anuluje wszystkie oczekujące i oczekujące w kolejce funkcje przed zgłoszeniem błędu. Pobranie odpowiednich wartości RemoteValue spowoduje zgłoszenie RemoteValue CancelledError .

Po zgłoszeniu błędu koordynator nie zgłosi tego samego błędu ani żadnego błędu z anulowanych funkcji.

Poprawa wydajności

Istnieje kilka możliwych przyczyn wystąpienia problemów z wydajnością podczas trenowania z ParameterServerStrategy i ClusterResolver .

Jednym z częstych powodów jest niezrównoważone obciążenie serwerów parametrów, a niektóre mocno obciążone serwery parametrów osiągnęły pojemność. Przyczyn może być również wiele. Oto kilka prostych metod złagodzenia tego problemu

  1. podziel duże zmienne modelu, określając variable_partitioner podczas konstruowania ParameterServerStrategy .
  2. Jeśli to możliwe, unikaj tworzenia zmiennej hotspot, która jest wymagana przez wszystkie serwery parametrów w jednym kroku. Na przykład użyj stałej szybkości uczenia się lub podklasy tf.keras.optimizers.schedules.LearningRateSchedule w optymalizatorach, ponieważ domyślne zachowanie jest takie, że szybkość uczenia się stanie się zmienną umieszczoną na określonym serwerze parametrów i żądaną przez wszystkie inne serwery parametrów w każdym kroku .
  3. Przetasuj swoje duże słowniki przed przekazaniem ich do warstw przetwarzania wstępnego Keras.

Innym możliwym powodem problemów z wydajnością jest koordynator. Nasza pierwsza implementacja schedule / join jest oparta na języku Python, a zatem może mieć narzut związany z wątkami. Również opóźnienie między koordynatorem a pracownikami może być duże. W takim przypadku możesz spakować wiele kroków do jednej funkcji tf.function .:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Będziemy nadal optymalizować koordynatora i miejmy nadzieję, że większość użytkowników nie będzie musiała ręcznie pakować kroków w przyszłości.

Ponadto małą sztuczką w celu poprawy wydajności jest zaplanowanie funkcji bez wartości zwracanej, jak wyjaśniono w sekcji dotyczącej obsługi niepowodzenia zadania powyżej.

Znane ograniczenia

Większość znanych ograniczeń omówiono w powyższych sekcjach. Oto podsumowanie:

  • os.environment["grpc_fail_fast"]="use_caller" jest potrzebny w każdym zadaniu, łącznie z koordynatorem, aby odporność na błędy działała poprawnie.
  • Pracownicy GPU nie są obsługiwani.
  • Uczenie synchronicznego serwera parametrów nie jest obsługiwane.
  • ParameterServerStrategy nie działa z interfejsami API compile i fit Keras.
  • ClusterCoordinator.schedule nie obsługuje gwarancji odwiedzin dla zestawu danych.
  • Gdy ClusterCoordinator.create_per_worker_dataset jest ClusterCoordinator.create_per_worker_dataset , cały zestaw danych musi zostać utworzony wewnątrz przekazanej do niego funkcji.
  • Aby osiągnąć optymalną wydajność, zwykle konieczne jest spakowanie wielu kroków w jedną funkcję.
  • Nie jest obsługiwane ładowanie save_model za pośrednictwem tf.saved_model.load zawierającego zmienne podzielone na fragmenty. Należy zauważyć, że ładowanie takiego save_model przy użyciu usługi TensorFlow Serving powinno zadziałać.
  • Nie jest obsługiwane ładowanie punktu kontrolnego zawierającego podzielone na fragmenty zmienne boksu optymalizatora do innej liczby fragmentów.
  • Nie jest obsługiwane odtwarzanie po awarii serwera parametrów bez ponownego uruchomienia zadania koordynatora.