Mam pytanie? Połącz się ze społecznością na Forum TensorFlow Odwiedź Forum

Szkolenie dla wielu pracowników z Keras

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

W tym samouczku przedstawiono rozproszone szkolenie dla wielu pracowników z modelem Keras przy użyciu interfejsu API tf.distribute.Strategy , w szczególności tf.distribute.MultiWorkerMirroredStrategy . Dzięki tej strategii model Keras, który został zaprojektowany do pracy na jednym pracowniku, może bezproblemowo pracować na wielu pracownikach przy minimalnej zmianie kodu.

Przewodnik dotyczący szkoleń rozproszonych w TensorFlow zawiera przegląd strategii dystrybucji obsługiwanych przez TensorFlow dla osób zainteresowanych głębszym zrozumieniem interfejsów API tf.distribute.Strategy .

Ustawiać

Po pierwsze, niektóre niezbędne importy.

import json
import os
import sys

Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku.

Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W przypadku rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Zresetuj TF_CONFIG środowiskową TF_CONFIG , więcej na ten temat dowiesz się później.

os.environ.pop('TF_CONFIG', None)

Upewnij się, że bieżący katalog znajduje się na ścieżce Pythona. Dzięki temu notatnik może później zaimportować pliki zapisane przez %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Teraz zaimportuj TensorFlow.

import tensorflow as tf

Zbiór danych i definicja modelu

Następnie utwórz plik mnist.py z prostą konfiguracją modelu i zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Spróbuj wytrenować model dla niewielkiej liczby epok i obserwuj wyniki jednego pracownika, aby upewnić się, że wszystko działa poprawnie. W miarę postępu treningu strata powinna spadać, a dokładność powinna wzrastać.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2783 - accuracy: 0.2152
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2298 - accuracy: 0.3783
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1673 - accuracy: 0.5096
<tensorflow.python.keras.callbacks.History at 0x7fb074a356d0>

Konfiguracja dla wielu pracowników

Teraz wejdźmy w świat szkoleń wieloosobowych. W TensorFlow zmienna środowiskowa TF_CONFIG jest wymagana do uczenia na wielu komputerach, z których każdy może mieć inną rolę. TF_CONFIG to ciąg JSON używany do określenia konfiguracji klastra na każdym TF_CONFIG który jest częścią klastra.

Oto przykładowa konfiguracja:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Oto ten sam TF_CONFIG zserializowany jako ciąg JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Istnieją dwa składniki TF_CONFIG : cluster i task .

  • cluster jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jak worker . W szkoleniu dla wielu pracowników z MultiWorkerMirroredStrategy , zazwyczaj jeden worker bierze na siebie nieco większą odpowiedzialność, jak zapisywanie punktu kontrolnego i pisanie pliku podsumowującego dla TensorBoard, oprócz tego, co robi zwykły worker . Taki pracownik jest nazywany chief pracownikiem, a zwyczajem jest, że worker z index 0 jest mianowany głównym worker (w rzeczywistości tak tf.distribute.Strategy ).

  • task dostarcza informacji o bieżącym zadaniu i jest inne dla każdego pracownika. Określa type i index tego pracownika.

W tym przykładzie type zadania jest ustawiony na "worker" a index zadania na 0 . Ta maszyna jest pierwszym pracownikiem i zostanie wyznaczona jako główny pracownik i wykona więcej pracy niż inni. Należy zauważyć, że inne komputery również będą musiały mieć ustawioną zmienną środowiskową TF_CONFIG , która powinna mieć ten sam dyktat cluster , ale inny type zadania lub index zadania w zależności od ról tych komputerów.

W celach ilustracyjnych, ten samouczek pokazuje, jak można ustawić TF_CONFIG z 2 pracownikami na localhost . W praktyce użytkownicy tworzyliby wiele TF_CONFIG na zewnętrznych adresach IP/portach i odpowiednio ustawiali TF_CONFIG na każdym TF_CONFIG .

W tym przykładzie użyjesz 2 robotników, TF_CONFIG pierwszego robotnika pokazano powyżej. Dla drugiego pracownika ustawisz tf_config['task']['index']=1

