Oglądaj prezentacje, sesje produktowe, warsztaty i nie tylko z playlisty Google I / O See

Szkolenie serwera parametrów

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło w serwisie GitHub Pobierz notatnik

Przegląd

Uczenie serwera parametrów jest popularną metodą równoległą do danych w celu skalowania szkolenia modelu na wielu komputerach. Klaster szkoleniowy serwera parametrów składa się z procesów roboczych i serwerów parametrów. Zmienne są tworzone na serwerach parametrów i są odczytywane i aktualizowane przez pracowników na każdym kroku. Domyślnie pracownicy odczytują i aktualizują te zmienne niezależnie, bez synchronizacji między sobą. Dlatego czasami uczenie w stylu serwera parametrów jest nazywane uczeniem asynchronicznym.

W TF2 uczenie serwera parametrów jest obsługiwane przez klasę tf.distribute.experimental.ParameterServerStrategy , która dystrybuuje kroki szkoleniowe do klastra, który skaluje do tysięcy pracowników (wraz z serwerami parametrów). Istnieją dwa główne obsługiwane szkoleniowe interfejsy API: Keras Training API, znany również jako Model.fit i Custom Training Loop (CTL). Model.fit jest zalecany, gdy użytkownicy wolą abstrakcję na wysokim poziomie i obsługę treningu, a CTL jest zalecany, gdy użytkownicy wolą definiować szczegóły swojej pętli szkoleniowej.

Niezależnie od wybranego API, rozproszone szkolenie w TF2 obejmuje „klaster” z kilkoma „zadaniami”, a każde z zadań może mieć jedno lub więcej „zadań”. Podczas korzystania z uczenia serwera parametrów zaleca się mieć jedno zadanie koordynatora (które ma chief zadania), wiele zadań roboczych (nazwa zadania worker ) i wiele zadań serwera parametrów (nazwa zadania ps ).

Podczas gdy koordynator tworzy zasoby, wysyła zadania szkoleniowe, zapisuje punkty kontrolne i tf.distribute.Server sobie z awariami zadań, pracownicy i serwery parametrów uruchamiają tf.distribute.Server które nasłuchują żądań koordynatora.

Trening serwera parametrów z API Model.fit

tf.distribute.experimental.ParameterServerStrategy serwera parametrów za pomocą Model.fit API wymaga, aby koordynator tf.distribute.experimental.ParameterServerStrategy obiektu tf.distribute.experimental.ParameterServerStrategy i tf.keras.utils.experimental.DatasetCreator jako danych wejściowych. Podobnie jak w przypadku korzystania z Model.fit bez strategii lub z innymi strategiami, przepływ pracy obejmuje tworzenie i kompilację modelu, przygotowanie wywołań zwrotnych, a następnie wywołanie Model.fit .

Trenowanie serwera parametrów z niestandardowym interfejsem API pętli szkoleniowej (CTL)

W przypadku tf.distribute.experimental.coordinator.ClusterCoordinator CTL klasa tf.distribute.experimental.coordinator.ClusterCoordinator jest kluczowym składnikiem używanym przez koordynatora. Klasa ClusterCoordinator musi działać w połączeniu z obiektem tf.distribute.Strategy . Ten obiekt tf.distribute.Strategy jest potrzebny do dostarczania informacji o klastrze i służy do definiowania kroku szkoleniowego, jak widzieliśmy w niestandardowym szkoleniu z MirroredStrategy . Obiekt ClusterCoordinator następnie wysyła wykonanie tych kroków szkoleniowych do pracowników zdalnych. Na potrzeby ClusterCoordinator serwera parametrów ClusterCoordinator musi pracować z tf.distribute.experimental.ParameterServerStrategy .

Najważniejszym API udostępnianym przez obiekt ClusterCoordinator jest schedule . schedule API kolejkuje funkcję tf.function i tf.function zwraca przyszłą wartość RemoteValue . Funkcje w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a ich wartości RemoteValue zostaną wypełnione asynchronicznie. Ponieważ schedule nie wymaga przypisania pracownika, przekazana tf.function może zostać wykonana na dowolnym dostępnym pracowniku. Jeśli pracownik, na którym jest wykonywana, stanie się niedostępny przed jej zakończeniem, funkcja zostanie ponowiona na innym dostępnym pracowniku. Z powodu tego faktu oraz faktu, że wykonanie funkcji nie jest atomowe, funkcja może być wykonywana więcej niż jeden raz.

