Pomoc chronić Wielkiej Rafy Koralowej z TensorFlow na Kaggle Dołącz Wyzwanie

Trening serwera parametrów za pomocą ParameterServerStrategy

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Trening serwer parametr jest powszechną metodą danych równolegle do skalowania modelu szkolenia na wielu maszynach.

Klaster szkolenie serwer parametr składa się z pracowników i serwerów parametrów. Zmienne są tworzone na serwerach parametrów i na każdym kroku są odczytywane i aktualizowane przez pracowników. Domyślnie pracownicy odczytują i aktualizują te zmienne niezależnie bez wzajemnej synchronizacji. To dlatego czasami parametr szkolenia serwer stylu nazywa trening asynchroniczny.

W TensorFlow 2, szkolenia serwer parametr jest zasilany przez tf.distribute.experimental.ParameterServerStrategy klasy, który rozprowadza etapy szkolenia w klastrze, który skaluje się do tysięcy pracowników (w towarzystwie serwerach parametr).

Obsługiwane metody szkoleniowe

Istnieją dwie główne wspierane metody szkoleniowe:

Klaster z zadaniami i zadaniami

Niezależnie od wyboru API ( Model.fit lub pętli niestandardowe szkolenia), rozprowadzane szkolenie w TensorFlow 2 dotyczy: 'cluster' z kilkoma 'jobs' , a każdy z pracy może mieć jeden lub więcej 'tasks' .

W przypadku korzystania z uczenia serwera parametrów zaleca się posiadanie:

  • Jeden koordynator pracy (który ma nazwę zadania chief )
  • Wiele pracy pracownika (nazwa zadania worker ); oraz
  • Wiele parametrów pracy serwera (nazwa zadania ps )

Natomiast koordynator tworzy zasoby, depesze zadania szkoleniowe, pisze punktów kontrolnych i dotyczy awarii zadaniowych, pracowników i serwerów parametrów uruchomienia tf.distribute.Server że nasłuchiwać żądań od koordynatora.

Trening serwer parametr z Model.fit API

Trening serwer parametr z Model.fit API wymaga koordynatora do korzystania z tf.distribute.experimental.ParameterServerStrategy obiektu, a tf.keras.utils.experimental.DatasetCreator jako wejście. Podobny do Model.fit użytkowania bez strategii, lub z innymi strategiami, workflow polega na tworzeniu i opracowywaniu modelu, przygotowanie wywołania zwrotne, a następnie przez Model.fit rozmowy.

Trening parametrów serwera z niestandardową pętlą treningową

Z niestandardowych pętli szkoleniowych, tf.distribute.experimental.coordinator.ClusterCoordinator klasa jest kluczowym składnikiem stosowanym do koordynatora.

Najważniejszą API dostarczone przez ClusterCoordinator obiektu jest schedule :

  • schedule API kolejkuje do tf.function i zwraca przyszłość-jak RemoteValue natychmiast.
  • W kolejce funkcje zostaną wysłane do zdalnych pracowników w tle wątków i ich RemoteValue s zostaną wypełnione w sposób asynchroniczny.
  • Ponieważ schedule nie wymaga przypisania pracownika The tf.function przekazany można wykonać na dowolnym dostępnym pracownika.
  • Jeśli pracownik, na którym jest wykonywana, stanie się niedostępny przed jej zakończeniem, funkcja zostanie ponowiona na innym dostępnym pracowniku.
  • Ze względu na ten fakt oraz fakt, że wykonanie funkcji nie jest niepodzielne, funkcja może być wykonana więcej niż raz.

Oprócz wysyłania funkcje zdalnego The ClusterCoordinator również przyczynia się do tworzenia zestawów danych na temat wszystkich pracowników i odbudowa tych zestawów danych, gdy odzyskuje robotnik z awarii.

Konfiguracja samouczka

Tutorial rozgałęzia się Model.fit i niestandardowych ścieżek pętlowych szkolenia, można wybrać ten, który pasuje do Twoich potrzeb. Sekcje inne niż „Trening z X” mają zastosowanie do obu ścieżek.