Powyżej tf_config jest tylko zmienną lokalną w Pythonie. Aby faktycznie użyć go do skonfigurowania uczenia, ten słownik musi być serializowany jako JSON i umieszczony w zmiennej środowiskowej TF_CONFIG .

Zmienne środowiskowe i podprocesy w notebookach

Podprocesy dziedziczą zmienne środowiskowe po rodzicach. Jeśli więc ustawisz zmienną środowiskową w tym jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Dostęp do zmiennej środowiskowej można uzyskać z podprocesów:

echo ${GREETINGS}
Hello TensorFlow!

W następnej sekcji użyjesz tego do przekazania TF_CONFIG do podprocesów roboczych. Nigdy nie uruchamiałbyś swoich zadań w ten sposób, ale to wystarczy na potrzeby tego samouczka: aby zademonstrować minimalny przykład wielu pracowników.

Wybierz odpowiednią strategię

W TensorFlow istnieją dwie główne formy szkolenia rozproszonego:

  • Szkolenie synchroniczne, w którym etapy szkolenia są synchronizowane między pracownikami i replikami, oraz
  • Trening asynchroniczny, w którym etapy treningu nie są ściśle zsynchronizowane.

MultiWorkerMirroredStrategy zostanie przedstawiona strategia MultiWorkerMirroredStrategy , która jest zalecaną strategią synchronicznego szkolenia dla wielu pracowników. Aby wytrenować model, użyj wystąpienia tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu dla wszystkich pracowników. Wykorzystuje CollectiveOps , operację TensorFlow do komunikacji zbiorowej, aby agregować gradienty i synchronizować zmienne. Przewodnik tf.distribute.Strategy zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy zapewnia wiele implementacji za pośrednictwem parametru CommunicationOptions . RING implementuje kolektywy oparte na pierścieniu, wykorzystując gRPC jako warstwę komunikacji między hostami. NCCL używa NCCL Nvidii do wdrażania kolektywów. AUTO odracza wybór do czasu pracy. Najlepszy wybór implementacji zbiorczej zależy od liczby i rodzaju procesorów graficznych oraz połączenia sieciowego w klastrze.

Trenuj modelkę

Dzięki integracji tf.distribute.Strategy API z tf.keras jedyną zmianą, jaką wprowadzisz w celu dystrybucji szkolenia do wielu pracowników, jest uwzględnienie budowania modelu i model.compile() wewnątrz strategy.scope() . Zakres strategii dystrybucji dyktuje sposób i miejsce tworzenia zmiennych, aw przypadku MultiWorkerMirroredStrategy tworzone zmienne są MirroredVariable i są replikowane na każdym z pracowników.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Aby faktycznie uruchomić z MultiWorkerMirroredStrategy , musisz uruchomić procesy TF_CONFIG i przekazać im TF_CONFIG .

Podobnie jak napisany wcześniej plik mnist.py , tutaj jest main.py który będzie uruchamiał każdy z pracowników:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

W powyższym fragmencie kodu należy zauważyć, że parametr global_batch_size , który jest przekazywany do Dataset.batch , jest ustawiony na per_worker_batch_size * num_workers . Dzięki temu każdy proces roboczy przetwarza partie przykładów per_worker_batch_size niezależnie od liczby procesów per_worker_batch_size .

Bieżący katalog zawiera teraz oba pliki Pythona:

ls *.py
main.py
mnist.py

Więc json-serializuj TF_CONFIG i dodaj go do zmiennych środowiskowych:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz możesz uruchomić proces roboczy, który uruchomi main.py i użyje TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:

  1. Używa %%bash który jest "magią" notatnika do uruchamiania niektórych poleceń basha.
  2. Używa flagi --bg do uruchomienia procesu bash w tle, ponieważ ten proces roboczy nie zostanie zakończony. Czeka na wszystkich pracowników, zanim się zacznie.

Proces roboczy działający w tle nie wydrukuje danych wyjściowych do tego notatnika, więc &> przekierowuje dane wyjściowe do pliku, dzięki czemu można zobaczyć, co się stało.

Poczekaj kilka sekund na uruchomienie procesu:

import time
time.sleep(10)