Oprócz wysyłania funkcji zdalnych ClusterCoordinator pomaga również w tworzeniu zestawów danych dotyczących wszystkich pracowników i odbudowywaniu tych zestawów danych, gdy pracownik odzyskuje sprawność po awarii.

Konfiguracja samouczka

Samouczek będzie rozgałęziony na ścieżki CTL lub Model.fit i możesz wybrać tę, która odpowiada Twoim potrzebom. Sekcje inne niż „Trening z X” mają zastosowanie do obu ścieżek.

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Konfiguracja klastra

Jak wspomniano powyżej, klaster szkoleniowy serwera parametrów wymaga zadania koordynatora, które uruchamia program szkoleniowy, jednego lub kilku zadań roboczych i serwera parametrów, które uruchamiają serwery TensorFlow, tj. tf.distribute.Server , i ewentualnie dodatkowego zadania oceny, które uruchamia side-car ocena (patrz sekcja oceny samochodu bocznego poniżej). Wymagania do ich skonfigurowania to:

  • Zadanie koordynatora musi znać adresy i porty wszystkich innych serwerów TensorFlow z wyjątkiem osoby oceniającej.
  • Pracownicy i serwery parametrów muszą wiedzieć, którego portu muszą nasłuchiwać. Dla uproszczenia zwykle przekazujemy pełne informacje o klastrze podczas tworzenia serwerów TensorFlow na tych zadaniach.
  • Zadanie oceniającego nie musi znać konfiguracji klastra szkoleniowego. Jeśli tak, nie powinien próbować łączyć się z klastrem szkoleniowym.
  • Pliki robocze i serwery parametrów powinny mieć typy zadań odpowiednio „workery” i „ps”. Koordynator powinien używać określenia „szef” jako typ zadania z powodów historycznych.

W tym samouczku utworzymy klaster w procesie, aby całe szkolenie serwera parametrów można było uruchomić w colab. W dalszej części przedstawimy, jak skonfigurować prawdziwe klastry .

Klaster w toku

W tym samouczku wcześniej uruchomimy kilka serwerów TensorFlow i połączymy się z nimi później. Zauważ, że jest to tylko w celu demonstracji tego samouczka, a podczas prawdziwego treningu serwery będą uruchamiane na maszynach Worker i PS.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Konfiguracja klastra w trakcie jest często używana w naszych testach jednostkowych. Oto jeden przykład .

Utwórz wystąpienie ParameterServerStrategy

Zanim zagłębimy się w kod szkoleniowy, utwórzmy wystąpienie obiektu ParameterServerStrategy . Zauważ, że jest to potrzebne niezależnie od tego, czy kontynuujesz z niestandardową pętlą treningową, czy z Model.fit . variable_partitioner argumentem zostaną wyjaśnione w następnym rozdziale .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3', '/job:chief/replica:0/task:0/device:GPU:4', '/job:chief/replica:0/task:0/device:GPU:5'], variable_device = '/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Aby używać procesorów graficznych do szkolenia, przydziel procesory graficzne widoczne dla każdego pracownika. ParameterServerStrategy użyje wszystkich dostępnych procesorów graficznych na każdym z pracowników, z zastrzeżeniem, że wszyscy pracownicy powinni mieć taką samą liczbę dostępnych procesorów graficznych.

Variable sharding

Fragmentowanie zmiennej odnosi się do dzielenia zmiennej na wiele mniejszych zmiennych. Nazywamy te mniejsze zmienne shard s. Fragmenty zmiennych mogą być przydatne do rozłożenia obciążenia sieci podczas uzyskiwania dostępu do tych fragmentów. Przydaje się również do dystrybucji obliczeń i przechowywania normalnej zmiennej na wiele serwerów parametrów.