pip install portpicker

Konfiguracja klastra

Jak wspomniano powyżej, klaster serwerów szkolenie parametr wymaga zadania koordynatora, który uruchamia swój program treningowy, jednego lub kilku pracowników i zadania serwera parametrów, które działają TensorFlow servers- tf.distribute.Server -i ewentualnie dodatkowym zadaniem oceny tej oceny biegnie side-car (patrz sekcja dotycząca oceny wózka bocznego poniżej). Wymagania do ich założenia to:

  • Zadanie koordynatora musi znać adresy i porty wszystkich innych serwerów TensorFlow z wyjątkiem oceniającego.
  • Pracownicy i serwery parametrów muszą wiedzieć, na którym porcie mają nasłuchiwać. Dla uproszczenia zazwyczaj można przekazać pełne informacje o klastrze podczas tworzenia serwerów TensorFlow do tych zadań.
  • Zadanie oceniającego nie musi znać konfiguracji klastra szkoleniowego. Jeśli tak, nie powinien próbować nawiązać połączenia z klastrem szkoleniowym.
  • Pracownicy i serwery parametr powinien mieć typów zadań jako "worker" i "ps" , odpowiednio. Koordynator powinien używać "chief" jako typ zadania ze względów starszych.

W tym samouczku utworzysz klaster wewnątrzprocesowy, aby całe szkolenie dotyczące serwera parametrów można było uruchomić w Colab. Dowiesz się, jak skonfigurować prawdziwe klastrów w dalszej części.

Klaster w procesie

Zaczniesz od utworzenia kilku serwerów TensorFlow z wyprzedzeniem i połączysz się z nimi później. Należy pamiętać, że to tylko dla celów demonstracyjnych Ten poradnik, aw rzeczywistym szkolenia zostanie uruchomiona na serwerach "worker" i "ps" maszyn.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Konfiguracja klastra w trakcie procesu jest często używane w testach jednostkowych, takich jak tutaj .

Inną opcją dla lokalnego testowania jest uruchomienie procesów na lokalnym komputerze-check out szkolenia Wielu pracowników z Keras na przykład tego podejścia.

Utwórz wystąpienie ParameterServerStrategy

Zanim przejdziesz do kodu szkoleniowej, niech instancji ParameterServerStrategy obiekt. Należy pamiętać, że jest to potrzebne, niezależnie od tego, czy postępowanie z Model.fit lub pętli niestandardowe szkolenia. variable_partitioner argumentem zostaną wyjaśnione w zmiennym przekroju sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Aby używać procesorów GPU do szkolenia, przydziel procesory GPU widoczne dla każdego pracownika. ParameterServerStrategy wykorzysta wszystkie dostępne procesory graficzne na każdego pracownika, z zastrzeżeniem, że wszyscy pracownicy powinni mieć taką samą liczbę dostępnych procesorów graficznych.

Zmienna sharding

Zmienna sharding odnosi się do rozszczepienia zmienną na wiele mniejszych zmiennych, które są zwane kawałki. Fragmentowanie zmiennych może być przydatne do rozłożenia obciążenia sieci podczas uzyskiwania dostępu do tych fragmentów. Przydatne jest również rozdzielenie obliczeń i przechowywania normalnej zmiennej na wiele serwerów parametrów.

Aby włączyć zmienną sharding, można przekazać w variable_partitioner przy konstruowaniu ParameterServerStrategy obiekt. variable_partitioner zostanie wywołany za każdym razem, gdy zmienna jest tworzona i oczekuje się, że zwróci liczbę odłamków wzdłuż każdego wymiaru zmiennej. Niektóre out-of-box variable_partitioner s są takie jak tf.distribute.experimental.partitioners.MinSizePartitioner . Zaleca się używać zaborców rozmiaru opartego jak tf.distribute.experimental.partitioners.MinSizePartitioner aby uniknąć podziału małych zmiennych, które mogłyby mieć negatywny wpływ na prędkość modelu szkolenia.