Teraz spójrz, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

Ostatnia linia pliku dziennika powinna Started server with target: grpc://localhost:12345 . Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.

Zaktualizuj więc tf_config dla procesu drugiego pracownika, aby pobrać:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz wystrzel drugiego robotnika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920
2021-06-16 18:40:56.710304: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:57.818915: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:58.745385: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:58.745442: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745451: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745567: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:58.745603: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:58.745609: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:58.746272: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:58.751609: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:58.752063: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.797443: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007608: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007984: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz

Teraz, jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, zobaczysz, że uczestniczył on w szkoleniu tego modelu:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.794960: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007264: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007667: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920

Nic dziwnego, że przebiegało to wolniej niż test na początku tego samouczka. Uruchamianie wielu pracowników na jednej maszynie tylko zwiększa koszty. Celem tutaj nie było poprawienie czasu szkolenia, a jedynie podanie przykładu szkolenia wielopracowniczego.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Dogłębne szkolenie dla wielu pracowników

Jak dotąd ten samouczek zademonstrował podstawową konfigurację wielu pracowników. W pozostałej części tego dokumentu szczegółowo omówiono inne czynniki, które mogą być przydatne lub ważne w rzeczywistych przypadkach użycia.

Fragmentowanie zbioru danych

W przypadku szkolenia z wieloma pracownikami fragmentacja zestawu danych jest niezbędna do zapewnienia zbieżności i wydajności.

Przykład w poprzedniej sekcji opiera się na domyślnym autoshardingu dostarczonym przez interfejs API tf.distribute.Strategy . Można kontrolować fragmentowanie, ustawiając tf.data.experimental.AutoShardPolicy tf.data.experimental.DistributeOptions . Aby dowiedzieć się więcej o automatycznym dzieleniu na fragmenty, zobacz Przewodnik wprowadzania danych rozproszonych .

Oto szybki przykład wyłączania automatycznego fragmentowania, aby każda replika przetwarzała każdy przykład (niezalecane):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Ocena

Jeśli przekażesz validation_data do model.fit , będzie to naprzemiennie trenować i oceniać dla każdej epoki. Ocena pobierająca validation_data jest rozdzielana na ten sam zestaw pracowników, a wyniki oceny są agregowane i dostępne dla wszystkich pracowników. Podobnie jak w przypadku szkolenia, zestaw danych walidacji jest automatycznie dzielony na fragmenty na poziomie pliku. Musisz ustawić globalny rozmiar partii w zestawie danych walidacji i ustawić validation_steps . Do oceny zaleca się również powtórny zbiór danych.

Alternatywnie można również utworzyć inne zadanie, które okresowo odczytuje punkty kontrolne i uruchamia ocenę. To właśnie robi Estymator. Nie jest to jednak zalecany sposób wykonywania ewaluacji i dlatego pomija się jej szczegóły.

Występ

Masz teraz model Keras, który jest skonfigurowany do uruchamiania na wielu pracownikach za pomocą MultiWorkerMirroredStrategy . Możesz wypróbować następujące techniki, aby poprawić wydajność szkolenia z wieloma pracownikami za pomocą MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy zapewnia wiele implementacji komunikacji zbiorowej . RING implementuje kolektywy oparte na pierścieniu, wykorzystując gRPC jako warstwę komunikacji między hostami. NCCL używa NCCL Nvidii do wdrażania kolektywów. AUTO odracza wybór do czasu pracy. Najlepszy wybór implementacji zbiorczej zależy od liczby i rodzaju procesorów graficznych oraz połączenia sieciowego w klastrze. Aby zastąpić automatycznego wyboru określ communication_options parametr MultiWorkerMirroredStrategy konstruktora „s, np communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • Jeśli to możliwe, tf.float zmienne na tf.float . Oficjalny model ResNet zawiera przykład tego, jak można to zrobić.

Tolerancja błędów