Aby włączyć zmienną sharding, można przekazać w variable_partitioner przy konstruowaniu ParameterServerStrategy obiekt. variable_partitioner będzie wywoływana za każdym razem, gdy tworzona jest zmienna i oczekuje się, że zwróci liczbę fragmentów wzdłuż każdego wymiaru zmiennej. Niektóre out-of-box variable_partitioner s są takie jak tf.distribute.experimental.partitioners.FixedShardsPartitioner .

Kiedy variable_partitioner jest przekazywana i jeśli utworzysz zmienną bezpośrednio w strategy.scope() , stanie się ona typem kontenera z właściwością variables która zapewnia dostęp do listy fragmentów. W większości przypadków ten kontener zostanie automatycznie przekonwertowany na Tensor przez połączenie wszystkich fragmentów. W rezultacie może być używana jako normalna zmienna. Z drugiej strony niektóre metody TensorFlow, takie jak tf.nn.embedding_lookup zapewniają wydajną implementację dla tego typu kontenera iw tych metodach uniknie się automatycznej konkatenacji.

Więcej informacji można znaleźć w dokumentacji API ParameterServerStrategy .

Trening z Model.fit

Keras zapewnia łatwy w użyciu interfejs API do szkolenia za pośrednictwem Model.fit który obsługuje pętlę szkoleniową pod maską, z elastycznością overridable train_step i callbackami, które zapewniają funkcje, takie jak zapisywanie punktów kontrolnych lub podsumowanie zapisywania dla TensorBoard. Dzięki Model.fit ten sam kod szkoleniowy może być użyty do innych strategii z prostą zamianą obiektu strategii.

Dane wejściowe

Model.fit z Model.fit serwera parametrów wymaga, aby dane wejściowe były dostarczane w postaci wywoływalnej, która przyjmuje pojedynczy argument typu tf.distribute.InputContext i zwracatf.data.Dataset . Następnie utwórz tf.keras.utils.experimental.DatasetCreator obiekt, który zajmuje tak callable , a opcjonalny tf.distribute.InputOptions sprzeciw poprzez input_options argument. Zauważ, że zaleca się tasowanie i powtarzanie danych z steps_per_epoch serwera parametrów oraz określenie steps_per_epoch w wywołaniu fit aby biblioteka znała granice epoki.

Aby uzyskać więcej informacji na temat argumentu InputContext zobacz Przewodnik dotyczący rozproszonych danych wejściowych .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kod w dataset_fn zostanie wywołany na urządzeniu wejściowym, którym jest zwykle procesor CPU, na każdej z maszyn roboczych.

Budowa i kompilacja modeli

Teraz utworzysz tf.keras.Model z wybranymi interfejsami API (trywialny tf.keras.models.Sequential Model tf.keras.models.Sequential jest tutaj używany jako demonstracja), a następnie wywołanie Model.compile celu włączenia komponentów, takich jak optymalizator, metryki lub parametry, takie jak steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Połączenia zwrotne i szkolenia

Zanim zadzwonisz do model.fit do właściwego szkolenia, przygotujmy potrzebne callbacki dla typowych zadań, takich jak:

  • ModelCheckpoint - aby zapisać wagi modelu.

  • BackupAndRestore - aby upewnić się, że postęp szkolenia jest automatycznie archiwizowany i odzyskiwany, jeśli klaster jest niedostępny (na przykład przerwanie lub wywłaszczanie) lub

  • TensorBoard - do zapisywania raportów postępu w plikach podsumowań, które są wizualizowane w narzędziu TensorBoard.

Należy pamiętać, że ze względu na wydajność niestandardowe wywołania zwrotne nie mogą mieć nadpisanych wywołań zwrotnych na poziomie partii, gdy są używane z ParameterServerStrategy . Zmodyfikuj swoje niestandardowe wywołania zwrotne, aby były wywołania na poziomie epoki, i dostosuj steps_per_epoch do odpowiedniej wartości. Ponadto steps_per_epoch jest wymaganym argumentem dla Model.fit gdy jest używany z ParameterServerStrategy .

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 6s - loss: 0.9476
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.8812
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.5994
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e973a5d40> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e8c19eb90> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.4205
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.3881
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7f236401be50>