Kiedy variable_partitioner jest przekazywana i jeśli utworzyć zmienną bezpośrednio pod strategy.scope() , staje się rodzajem pojemnika ze variables własności, która zapewnia dostęp do listy odłamków. W większości przypadków ten kontener zostanie automatycznie przekonwertowany na tensor poprzez połączenie wszystkich odłamków. W rezultacie może być używana jako normalna zmienna. Z drugiej strony, niektóre metody TensorFlow takie jak tf.nn.embedding_lookup zapewnić skuteczną realizację tego typu pojemnika i w tych metod będą unikać automatyczne powiązanie.

Proszę zobaczyć docs API tf.distribute.experimental.ParameterServerStrategy więcej szczegółów.

Szkolenie z Model.fit

Keras zapewnia łatwy w użyciu API poprzez szkolenia Model.fit że uchwyty pętli szkolenie pod maską, z elastycznością przeciążać train_step i wywołania zwrotne, które zapewniają funkcje, takie jak oszczędność kontrolnego lub podsumowaniu oszczędności dla TensorBoard. Z Model.fit , ten sam kod szkolenia mogą być wykorzystane do innych strategii z prostym zamiany przedmiotu strategii.

Dane wejściowe

Model.fit szkolenia serwerze parametr wymaga, aby dane wejściowe być dostarczone w wywoływalnym że przyjmuje jeden argument typu tf.distribute.InputContext i zwraca tf.data.Dataset . Następnie utwórz tf.keras.utils.experimental.DatasetCreator obiekt, który zajmuje tak callable , a opcjonalny tf.distribute.InputOptions sprzeciw poprzez input_options argument.

Należy zauważyć, że zaleca się przetasować i powtórzyć dane z treningu serwerze parametr i określ steps_per_epoch w fit rozmowy więc biblioteka zna granic epoki.

Proszę zobaczyć Rozproszone wej samouczek, aby uzyskać więcej informacji na temat InputContext argument.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kod w dataset_fn zostanie wywołany na urządzeniu wejściowym, który jest zwykle CPU, na każdej z maszyn roboczych.

Budowa i kompilacja modeli

Teraz można stworzyć tf.keras.Model -a trywialny tf.keras.models.Sequential model dla celów demonstracyjnych-obserwowanych przez Model.compile wezwanie do włączenia składników, takich jak optymalizator, metryki lub parametrów, takich jak steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Oddzwonienia i szkolenia

Zanim zadzwonisz model.fit do rzeczywistego treningu, niech przygotuje potrzebne zwrotnych dla typowych zadań, takich jak:

  • ModelCheckpoint : aby zapisać model ciężarów.
  • BackupAndRestore : upewnić się, że postęp szkolenie jest automatycznie kopie zapasowe, a odzyskane czy niedostępność doświadczenia klastrów (takich jak przerwanie lub wywłaszczania); lub
  • TensorBoard : aby zapisać raporty z postępu prac do plików podsumowania, które ulegną wizualizowane w narzędziu TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-12-02 02:22:17.429288: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 6s - loss: 0.6550 - 6s/epoch - 286ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.5718 - 546ms/epoch - 27ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b38365dd0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b4a806c20> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 1s - loss: 0.4267 - 502ms/epoch - 25ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3612 - 394ms/epoch - 20ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3184 - 385ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f4b4b93c510>

Bezpośrednie wykorzystanie z ClusterCoordinator (opcjonalnie)

Nawet jeśli wybierzesz Model.fit ścieżkę szkoleniową, można opcjonalnie instancji tf.distribute.experimental.coordinator.ClusterCoordinator obiektu zaplanować inne funkcje, które chciałbyś być wykonywane na robotników. Zobacz Trening z niestandardowej pętli trening sekcji więcej szczegółów i przykładów.

Trening z niestandardową pętlą treningową

Korzystanie pętle zwyczaj szkoleniowych z tf.distribute.Strategy zapewnia dużą elastyczność definiowania pętle treningowe. Z ParameterServerStrategy zdefiniowane powyżej (jako strategy ), można użyć tf.distribute.experimental.coordinator.ClusterCoordinator wysyłką wykonanie etapów szkoleniowych dla pracowników zdalnych.