W szkoleniu synchronicznym klaster ulegnie awarii, jeśli jeden z procesów roboczych ulegnie awarii i nie istnieje mechanizm odzyskiwania po awarii. Korzystanie z Keras z tf.distribute.Strategy ma tę zaletę, że jest tf.distribute.Strategy na błędy w przypadkach, gdy pracownicy umierają lub są niestabilni w inny sposób. W tym celu zachowuje się stan uczący w wybranym rozproszonym systemie plików, tak aby po ponownym uruchomieniu instancji, która wcześniej nie powiodła się lub została wywłaszczona, stan uczący zostanie odzyskany.

Gdy pracownik stanie się niedostępny, inni pracownicy przestaną działać (prawdopodobnie po przekroczeniu limitu czasu). W takich przypadkach niedostępny pracownik musi zostać ponownie uruchomiony, a także inni pracownicy, których nie powiodło się.

Odwołanie zwrotne do punktu kontrolnego modelu

ModelCheckpoint zwrotne ModelCheckpoint nie zapewnia już funkcji odporności na błędy, zamiast tego użyj wywołania zwrotnego BackupAndRestore .

ModelCheckpoint zwrotne ModelCheckpoint może nadal służyć do zapisywania punktów kontrolnych. Ale dzięki temu, jeśli szkolenie zostało przerwane lub pomyślnie zakończone, aby kontynuować szkolenie z punktu kontrolnego, użytkownik jest odpowiedzialny za ręczne załadowanie modelu.

Opcjonalnie użytkownik może wybrać zapisanie i przywrócenie modelu/wag poza wywołaniem zwrotnym ModelCheckpoint .

Zapisywanie i ładowanie modelu

Aby zapisać model za pomocą model.save lub tf.saved_model.save , miejsce docelowe zapisu musi być inne dla każdego pracownika. W przypadku pracowników niegłównych będziesz musiał zapisać model w katalogu tymczasowym, a w przypadku szefa będziesz musiał zapisać model w podanym katalogu modeli. Katalogi tymczasowe w procesie roboczym muszą być unikalne, aby zapobiec błędom wynikającym z wielu procesów roboczych próbujących pisać w tej samej lokalizacji. Model zapisany we wszystkich katalogach jest identyczny i zazwyczaj tylko model zapisany przez szefa powinien być odwoływany do przywracania lub udostępniania. Powinieneś mieć jakąś logikę czyszczenia, która usuwa katalogi tymczasowe utworzone przez pracowników po zakończeniu szkolenia.

Powodem, dla którego musisz jednocześnie oszczędzać na szefie i pracownikach, jest to, że możesz agregować zmienne podczas tworzenia punktów kontrolnych, co wymaga udziału zarówno szefa, jak i pracowników w protokole komunikacyjnym allreduce. Z drugiej strony zezwolenie szefowi i pracownikom na zapisywanie w tym samym katalogu modelu spowoduje błędy z powodu rywalizacji.

Dzięki MultiWorkerMirroredStrategy program jest uruchamiany na każdym pracowniku i aby wiedzieć, czy aktualny pracownik jest szefem, korzysta z obiektu przeliczania klastra, który ma atrybuty task_type i task_id . task_type mówi ci, jaka jest bieżąca praca (np. 'pracownik'), a task_id podaje identyfikator pracownika. Pracownik o identyfikatorze 0 jest wyznaczony jako główny pracownik.

W poniższym fragmencie kodu write_filepath udostępnia ścieżkę pliku do zapisania, która zależy od identyfikatora write_filepath . W przypadku szefa (pracownika o id 0) zapisuje do oryginalnej ścieżki pliku; dla innych tworzy katalog tymczasowy (o identyfikatorze w ścieżce katalogu), w którym można pisać:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Dzięki temu możesz teraz oszczędzać:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Jak opisano powyżej, później model powinien być ładowany tylko z zapisanej ścieżki głównej, więc usuńmy tymczasowe zapisane przez pracowników niegłównych:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Teraz, gdy tf.keras.models.load_model czas ładowania, skorzystajmy z wygodnego API tf.keras.models.load_model i kontynuujmy dalszą pracę. W tym przypadku załóżmy, że do ładowania i kontynuowania szkolenia używa się tylko jednego tf.keras.models.load_model , w którym to przypadku nie wywołujesz funkcji tf.keras.models.load_model w ramach innej strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.3081 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2914 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08ad02dd0>