Bezpośrednie użycie z ClusterCoordinator (opcjonalnie)

Nawet jeśli wybierzesz ścieżkę szkoleniową Model.fit , możesz opcjonalnie utworzyć wystąpienie obiektu ClusterCoordinator aby zaplanować inne funkcje, które chcesz wykonać na pracownikach. Więcej szczegółów i przykładów znajduje się poniżej w sekcji Trening z niestandardową pętlą szkoleniową .

Trening z niestandardową pętlą treningową

Niestandardowa pętla treningowa z tf.distribute.Strategy zapewnia dużą elastyczność w definiowaniu pętli szkoleniowych. Przy zdefiniowanej powyżej strategii ParameterServerStrategy , będziesz używać ClusterCoordinator do wysyłania wykonania kroków szkoleniowych do pracowników zdalnych.

Następnie utworzysz model, zdefiniujesz zbiór danych i funkcję krokową, jak widzieliśmy w pętli treningowej z innymi tf.distribute.Strategy . Więcej szczegółów znajdziesz w tym samouczku .

Aby zapewnić wydajne wstępne pobieranie zestawów danych, użyj zalecanych interfejsów API tworzenia rozproszonych zestawów danych, o których mowa w poniższej sekcji Szkolenie dotyczące wysyłania do pracowników zdalnych . Upewnij się również, że wywołujesz strategy.run wewnątrz pliku worker_fn, aby w pełni wykorzystać GPU przydzielone pracownikom. Pozostałe kroki są takie same w przypadku treningu z GPU lub bez.

Utwórzmy te komponenty w następujących krokach:

Skonfiguruj dane

Najpierw napisz funkcję, która tworzy zbiór danych, który zawiera logikę przetwarzania wstępnego zaimplementowaną przez warstwy przetwarzania wstępnego Keras. Będziemy tworzyć te warstwy poza dataset_fn jednak zastosować transformację wewnątrz dataset_fn ponieważ będzie zawijać dataset_fn w tf.function która nie pozwala na zmienne tworzone wewnątrz niego.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab,
                                          mask_token=None)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Wygeneruj przykłady zabawek w zbiorze danych:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Następnie tworzymy zbiór danych szkoleniowych opakowany w dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Zbuduj model

Po drugie, tworzymy model i inne obiekty. Upewnij się, że wszystkie zmienne zostały utworzone w sekcji strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Potwierdźmy, że użycie FixedShardsPartitioner podzieliło wszystkie zmienne na dwa fragmenty, a każdy fragment został przypisany do różnych serwerów parametrów:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Zdefiniuj etap szkolenia

Po trzecie, utwórz krok szkoleniowy opakowany w funkcję tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

W powyższym kroku wywołanie strategy.run i strategy.reduce w step_fn może obsługiwać wiele procesorów graficznych na pracownika. Jeśli pracownicy mają przydzielone GPU, strategy.run rozprowadzi zestawy danych w wielu replikach.

Wysyłaj kroki szkoleniowe do pracowników zdalnych

Po zdefiniowaniu wszystkich obliczeń przez ParameterServerStrategy , będziemy używać klasy ClusterCoordinator do tworzenia zasobów i rozprowadzania kroków szkoleniowych do pracowników zdalnych.

Najpierw ClusterCoordinator obiekt ClusterCoordinator i ClusterCoordinator w nim obiekt strategii:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Następnie tworzymy zbiór danych na pracownika i iterator. W per_worker_dataset_fn poniżej, owijając dataset_fn do strategy.distribute_datasets_from_function zaleca się, aby umożliwić wydajne wstępne pobieranie do GPU bezproblemowo.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Ostatnim krokiem jest dystrybucja obliczeń do pracowników zdalnych przy użyciu schedule . Metoda schedule kolejkuje funkcję tf.function i tf.function zwraca przyszłą wartość RemoteValue . Funkcje w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a wartość RemoteValue zostanie wypełniona asynchronicznie. Metoda join może służyć do czekania, aż wszystkie zaplanowane funkcje zostaną wykonane.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.743750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Oto jak możesz pobrać wynik RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.005490

