Weź udział w sympozjum Women in ML 7 grudnia Zarejestruj się teraz

Szkolenie dla wielu pracowników z Keras

Zadbaj o dobrą organizację dzięki kolekcji Zapisuj i kategoryzuj treści zgodnie ze swoimi preferencjami.

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

W tym samouczku pokazano, jak przeprowadzić rozproszone szkolenie z wieloma pracownikami za pomocą modelu Keras i interfejsu API Model.fit przy użyciu interfejsu API tf.distribute.Strategy — w szczególności klasy tf.distribute.MultiWorkerMirroredStrategy . Dzięki tej strategii model Keras, który został zaprojektowany do pracy na jednym pracowniku, może bezproblemowo pracować na wielu pracownikach przy minimalnych zmianach kodu.

Dla osób zainteresowanych głębszym zrozumieniem interfejsów API tf.distribute.Strategy dostępny jest przewodnik dotyczący szkolenia rozproszonego w zakresie TensorFlow , który zawiera przegląd strategii dystrybucji obsługiwanych przez TensorFlow.

Aby dowiedzieć się, jak korzystać z MultiWorkerMirroredStrategy z Keras i niestandardowej pętli szkoleniowej, zapoznaj się z Niestandardową pętlą szkoleniową z Keras i MultiWorkerMirroredStrategy .

Zauważ, że celem tego samouczka jest zademonstrowanie minimalnego przykładu wielu pracowników z dwoma pracownikami.

Ustawiać

Zacznij od kilku niezbędnych importów:

import json
import os
import sys

Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku:

  1. Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Zresetuj zmienną środowiskową TF_CONFIG (więcej na ten temat dowiesz się później):
os.environ.pop('TF_CONFIG', None)
  1. Upewnij się, że bieżący katalog znajduje się na ścieżce Pythona — dzięki temu notatnik będzie mógł później zaimportować pliki zapisane przez %%writefile :
if '.' not in sys.path:
  sys.path.insert(0, '.')

Teraz zaimportuj TensorFlow:

import tensorflow as tf

Zbiór danych i definicja modelu

Następnie utwórz plik mnist_setup.py z prostą konfiguracją modelu i zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Szkolenie modelowe na jednym pracowniku

Spróbuj wytrenować model dla niewielkiej liczby epok i obserwuj wyniki jednego pracownika , aby upewnić się, że wszystko działa poprawnie. W miarę postępu treningu strata powinna spadać, a dokładność powinna wzrastać.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Konfiguracja wielostanowiskowa

Teraz wejdźmy w świat szkoleń wieloosobowych.

Klaster z zadaniami i zadaniami

W TensorFlow szkolenie rozproszone obejmuje: 'cluster' z kilkoma zadaniami, a każde z zadań może mieć jedno lub więcej 'task' .

Potrzebna będzie zmienna środowiskowa konfiguracji TF_CONFIG do uczenia na wielu komputerach, z których każdy może mieć inną rolę. TF_CONFIG to ciąg JSON używany do określenia konfiguracji klastra dla każdego procesu roboczego, który jest częścią klastra.

Istnieją dwa składniki zmiennej TF_CONFIG : 'cluster' i 'task' .

  • 'cluster' jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jak 'worker' lub 'chief' .

    • W szkoleniu dla wielu pracowników z tf.distribute.MultiWorkerMirroredStrategy , oprócz tego, co robi zwykły 'worker' , zwykle jest jeden 'worker' , który przejmuje obowiązki, takie jak zapisywanie punktu kontrolnego i pisanie pliku podsumowującego dla TensorBoard. Taki 'worker' jest określany jako główny pracownik (z nazwą stanowiska 'chief' ).
    • Jest zwyczajem, że 'chief' ma wyznaczony 'index' 0 (w rzeczywistości jest to sposób implementacji tf.distribute.Strategy ).
  • 'task' dostarcza informacji o bieżącym zadaniu i jest inne dla każdego pracownika. Określa 'type' i 'index' tego pracownika.

Poniżej przykładowa konfiguracja:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Oto ten sam TF_CONFIG zserializowany jako ciąg JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Zauważ, że tf_config jest tylko zmienną lokalną w Pythonie. Aby móc go używać do konfiguracji szkolenia, ten dict musi być serializowany jako JSON i umieszczony w zmiennej środowiskowej TF_CONFIG .