Następnie będzie stworzenie modelu, określenie zestawu danych oraz funkcję kroku, jak to zrobić w pętli szkoleniowej z innymi tf.distribute.Strategy s. Można znaleźć więcej szczegółów w szkoleniu niestandardowej z tf.distribute.Strategy tutoriala.

W celu zapewnienia skutecznego zestawu danych prefetching, użyj polecany rozprowadzane zestawu danych API do tworzenia wymienione w krokach szkoleniowych wysyłki do zdalnych pracowników sekcji poniżej. Ponadto, upewnij się, aby zadzwonić Strategy.run wewnątrz worker_fn aby w pełni wykorzystać możliwości procesorów graficznych przeznaczonych dla pracowników. Pozostałe kroki są takie same w przypadku treningu z procesorami graficznymi lub bez nich.

Stwórzmy te komponenty w następujących krokach:

Skonfiguruj dane

Po pierwsze, napisać funkcję, która tworzy zbiór danych, który obejmuje przerób logiki realizowane przez Keras przerób warstw .

Będziesz tworzyć te warstwy poza dataset_fn jednak zastosować transformację wewnątrz dataset_fn , ponieważ będzie zawijać dataset_fn w tf.function , która nie pozwala na zmienne tworzone wewnątrz niego.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Wygeneruj przykłady zabawek w zbiorze danych:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Następnie należy utworzyć zestaw danych szkoleniowy zawinięte w dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Zbuduj model

Następnie utwórz model i inne obiekty. Upewnij się, aby stworzyć wszystkie zmienne pod strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Potwierdź Powiedzmy, że stosowanie FixedShardsPartitioner podzielić wszystkie zmienne na dwa odłamki i każdy odłamek został przydzielony do różnych serwerów parametrów:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Zdefiniuj etap szkolenia

Po trzecie, należy utworzyć etap treningowy owinięte w tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

W powyższej funkcji etapie szkolenia, nazywając Strategy.run i Strategy.reduce w step_fn może obsługiwać wiele GPU na pracownika. Jeżeli robotnicy GPU przydzielone Strategy.run będzie rozpowszechniać zestawów danych na wielu replikach.

Wyślij kroki szkoleniowe do pracowników zdalnych

Po wszystkie obliczenia są zdefiniowane przez ParameterServerStrategy , można użyć tf.distribute.experimental.coordinator.ClusterCoordinator klasę do tworzenia zasobów i rozpowszechniać etapy szkolenia pracowników zdalnych.

Niech najpierw utworzyć ClusterCoordinator obiekt i przekazać w obiekcie strategii:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Następnie utwórz zestaw danych na pracownika i iterator. W per_worker_dataset_fn poniżej, owijając dataset_fn do strategy.distribute_datasets_from_function zaleca się, aby umożliwić wydajne wstępne pobieranie do GPU bezproblemowo.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Ostatnim krokiem jest, aby rozprowadzić obliczeń dla pracowników zdalnych korzystających ClusterCoordinator.schedule :

  • schedule metoda kolejkuje do tf.function i zwraca przyszłość-jak RemoteValue natychmiast. W kolejce funkcje zostaną wysłane do zdalnych pracowników w wątkach tła i RemoteValue zostaną wypełnione w sposób asynchroniczny.
  • join metoda ( ClusterCoordinator.join ) mogą być wykorzystane do czekać, aż wszystkie zaplanowane funkcje są wykonywane.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.637500.
Finished epoch 1, accuracy is 0.906250.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Oto w jaki sposób można pobrać jego wynik RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternatywnie możesz uruchomić wszystkie kroki i zrobić coś podczas oczekiwania na zakończenie:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Dla pełnego szkolenia i obsługujących przepływ pracy w tym konkretnym przypadku, należy sprawdzić tego testu .

Więcej o tworzeniu zbiorów danych