Alternatywnie możesz uruchomić wszystkie kroki i zrobić coś, czekając na zakończenie:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Aby uzyskać pełny przepływ pracy w zakresie szkolenia i obsługi dla tego konkretnego przykładu, zapoznaj się z tym testem .

Więcej o tworzeniu zbioru danych

Zestaw danych w powyższym kodzie jest tworzony za pomocą interfejsu API create_per_worker_dataset . Tworzy jeden zestaw danych na pracownika i zwraca obiekt kontenera. Możesz wywołać na nim metodę iter , aby utworzyć iterator dla każdego pracownika. Iterator na pracownika zawiera jeden iterator na pracownika, a odpowiadający mu wycinek procesu roboczego zostanie zastąpiony w argumencie wejściowym funkcji przekazanej do metody schedule , zanim funkcja zostanie wykonana na określonym procesie roboczym.

Obecnie metoda schedule zakłada, że ​​pracownicy są równoważni, a zatem zakłada, że ​​zestawy danych dla różnych pracowników są takie same, z wyjątkiem tego, że mogą być one tasowane inaczej, jeśli zawierają operację zestawu danych . Shuffle . Z tego powodu zalecamy również powtarzanie zestawów danych w nieskończoność i planowanie skończonej liczby kroków zamiast polegania na OutOfRangeError ze zbioru danych.

Inną ważną uwagą jest to, że zestawy danych tf.data nie obsługują niejawnej serializacji i deserializacji między granicami zadań. Dlatego ważne jest, aby utworzyć cały zbiór danych wewnątrz funkcji przekazanej do create_per_worker_dataset .

Ocena

Istnieje więcej niż jeden sposób definiowania i uruchamiania pętli ocen w szkoleniu rozproszonym. Każdy ma swoje zalety i wady, jak opisano poniżej. Metoda oceny inline jest zalecana, jeśli nie masz preferencji.

Ocena bezpośrednia

W tej metodzie koordynator przełącza się między szkoleniem a oceną, dlatego nazywamy ją ewaluacją inline. Ewaluacja inline ma kilka zalet. Na przykład może obsługiwać duże modele oceny i zestawy danych oceny, których nie może przechowywać pojedyncze zadanie. Na przykład wyniki ewaluacji można wykorzystać do podjęcia decyzji o szkoleniu w następnej epoce.

Istnieją dwa sposoby wdrożenia oceny inline:

  • Ocena bezpośrednia - w przypadku małych modeli i zestawów danych do oceny koordynator może przeprowadzić ocenę bezpośrednio na modelu rozproszonym ze zbiorem danych do oceny na koordynatorze:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Ocena rozproszona - w przypadku dużych modeli lub zbiorów danych, których nie można uruchomić bezpośrednio na koordynatorze, zadanie koordynatora może rozdzielić zadania oceny do pracowników za pomocą metod schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Ocena samochodu bocznego

Inną metodą jest ocena bocznego samochodu, która polega na utworzeniu dedykowanego zadania oceniającego, które wielokrotnie odczytuje punkty kontrolne i przeprowadza ocenę w ostatnim punkcie kontrolnym. Pozwala to na wcześniejsze zakończenie programu treningowego, jeśli nie musisz zmieniać pętli treningowej na podstawie wyników oceny. Wymaga to jednak dodatkowego zadania ewaluatora i okresowych punktów kontrolnych w celu uruchomienia oceny. Poniżej znajduje się możliwa pętla oceny samochodu bocznego:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Klastry w świecie rzeczywistym

W prawdziwym środowisku produkcyjnym wykonasz wszystkie zadania w różnych procesach na różnych maszynach. Najprostszym sposobem skonfigurowania informacji o klastrze dla każdego zadania jest ustawienie zmiennych środowiskowych „TF_CONFIG” i użycie narzędzia tf.distribute.cluster_resolver.TFConfigClusterResolver do przeanalizowania „TF_CONFIG”. Ogólny opis zmiennych środowiskowych „TF_CONFIG” można znaleźć w rozproszonym przewodniku szkoleniowym .