W przykładowej konfiguracji powyżej ustawisz zadanie 'type' na 'worker' i zadanie 'index' na 0 . Dlatego ta maszyna jest pierwszym pracownikiem. Zostanie wyznaczony na 'chief' pracownika i wykona więcej pracy niż inni.

W celach ilustracyjnych ten samouczek pokazuje, jak można skonfigurować zmienną TF_CONFIG z dwoma procesami roboczymi na localhost .

W praktyce należy utworzyć wiele procesów roboczych na zewnętrznych adresach IP/portach i odpowiednio ustawić zmienną TF_CONFIG dla każdego pracownika.

W tym samouczku użyjesz dwóch robotników:

  • TF_CONFIG pierwszego ( 'chief' ) pracownika pokazano powyżej.
  • Dla drugiego pracownika ustawisz tf_config['task']['index']=1

Zmienne środowiskowe i podprocesy w notebookach

Podprocesy dziedziczą zmienne środowiskowe po swoich rodzicach.

Na przykład możesz ustawić zmienną środowiskową w tym procesie Jupyter Notebook w następujący sposób:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Następnie możesz uzyskać dostęp do zmiennej środowiskowej z podprocesów:

echo ${GREETINGS}
Hello TensorFlow!

W następnej sekcji użyjesz podobnej metody, aby przekazać TF_CONFIG do podprocesów roboczych. W rzeczywistym scenariuszu nie uruchamiałbyś swoich zadań w ten sposób, ale w tym przykładzie jest to wystarczające.

Wybierz odpowiednią strategię

W TensorFlow istnieją dwie główne formy szkolenia rozproszonego:

  • Szkolenie synchroniczne , w którym etapy szkolenia są synchronizowane między procesami roboczymi i replikami oraz
  • Trening asynchroniczny , w którym kroki trenowania nie są ściśle synchronizowane (na przykład trenowanie serwera parametrów ).

W tym samouczku pokazano, jak wykonać synchroniczne szkolenie z wieloma pracownikami przy użyciu wystąpienia tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu dla wszystkich pracowników. Wykorzystuje CollectiveOps , operację TensorFlow do komunikacji zbiorowej, aby agregować gradienty i synchronizować zmienne. Przewodnik tf.distribute.Strategy zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy zapewnia wiele implementacji za pośrednictwem parametru tf.distribute.experimental.CommunicationOptions : 1) RING implementuje kolektywy oparte na pierścieniu, używając gRPC jako warstwy komunikacji między hostami; 2) NCCL używa zbiorowej biblioteki komunikacyjnej NVIDIA do wdrażania kolektywów; oraz 3) AUTO odracza wybór do czasu pracy. Najlepszy wybór implementacji zbiorowej zależy od liczby i rodzaju procesorów graficznych oraz połączenia sieciowego w klastrze.

Trenuj modelkę

Dzięki integracji tf.distribute.Strategy API z tf.keras jedyną zmianą, jaką wprowadzisz w celu dystrybucji szkolenia do wielu pracowników, jest uwzględnienie budowania modelu i model.compile() wewnątrz strategy.scope() . Zakres strategii dystrybucji dyktuje sposób i miejsce tworzenia zmiennych, aw przypadku MultiWorkerMirroredStrategy tworzone zmienne są MirroredVariable i są replikowane na każdym z pracowników.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Aby faktycznie działać z MultiWorkerMirroredStrategy , musisz uruchomić procesy robocze i przekazać im TF_CONFIG .

Podobnie jak napisany wcześniej plik mnist_setup.py , tutaj jest main.py , który będzie uruchamiał każdy z pracowników:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

W powyższym fragmencie kodu należy zauważyć, że parametr global_batch_size , który jest przekazywany do Dataset.batch , jest ustawiony na per_worker_batch_size * num_workers . Dzięki temu każdy proces roboczy przetwarza partie przykładów per_worker_batch_size niezależnie od liczby procesów roboczych.

Bieżący katalog zawiera teraz oba pliki Pythona:

ls *.py
main.py
mnist_setup.py

Więc json-serializuj TF_CONFIG i dodaj go do zmiennych środowiskowych:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz możesz uruchomić proces roboczy, który uruchomi main.py i użyje TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:

  1. Używa %%bash , który jest "magią" notatnika do uruchamiania niektórych poleceń basha.
  2. Używa flagi --bg do uruchomienia procesu bash w tle, ponieważ ten proces roboczy nie zostanie zakończony. Czeka na wszystkich pracowników, zanim się zacznie.