Zbiór danych w powyższym kodzie jest tworzony za pomocą ClusterCoordinator.create_per_worker_dataset API). Tworzy jeden zestaw danych na pracownika i zwraca obiekt kontenera. Możesz zadzwonić do iter sposób na to, aby stworzyć iterator per-pracownika. Per pracownika iteracyjnej zawiera jeden iterację na pracownika odpowiedni plaster pracownika będą podstawione argument wejściowy w funkcji przekazywany do ClusterCoordinator.schedule sposobu przed funkcja jest wykonywana w konkretnym pracownika.

Obecnie ClusterCoordinator.schedule metoda zakłada robotnicy są równoważne i w ten sposób przyjmuje zestawy danych w różnych pracowników są takie same z wyjątkiem mogą być przetasowane inaczej, jeśli zawierają one Dataset.shuffle operacji. Z tego powodu zaleca się również, że zbiory danych powinien być powtarzany w nieskończoność i zaplanować skończoną liczbę kroków, zamiast opierania się na OutOfRangeError z zestawu danych.

Kolejna ważna uwaga jest taka, że tf.data zbiory danych nie obsługują niejawny serializacji i deserializacji ponad granicami zadaniowych. Dlatego ważne jest, aby stworzyć cały zestaw danych wewnątrz funkcji przekazany do ClusterCoordinator.create_per_worker_dataset .

Ocena

Istnieje więcej niż jeden sposób zdefiniowania i uruchomienia pętli oceny w szkoleniu rozproszonym. Każdy ma swoje zalety i wady, jak opisano poniżej. Metoda oceny inline jest zalecana, jeśli nie masz preferencji.

Ocena inline

W tej metodzie, zastępcy koordynatora pomiędzy szkoleń oraz oceny i dlatego nazywa się go ocenie rolkach.

Istnieje kilka korzyści z oceny wbudowanej. Na przykład:

  • Może obsługiwać duże modele ewaluacyjne i zestawy danych ewaluacyjnych, których nie może pomieścić pojedyncze zadanie.
  • Wyniki ewaluacji mogą posłużyć do podejmowania decyzji dotyczących szkolenia w następnej epoce.

Istnieją dwa sposoby wdrażania ewaluacji inline: ewaluacja bezpośrednia i ewaluacja rozproszona.

  • Ocena Direct: Dla małych modeli i zestawów danych ewaluacyjnych, koordynator może działać bezpośrednio na ocenę rozproszonego modelu z zestawu danych z oceny koordynatora:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Ukazuje ocena: Dla dużych modeli lub zestawów danych, które są niemożliwe do uruchomienia bezpośrednio na koordynatora, zadanie koordynator może rozdzielać zadania z oceny pracowników poprzez ClusterCoordinator.schedule / ClusterCoordinator.join metod:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Ocena samochodu bocznego

Inną metodą jest nazywany ocena side-car gdzie utworzyć dedykowany zadanie oceniającego, że wielokrotnie odczytuje punkty kontrolne i prowadzi ocenę w najnowszym punkcie kontrolnym. Pozwala to na wcześniejsze zakończenie programu treningowego, jeśli nie musisz zmieniać pętli treningowej na podstawie wyników oceny. Wymaga to jednak dodatkowego zadania ewaluatora i okresowego sprawdzania punktów kontrolnych, aby wyzwolić ocenę. Poniżej znajduje się możliwa pętla oceny wózka bocznego:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Klastry w świecie rzeczywistym

W rzeczywistym środowisku produkcyjnym uruchomisz wszystkie zadania w różnych procesach na różnych maszynach. Najprostszym sposobem, aby informacje na temat każdego klastra Konfigurowanie zadania jest ustawiony "TF_CONFIG" zmienne środowiskowe i używać tf.distribute.cluster_resolver.TFConfigClusterResolver do analizowania "TF_CONFIG" .

Ogólny opis "TF_CONFIG" zmiennych środowiskowych, patrz Ukazuje szkolenia przewodnika.