Jeśli zaczynasz swoje zadania szkoleniowe przy użyciu Kubernetes lub innych szablonów konfiguracji, jest bardzo prawdopodobne, że te szablony ustawiły już dla Ciebie „TF_CONFIG”.

Ustaw zmienną środowiskową „TF_CONFIG”

Załóżmy, że masz 3 pracowników i 2 serwery parametrów, „TF_CONFIG” pracownika 1 może wynosić:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

„TF_CONFIG” ewaluatora może być:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Część „klastra” w powyższym ciągu „TF_CONFIG” dla oceniającego jest opcjonalna.

Jeśli używasz tego samego pliku binarnego do wszystkich zadań

Jeśli wolisz uruchamiać wszystkie te zadania przy użyciu jednego pliku binarnego, musisz na samym początku pozwolić programowi rozgałęzić się na różne role:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Poniższy kod uruchamia serwer TensorFlow i czeka:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Obsługa niepowodzenia zadania

Awaria pracownika

ClusterCoordinator lub Model.fit zapewnia wbudowaną odporność na błędy w przypadku awarii pracownika. Po przywróceniu procesu DatasetCreator na pracowników zostanie wywołana poprzednio udostępniona funkcja zestawu danych (w celu create_per_worker_dataset zestawu danych dla CTL lub DatasetCreator dla Model.fit ) w celu ponownego utworzenia zestawów danych.

Błąd serwera parametrów lub koordynatora

Jednak gdy koordynator zauważy błąd serwera parametrów, natychmiast AbortedError błąd UnavailableError lub AbortedError . W takim przypadku możesz ponownie uruchomić koordynatora. Sam koordynator również może stać się niedostępny. Dlatego zaleca się pewne narzędzia, aby nie stracić postępów w treningu:

  • W przypadku Model.fit należy użyć wywołania zwrotnego BackupAndRestore , które automatycznie obsługuje zapisywanie i przywracanie postępu. Przykład znajduje się w sekcji Callback i szkolenie powyżej.

  • W przypadku list CTL należy okresowo sprawdzać zmienne modelu i ładować zmienne modelu z punktu kontrolnego, jeśli taki istnieje, przed rozpoczęciem uczenia. Postęp szkolenia można wywnioskować w przybliżeniu na podstawie parametrów optimizer.iterations jeśli optymalizator jest w punkcie kontrolnym:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Pobieranie wartości RemoteValue

Pobieranie wartości RemoteValue jest gwarantowane, jeśli funkcja zostanie pomyślnie wykonana. Dzieje się tak, ponieważ obecnie zwracana wartość jest natychmiast kopiowana do koordynatora po wykonaniu funkcji. Jeśli wystąpi jakikolwiek błąd pracownika podczas kopiowania, funkcja zostanie ponowiona na innym dostępnym procesie roboczym. Dlatego jeśli chcesz zoptymalizować wydajność, możesz zaplanować funkcje bez wartości zwracanej.

Zgłaszanie błędów

Gdy koordynator zauważy błąd, taki jak UnavailableError z serwerów parametrów lub innych błędów aplikacji, takich jak InvalidArgument z tf.debugging.check_numerics , anuluje wszystkie oczekujące i oczekujące w kolejce funkcje przed zgłoszeniem błędu. Pobranie odpowiednich wartości RemoteValue spowoduje zgłoszenie RemoteValue CancelledError .

Po zgłoszeniu błędu koordynator nie zgłosi tego samego błędu ani żadnego błędu z anulowanych funkcji.

Poprawa wydajności

Istnieje kilka możliwych przyczyn ClusterResolver problemów z wydajnością podczas treningu z ParameterServerStrategy i ClusterResolver .