Proces roboczy działający w tle nie drukuje danych wyjściowych do tego notatnika, więc &> przekierowuje dane wyjściowe do pliku, dzięki czemu można później sprawdzić, co się stało w pliku dziennika.

Poczekaj więc kilka sekund, aż proces się uruchomi:

import time
time.sleep(10)

Teraz sprawdź, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Ostatnia linia pliku dziennika powinna Started server with target: grpc://localhost:12345 . Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.

Zaktualizuj więc tf_config dla procesu drugiego pracownika, aby pobrać:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Uruchom drugiego pracownika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, dowiesz się, że uczestniczył on w szkoleniu tego modelu:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Nic dziwnego, że działało to wolniej niż test na początku tego samouczka.

Praca wielu pracowników na jednej maszynie tylko zwiększa koszty.

Celem tutaj nie było poprawienie czasu szkolenia, a jedynie podanie przykładu szkolenia wielopracowniczego.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Dogłębne szkolenie dla wielu pracowników

Do tej pory nauczyłeś się, jak wykonać podstawową konfigurację dla wielu pracowników.

W dalszej części samouczka szczegółowo poznasz inne czynniki, które mogą być przydatne lub ważne w rzeczywistych przypadkach użycia.

Fragmentowanie zbioru danych

W przypadku szkolenia z wieloma pracownikami fragmentacja zestawu danych jest niezbędna do zapewnienia zbieżności i wydajności.

Przykład w poprzedniej sekcji opiera się na domyślnym autoshardingu dostarczonym przez interfejs API tf.distribute.Strategy . Możesz kontrolować fragmentowanie, ustawiając tf.data.experimental.AutoShardPolicy tf.data.experimental.DistributeOptions .

Aby dowiedzieć się więcej o automatycznym dzieleniu na fragmenty , zapoznaj się z przewodnikiem dotyczącym wprowadzania danych rozproszonych .