Jeśli zaczniesz swoje zadania szkoleniowe korzystania Kubernetes lub innych szablonów konfiguracyjnych, to jest bardzo prawdopodobne, że te szablony zostały już ustawione “TF_CONFIG" dla Ciebie.

Ustaw "TF_CONFIG" zmienną środowiskową

Załóżmy, że masz 3 robotników i 2 serwery parametru, "TF_CONFIG" pracownika 1 mogą być:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" od oceniającego mogą być:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" część w powyższym "TF_CONFIG" ciąg dla oceniającego jest opcjonalne.

Jeśli używasz tego samego pliku binarnego do wszystkich zadań

Jeśli wolisz uruchamiać wszystkie te zadania za pomocą jednego pliku binarnego, na samym początku musisz pozwolić swojemu programowi rozgałęziać się na różne role:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Poniższy kod uruchamia serwer TensorFlow i czeka:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Obsługa niepowodzenia zadania

Awaria pracownika

tf.distribute.experimental.coordinator.ClusterCoordinator lub Model.fit zapewniają wbudowaną odporność na uszkodzenia niewydolności pracowników. Po przywróceniu pracownika, pod warunkiem wcześniej funkcja zestawu danych (albo ClusterCoordinator.create_per_worker_dataset dla niestandardowego pętli szkoleniowej lub tf.keras.utils.experimental.DatasetCreator dla Model.fit ) zostanie wywołany na pracowników, aby ponownie utworzyć zestawy danych.

Awaria serwera parametrów lub koordynatora

Jednak, gdy koordynator widzi błąd serwera parametr, to zgłosi UnavailableError lub AbortedError natychmiast. W takim przypadku możesz ponownie uruchomić koordynatora. Sam koordynator również może stać się niedostępny. Dlatego zalecane jest odpowiednie oprzyrządowanie, aby nie stracić postępów w treningu:

  • Dla Model.fit , należy użyć BackupAndRestore zwrotnego, która obsługuje zapisywanie postępów i przywracanie automatycznie. Zobacz wywołania zwrotne i szkolenia sekcja wyżej dla przykładu.

  • W przypadku niestandardowej pętli szkoleniowej należy okresowo sprawdzać zmienne modelu i załadować zmienne modelu z punktu kontrolnego, jeśli istnieje, przed rozpoczęciem szkolenia. Postęp szkolenia można wywieść od około optimizer.iterations jeśli optymalizator jest checkpointed:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Pobieranie z RemoteValue

Pobieranie z RemoteValue gwarantuje sukcesu, jeśli funkcja jest wykonana pomyślnie. Dzieje się tak, ponieważ obecnie wartość zwracana jest natychmiast kopiowana do koordynatora po wykonaniu funkcji. Jeśli podczas kopiowania wystąpi awaria pracownika, funkcja zostanie ponowiona na innym dostępnym pracowniku. Dlatego jeśli chcesz zoptymalizować wydajność, możesz zaplanować funkcje bez zwracanej wartości.

Zgłaszanie błędów

Gdy koordynator widzi błędu takiego jak UnavailableError z serwerów parametrów lub innych błędów aplikacji, takich jak InvalidArgument od tf.debugging.check_numerics będzie anulować wszystkie oczekujące w kolejce i funkcje przed podniesieniem błąd. Pobierania ich odpowiadające RemoteValue s podniesie CancelledError .

Po zgłoszeniu błędu koordynator nie zgłosi tego samego błędu ani żadnego błędu z anulowanych funkcji.

Poprawa wydajności

Istnieje kilka możliwych przyczyn, jeśli widzisz problemy z wydajnością, gdy trenujesz z ParameterServerStrategy i ClusterResolver .