Jedną z częstych przyczyn jest niezrównoważone obciążenie serwerów parametrów, a niektóre mocno obciążone serwery parametrów osiągnęły pojemność. Przyczyn może być również wiele. Oto kilka prostych metod złagodzenia tego problemu

  1. podziel duże zmienne modelu, określając variable_partitioner podczas konstruowania ParameterServerStrategy .
  2. unikaj tworzenia zmiennej hotspot, która jest wymagana przez wszystkie serwery parametrów w jednym kroku, jeśli to możliwe. Na przykład użyj stałej szybkości uczenia się lub podklasy tf.keras.optimizers.schedules.LearningRateSchedule w optymalizatorach, ponieważ domyślnym zachowaniem jest to, że szybkość uczenia się stanie się zmienną umieszczaną na określonym serwerze parametrów i żądaną przez wszystkie inne serwery parametrów w każdym kroku .
  3. Przetasuj swoje duże słowniki przed przekazaniem ich do warstw przetwarzania wstępnego Keras.

Innym możliwym powodem problemów z wydajnością jest koordynator. Nasza pierwsza implementacja schedule / join jest oparta na języku Python, a zatem może mieć narzut związany z wątkami. Również opóźnienie między koordynatorem a pracownikami może być duże. Jeżeli o to chodzi,

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Ponieważ nadal optymalizujemy bibliotekę, mamy nadzieję, że większość użytkowników nie będzie musiała ręcznie pakować kroków w przyszłości.

Ponadto małą sztuczką poprawiającą wydajność jest planowanie funkcji bez wartości zwracanej, jak wyjaśniono w sekcji dotyczącej obsługi niepowodzenia zadania powyżej.

Znane ograniczenia

Większość znanych ograniczeń omówiono w powyższych sekcjach. Ta sekcja zawiera podsumowanie.

ParameterServerStrategy general

  • os.environment["grpc_fail_fast"]="use_caller" jest potrzebny w każdym zadaniu, łącznie z koordynatorem, aby odporność na błędy działała poprawnie.
  • Uczenie synchronicznego serwera parametrów nie jest obsługiwane.
  • Zwykle konieczne jest spakowanie wielu kroków w jedną funkcję, aby uzyskać optymalną wydajność.
  • Nie jest obsługiwane ładowanie save_model za pośrednictwem tf.saved_model.load zawierającego zmienne podzielone na fragmenty. Należy zauważyć, że ładowanie takiego save_model przy użyciu usługi TensorFlow Serving powinno zadziałać.
  • Nie jest obsługiwane ładowanie punktu kontrolnego zawierającego podzielone na fragmenty zmienne boksu optymalizatora do innej liczby fragmentów.
  • Nie jest obsługiwane odtwarzanie po awarii serwera parametrów bez ponownego uruchomienia zadania koordynatora.
  • Użycie tf.lookup.StaticHashTable (które jest powszechnie używane przez niektóre warstwy tf.keras.layers.experimental.preprocessing , takie jak IntegerLookup , StringLookup i TextVectorization ) skutkuje umieszczeniem zasobów w koordynatorze w tym czasie z treningiem PS. Ma to wpływ na wydajność wyszukiwania RPC od pracowników do koordynatora. Jest to obecnie wysoki priorytet, którym należy się zająć.

Specyfika Model.fit

  • Argument steps_per_epoch jest wymagany w Model.fit . Możesz wybrać wartość, która zapewnia odpowiednie odstępy czasu w epoce.
  • ParameterServerStrategy nie obsługuje niestandardowych wywołań zwrotnych, które mają wywołania na poziomie wsadowym ze względu na wydajność. Powinieneś przekonwertować te wywołania na wywołania na poziomie epoki z odpowiednio steps_per_epoch , tak aby były nazywane każdą liczbą kroków steps_per_epoch . Nie ma to wpływu na wbudowane wywołania zwrotne: ich wywołania na poziomie wsadowym zostały zmodyfikowane, aby były wydajne. Planowana jest obsługa wywołań na poziomie partii dla ParameterServerStrategy .
  • Z tego samego powodu, w przeciwieństwie do innych strategii, pasek postępu i metryki są rejestrowane tylko na granicach epoki.
  • Dane wejściowe dla Model.fit przyjmują tylko typ DatasetCreator .
  • run_eagerly nie jest obsługiwany.
  • Ocena w Model.fit nie jest jeszcze obsługiwana. To jeden z priorytetów.
  • Model.evaluate i Model.predict nie są jeszcze obsługiwane.

Specyfika niestandardowej pętli szkoleniowej