Oto szybki przykład, jak wyłączyć automatyczne sharding, aby każda replika przetwarzała każdy przykład ( niezalecane ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Ocena

Jeśli przekażesz validation_data do Model.fit , będzie to naprzemiennie trenować i oceniać dla każdej epoki. Ocena z validation_data jest rozdzielana na ten sam zestaw pracowników, a wyniki oceny są agregowane i dostępne dla wszystkich pracowników.

Podobnie jak w przypadku szkolenia, zestaw danych walidacji jest automatycznie dzielony na fragmenty na poziomie pliku. Musisz ustawić globalny rozmiar partii w zestawie danych walidacji i ustawić validation_steps .

Do oceny zaleca się również powtórny zbiór danych.

Alternatywnie można również utworzyć inne zadanie, które okresowo odczytuje punkty kontrolne i uruchamia ocenę. To właśnie robi Estymator. Nie jest to jednak zalecany sposób wykonywania ewaluacji, przez co pomijane są jego szczegóły.

Wydajność

Masz teraz model Keras, który jest skonfigurowany do uruchamiania w wielu pracownikach za pomocą MultiWorkerMirroredStrategy .

Aby poprawić wydajność szkolenia wieloosobowego, możesz wypróbować następujące rozwiązania:

  • tf.distribute.MultiWorkerMirroredStrategy zapewnia wiele implementacji komunikacji zbiorowej :

    • RING implementuje kolektywy oparte na pierścieniu, wykorzystując gRPC jako warstwę komunikacji między hostami.
    • NCCL używa zbiorowej biblioteki komunikacyjnej NVIDIA do wdrażania kolektywów.
    • AUTO odracza wybór do czasu pracy.

    Najlepszy wybór implementacji zbiorowej zależy od liczby procesorów GPU, typu procesorów GPU i połączenia sieciowego w klastrze. Aby pominąć wybór automatyczny, określ parametr communication_options konstruktora MultiWorkerMirroredStrategy . Na przykład:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Przerzuć zmienne na tf.float , jeśli to możliwe:

    • Oficjalny model ResNet zawiera przykład tego, jak można to zrobić.

Tolerancja błędów

W szkoleniu synchronicznym klaster ulegnie awarii, jeśli jeden z procesów roboczych ulegnie awarii i nie istnieje mechanizm odzyskiwania po awarii.

Używanie Keras z tf.distribute.Strategy ma tę zaletę, że jest odporne na błędy w przypadkach, gdy pracownicy umierają lub są niestabilni w inny sposób. Można to zrobić, zachowując stan uczący w wybranym rozproszonym systemie plików, tak aby po ponownym uruchomieniu instancji, która wcześniej nie powiodła się lub została wywłaszczona, stan uczący zostanie odzyskany.

Gdy pracownik stanie się niedostępny, inni pracownicy przestaną działać (prawdopodobnie po przekroczeniu limitu czasu). W takich przypadkach niedostępnego pracownika należy ponownie uruchomić, a także innych pracowników, których nie powiodło się.

Odwołanie zwrotne do punktu kontrolnego modelu

Wywołanie zwrotne ModelCheckpoint nie zapewnia już funkcji odporności na błędy, zamiast tego użyj wywołania zwrotnego BackupAndRestore .

Wywołanie zwrotne ModelCheckpoint może nadal służyć do zapisywania punktów kontrolnych. Ale dzięki temu, jeśli szkolenie zostało przerwane lub pomyślnie zakończone, aby kontynuować szkolenie z punktu kontrolnego, użytkownik jest odpowiedzialny za ręczne załadowanie modelu.

Opcjonalnie użytkownik może wybrać zapisanie i przywrócenie modelu/wag poza wywołaniem zwrotnym ModelCheckpoint .

Zapisywanie i ładowanie modelu

Aby zapisać model za pomocą model.save lub tf.saved_model.save , miejsce docelowe zapisu musi być inne dla każdego pracownika.

  • W przypadku pracowników niegłównych konieczne będzie zapisanie modelu w katalogu tymczasowym.
  • Dla szefa będziesz musiał zapisać w podanym katalogu modeli.

Katalogi tymczasowe w procesie roboczym muszą być unikatowe, aby zapobiec błędom wynikającym z wielu procesów roboczych próbujących pisać w tej samej lokalizacji.

Model zapisany we wszystkich katalogach jest identyczny i zazwyczaj tylko model zapisany przez szefa powinien być przywoływany w celu przywrócenia lub udostępnienia.

Powinieneś mieć pewną logikę czyszczenia, która usuwa katalogi tymczasowe utworzone przez pracowników po zakończeniu szkolenia.

Powodem jednoczesnego oszczędzania na szefie i pracownikach jest możliwość agregowania zmiennych podczas tworzenia punktów kontrolnych, co wymaga udziału zarówno szefa, jak i pracowników w protokole komunikacyjnym allreduce. Z drugiej strony zezwolenie szefowi i pracownikom na zapisywanie w tym samym katalogu modelu spowoduje błędy z powodu rywalizacji.

Korzystając z MultiWorkerMirroredStrategy , program jest uruchamiany na każdym pracowniku, a aby wiedzieć, czy aktualny pracownik jest szefem, wykorzystuje obiekt przeliczania klastra, który ma atrybuty task_type i task_id :

  • task_type mówi ci, jaka jest aktualna praca (np. 'worker' ).
  • task_id podaje identyfikator pracownika.
  • Pracownik o task_id == 0 jest wyznaczony jako główny pracownik.

W poniższym fragmencie kodu funkcja write_filepath udostępnia ścieżkę pliku do zapisu, która zależy od identyfikatora task_id pracownika:

  • W przypadku głównego pracownika (z task_id == 0 ) zapisuje w oryginalnej ścieżce pliku.
  • W przypadku innych pracowników tworzy katalog tymczasowy — temp_dir — z identyfikatorem task_id w ścieżce do katalogu, w którym mają być zapisywane:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Dzięki temu możesz teraz oszczędzać:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Jak opisano powyżej, później model powinien być ładowany tylko z zapisanej ścieżki głównej, więc usuńmy tymczasowe zapisane przez pracowników niegłównych:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Teraz, gdy nadejdzie czas ładowania, skorzystajmy z wygodnego API tf.keras.models.load_model i kontynuujmy dalszą pracę.

W tym przypadku załóżmy, że do ładowania i kontynuowania szkolenia używa się tylko jednego pracownika, w którym to przypadku nie wywołuje się tf.keras.models.load_model w ramach innej strategy.scope() (zwróć uwagę, że strategy = tf.distribute.MultiWorkerMirroredStrategy() , jak zdefiniowano wcześniej ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Zapisywanie i przywracanie punktu kontrolnego

Z drugiej strony, checkpointy pozwalają na zapisywanie wag modelu i przywracanie ich bez konieczności zapisywania całego modelu.

Tutaj utworzysz jeden tf.train.Checkpoint , który śledzi model, zarządzany przez tf.train.CheckpointManager , dzięki czemu zachowany zostanie tylko najnowszy punkt kontrolny:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Po skonfigurowaniu CheckpointManager możesz teraz zapisać i usunąć punkty kontrolne zapisane przez pracowników niegłównych:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Teraz, gdy musisz przywrócić model, możesz znaleźć najnowszy punkt kontrolny zapisany za pomocą wygodnej funkcji tf.train.latest_checkpoint . Po przywróceniu punktu kontrolnego możesz kontynuować trening.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Wywołanie zwrotne kopii zapasowej i przywracania

Wywołanie zwrotne tf.keras.callbacks.BackupAndRestore zapewnia odporność na awarie, tworząc kopię zapasową modelu i bieżącego numeru epoki w tymczasowym pliku punktu kontrolnego pod argumentem backup_dir do BackupAndRestore . Odbywa się to na końcu każdej epoki.

Po przerwaniu i ponownym uruchomieniu zadań wywołanie zwrotne przywraca ostatni punkt kontrolny, a szkolenie jest kontynuowane od początku przerwanej epoki. Wszelkie częściowe szkolenia, które zostały już wykonane w niedokończonej epoce przed przerwaniem, zostaną odrzucone, aby nie wpłynęły na ostateczny stan modelu.

Aby z niego skorzystać, podaj instancję tf.keras.callbacks.BackupAndRestore w wywołaniu Model.fit .

W przypadku MultiWorkerMirroredStrategy , jeśli pracownik zostanie przerwany, cały klaster zostanie wstrzymany do momentu ponownego uruchomienia przerwanego procesu roboczego. Inne procesy robocze również zostaną ponownie uruchomione, a przerwany proces roboczy ponownie dołączy do klastra. Następnie każdy pracownik odczytuje wcześniej zapisany plik punktu kontrolnego i przywraca jego poprzedni stan, umożliwiając w ten sposób przywrócenie synchronizacji klastra. Następnie szkolenie trwa.

Wywołanie zwrotne BackupAndRestore używa CheckpointManager do zapisywania i przywracania stanu uczenia, który generuje plik o nazwie punkt kontrolny, który śledzi istniejące punkty kontrolne wraz z najnowszym. Z tego powodu, backup_dir nie powinien być ponownie używany do przechowywania innych punktów kontrolnych w celu uniknięcia kolizji nazw.

Obecnie wywołanie zwrotne BackupAndRestore obsługuje szkolenia dla jednego pracownika bez strategii — MirroredStrategy — oraz szkolenia dla wielu pracowników z MultiWorkerMirroredStrategy .

Poniżej znajdują się dwa przykłady zarówno szkolenia dla wielu pracowników, jak i szkolenia dla jednego pracownika:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Jeśli sprawdzisz katalog backup_dir określony w BackupAndRestore , możesz zauważyć tymczasowo wygenerowane pliki punktów kontrolnych. Te pliki są potrzebne do odzyskania wcześniej utraconych instancji i zostaną usunięte przez bibliotekę na końcu Model.fit po pomyślnym zakończeniu szkolenia.

Dodatkowe zasoby

  1. Szkolenie rozproszone w przewodniku TensorFlow zawiera przegląd dostępnych strategii dystrybucji.
  2. Samouczek Niestandardowa pętla szkoleniowa z Keras i MultiWorkerMirroredStrategy pokazuje, jak używać MultiWorkerMirroredStrategy z Keras i niestandardową pętlą szkoleniową.
  3. Sprawdź oficjalne modele , z których wiele można skonfigurować do uruchamiania wielu strategii dystrybucji.
  4. Przewodnik Lepsza wydajność dzięki tf.function zawiera informacje o innych strategiach i narzędziach, takich jak TensorFlow Profiler , których można użyć do optymalizacji wydajności modeli TensorFlow.