Jedną z częstych przyczyn jest to, że serwery parametrów mają niezrównoważone obciążenie, a niektóre mocno obciążone serwery parametrów osiągnęły pojemność. Przyczyn może być również wiele. Niektóre proste metody złagodzenia tego problemu to:

  1. Shard swoich dużych zmiennych modelu poprzez określenie variable_partitioner przy konstruowaniu ParameterServerStrategy .
  2. Jeśli to możliwe, unikaj tworzenia zmiennej hotspotu wymaganej przez wszystkie serwery parametrów w jednym kroku. Na przykład użyć stałej szybkości uczenia się lub podklasy tf.keras.optimizers.schedules.LearningRateSchedule w optymalizujące ponieważ domyślne zachowanie jest, że szybkość uczenia się będzie zmienna umieszczone na określonym serwerze parametrów i oczekiwana przez wszystkich innych serwerów parametrów w każdym kroku .
  3. Potasuj swoje duże słowniki przed przekazaniem ich do warstw przetwarzania wstępnego Keras.

Innym możliwym powodem problemów z wydajnością jest koordynator. Twoja pierwsza realizacja schedule / join to Python oparte a zatem mogły gwintowania napowietrznych. Również opóźnienie między koordynatorem a pracownikami może być duże. Jeżeli o to chodzi,

  • Dla Model.fit można ustawić steps_per_execution argumentu przewidzianego w Model.compile do wartości większej niż 1.

  • W przypadku pętli zwyczaj treningowej, można zapakować kilka kroków w jedną tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Ponieważ biblioteka jest dalej optymalizowana, miejmy nadzieję, że większość użytkowników nie będzie musiała ręcznie pakować kroków w przyszłości.

Ponadto małą sztuczką w celu poprawy wydajności jest zaplanowanie funkcji bez wartości zwracanej, jak wyjaśniono w sekcji dotyczącej niepowodzenia zadania obsługi powyżej.

Znane ograniczenia

Większość znanych ograniczeń została już omówiona w powyższych sekcjach. Ta sekcja zawiera podsumowanie.

ParameterServerStrategy ogólnie

  • os.environment["grpc_fail_fast"]="use_caller" jest potrzebna na każdym zadaniu tym koordynatora, aby uczynić pracę odporność na uszkodzenia prawidłowo.
  • Uczenie serwera parametrów synchronicznych nie jest obsługiwane.
  • Zwykle konieczne jest spakowanie wielu kroków w jedną funkcję, aby osiągnąć optymalną wydajność.
  • To nie jest obsługiwana załadować saved_model poprzez tf.saved_model.load zawierającej sharded zmienne. Uwaga: ładowanie takiego zapisanego modelu przy użyciu TensorFlow Serving powinno działać.
  • Nie jest obsługiwane ładowanie punktu kontrolnego zawierającego zmienne slotu optymalizatora podzielonego na fragmenty do innej liczby fragmentów.
  • Nie jest obsługiwane odzyskiwanie po awarii serwera parametrów bez restartowania zadania koordynatora.
  • Zastosowanie tf.lookup.StaticHashTable (które często stosuje się w niektórych Keras przerób warstw, takich jak tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup i tf.keras.layers.TextVectorization ) prowadzi do środków umieszczonych na koordynator w tym czasie ze szkoleniem serwera parametrów. Ma to wpływ na wydajność wyszukiwania RPC od pracowników do koordynatora. Jest to obecnie wysoki priorytet do rozwiązania.

Model.fit specyfika

  • steps_per_epoch argumentem jest wymagane w Model.fit . Możesz wybrać wartość, która zapewnia odpowiednie odstępy w epoce.
  • ParameterServerStrategy nie posiada wsparcie dla niestandardowych wywołań zwrotnych, które mają połączenia partia poziomie ze względu na wydajność. Należy przekonwertować te rozmowy do rozmowy epoka szczebla z odpowiednio zbierane steps_per_epoch , tak że są one nazywane każdy steps_per_epoch liczbę kroków. Nie ma to wpływu na wbudowane wywołania zwrotne: ich wywołania na poziomie wsadu zostały zmodyfikowane tak, aby były wydajne. Wspieranie połączenia partii szczebla dla ParameterServerStrategy jest planowane.
  • Z tego samego powodu, w przeciwieństwie do innych strategii, pasek postępu i metryki są rejestrowane tylko na granicach epok.
  • run_eagerly nie jest obsługiwany.

Specyfika niestandardowej pętli treningowej