Zapisywanie i przywracanie punktu kontrolnego

Z drugiej strony, checkpointy pozwalają na zapisanie wag modelu i ich przywrócenie bez konieczności zapisywania całego modelu. Tutaj utworzysz jeden tf.train.Checkpoint który śledzi model, który jest zarządzany przez tf.train.CheckpointManager dzięki czemu zachowany zostanie tylko najnowszy punkt kontrolny.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Po skonfigurowaniu CheckpointManager możesz teraz zapisywać i usuwać zapisane punkty kontrolne, które nie są głównymi pracownikami.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Teraz, gdy musisz przywrócić, możesz znaleźć najnowszy punkt kontrolny zapisany za pomocą wygodnej funkcji tf.train.latest_checkpoint . Po przywróceniu punktu kontrolnego możesz kontynuować trening.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 12ms/step - loss: 2.3080 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 11ms/step - loss: 2.2896 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08a2b6bd0>

Wywołanie zwrotne kopii zapasowej i przywracania

BackupAndRestore zwrotna zapewnia funkcjonalność odporność na uszkodzenia, poprzez tworzenie kopii zapasowych model i numer aktualnej epoki w pliku tymczasowego punktu kontrolnego pod backup_dir argument BackupAndRestore . Odbywa się to pod koniec każdej epoki.

Po przerwaniu i ponownym uruchomieniu zadań wywołanie zwrotne przywraca ostatni punkt kontrolny, a szkolenie jest kontynuowane od początku przerwanej epoki. Wszelkie częściowe szkolenia, które zostały już wykonane w niedokończonej epoce przed przerwaniem, zostaną odrzucone, aby nie wpłynęły na ostateczny stan modelu.

Aby z niego skorzystać, podaj instancję tf.keras.callbacks.experimental.BackupAndRestore w tf.keras.Model.fit() .

Dzięki MultiWorkerMirroredStrategy, jeśli pracownik zostanie przerwany, cały klaster zostanie wstrzymany do momentu ponownego uruchomienia przerwanego pracownika. Inne procesy robocze również zostaną ponownie uruchomione, a przerwany proces roboczy ponownie dołączy do klastra. Następnie każdy pracownik odczytuje wcześniej zapisany plik punktu kontrolnego i przywraca jego poprzedni stan, umożliwiając w ten sposób przywrócenie synchronizacji klastra. Następnie szkolenie trwa.

BackupAndRestore zwrotne BackupAndRestore używa CheckpointManager do zapisywania i przywracania stanu treningu, który generuje plik o nazwie punkt kontrolny, który śledzi istniejące punkty kontrolne wraz z najnowszym. Z tego powodu, backup_dir nie powinien być ponownie używany do przechowywania innych punktów kontrolnych, aby uniknąć kolizji nazw.

Obecnie BackupAndRestore wywołania zwrotnego BackupAndRestore obsługuje pojedynczego pracownika bez strategii, MirroredStrategy i wielu pracowników z MultiWorkerMirroredStrategy. Poniżej znajdują się dwa przykłady zarówno szkolenia dla wielu pracowników, jak i szkolenia dla jednego pracownika.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2774 - accuracy: 0.2183
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2264 - accuracy: 0.3663
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1693 - accuracy: 0.4643
<tensorflow.python.keras.callbacks.History at 0x7fb08a032790>

Jeśli skontrolować katalogu backup_dir określonym w BackupAndRestore , można zauważyć pewne tymczasowo generowane pliki kontrolnego. Te pliki są potrzebne do odzyskania wcześniej utraconych instancji i zostaną usunięte przez bibliotekę na końcu tf.keras.Model.fit() po pomyślnym zakończeniu treningu.

Zobacz też

  1. Poradnik dotyczący szkoleń rozproszonych w TensorFlow zawiera przegląd dostępnych strategii dystrybucji.
  2. Oficjalne modele , z których wiele można skonfigurować do obsługi wielu strategii dystrybucji.
  3. Sekcja Wydajność w przewodniku zawiera informacje o innych strategiach i narzędziach, których można użyć do optymalizacji wydajności modeli TensorFlow.