Pomoc chronić Wielkiej Rafy Koralowej z TensorFlow na Kaggle Dołącz Wyzwanie

Szkolenie dla wielu pracowników z Keras

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Ten poradnik pokazuje, jak wykonać wielu pracowników rozproszonych szkolenia z modelu Keras i Model.fit API używając tf.distribute.Strategy API konkretniej tf.distribute.MultiWorkerMirroredStrategy klasę. Dzięki tej strategii model Keras, który został zaprojektowany do pracy na jednym pracowniku, może bezproblemowo pracować na wielu pracownikach przy minimalnych zmianach kodu.

Dla zainteresowanych głębszym zrozumieniu tf.distribute.Strategy API The Ukazuje szkolenie w TensorFlow przewodnika jest dostępna za przegląd strategii dystrybucji TensorFlow podporach.

Aby dowiedzieć się, jak korzystać z MultiWorkerMirroredStrategy z Keras i pętli niestandardowe szkolenia, odnoszą się do niestandardowej pętli szkolenia z Keras i MultiWorkerMirroredStrategy .

Zauważ, że celem tego samouczka jest zademonstrowanie minimalnego przykładu wielu pracowników z dwoma pracownikami.

Ustawiać

Zacznij od kilku niezbędnych importów:

import json
import os
import sys

Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku:

  1. Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Zresetować TF_CONFIG zmienną środowiskową (dowiesz się więcej o tym później):
os.environ.pop('TF_CONFIG', None)
  1. Upewnij się, że aktualny katalog jest na ścieżce Pythona-pozwala to notebook importowania plików napisanych przez %%writefile później:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Teraz zaimportuj TensorFlow:

import tensorflow as tf

Zbiór danych i definicja modelu

Następnie utwórz mnist.py plik za pomocą prostego modelu i konfiguracji zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Szkolenie modelowe na jednym pracowniku

Spróbuj szkolenia model dla niewielkiej liczby epok i obserwować wyniki pojedynczego pracownika, aby upewnić się, że wszystko działa poprawnie. W miarę postępu treningu strata powinna spadać, a dokładność powinna wzrastać.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Konfiguracja wielostanowiskowa

Teraz wejdźmy w świat szkoleń wieloosobowych.

Klaster z zadaniami i zadaniami

W TensorFlow, rozprowadzane szkolenie obejmuje: a 'cluster' z kilku zadań, a każde z zadań może mieć jeden lub więcej 'task' s.

Będziesz potrzebował TF_CONFIG zmienną środowiskową konfiguracji za szkolenie na wielu maszynach, z których każda może ma inną rolę. TF_CONFIG jest ciągiem JSON używany do określenia konfiguracji klastra dla każdego pracownika, który jest częścią klastra.

Istnieją dwa składniki TF_CONFIG zmiennej: 'cluster' i 'task' .

  • A 'cluster' jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest DICT składające się z różnych typów zadań, takich jak 'worker' i 'chief' .

    • W szkoleniu multi-pracownik tf.distribute.MultiWorkerMirroredStrategy , jest zwykle jeden 'worker' , który odbywa się na obowiązki, takie jak oszczędność punkt kontrolny i zapisanie pliku podsumowania dla TensorBoard, oprócz tego, co regularne 'worker' robi. Takie 'worker' jest określany jako główny pracownika (z nazwą zadania 'chief' ).
    • Jest w zwyczaju, że 'chief' mieć 'index' 0 zostać wyznaczony do (w rzeczywistości jest to jak tf.distribute.Strategy jest realizowany).
  • A 'task' zawiera informacje o bieżącym zadaniu i jest różna dla każdego pracownika. Określa 'type' i 'index' tego pracownika.

Poniżej przykładowa konfiguracja:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Oto sam TF_CONFIG szeregowane jako ciąg JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Zauważ, że tf_config jest tylko zmienna lokalna w Pythonie. Aby móc wykorzystać go w układzie szkoleniowym, to DICT musi być szeregowane jako JSON i umieszczone w TF_CONFIG zmiennej środowiskowej.

W konfiguracji powyższym przykładzie, zadanie ustawić 'type' do 'worker' i zadanie 'index' do 0 . Dlatego też, ta maszyna jest pierwszym pracownikiem. Zostanie on powołany jako 'chief' pracownika i wykonać więcej pracy niż inni.

W celach ilustracyjnych, ten tutorial pokazuje, jak można założyć TF_CONFIG zmiennej z dwóch pracowników na localhost .

W praktyce, by utworzyć wiele robotników na zewnętrznych adresów IP / portów i ustawić TF_CONFIG zmienną na każdego pracownika odpowiednio.

W tym samouczku użyjesz dwóch robotników:

  • Pierwszy ( 'chief' ) pracownika TF_CONFIG przedstawiono powyżej.
  • Dla drugiego pracownika, zostanie ustawiony tf_config['task']['index']=1

Zmienne środowiskowe i podprocesy w notebookach

Podprocesy dziedziczą zmienne środowiskowe po rodzicach.

Na przykład możesz ustawić zmienną środowiskową w tym procesie Jupyter Notebook w następujący sposób:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Następnie możesz uzyskać dostęp do zmiennej środowiskowej z podprocesów:

echo ${GREETINGS}
Hello TensorFlow!

W następnej części, będziesz używać podobny sposób przekazać TF_CONFIG do podprocesów pracownika. W rzeczywistym scenariuszu nie uruchamiałbyś swoich zadań w ten sposób, ale w tym przykładzie jest to wystarczające.

Wybierz odpowiednią strategię

W TensorFlow istnieją dwie główne formy szkolenia rozproszonego:

  • Synchroniczny szkolenia, gdzie etapy szkolenia są synchronizowane w całej robotników i replik, a
  • Asynchroniczny szkolenie, jeżeli czynności szkoleniowe nie są ściśle zsynchronizowane (na przykład szkolenie serwer parametr ).

Ten poradnik pokazuje, jak wykonać synchroniczną szkolenia pracowników za pomocą multi-wystąpienie tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy tworzy kopie wszystkich zmiennych w warstwach tego modelu na każdym urządzeniu na wszystkich pracowników. Wykorzystuje CollectiveOps , op TensorFlow dla komunikacji zbiorowej, agregowanie gradientów i przechowywać zmienne w synchronizacji. tf.distribute.Strategy Podręcznik zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy dostarcza wielu implementacji pośrednictwem CommunicationOptions parametr: 1) RING narzędzi collectives pierścieniu za pomocą programu gRPC jako warstwę komunikacyjną przekroju gospodarza; 2) NCCL korzysta z komunikacji zbiorowej Biblioteka NVIDIA wdrożyć kolektywy; oraz 3) AUTO odracza wybór na starcie. Najlepszy wybór implementacji zbiorowej zależy od liczby i rodzaju procesorów graficznych oraz połączenia sieciowego w klastrze.

Trenuj modelkę

Dzięki integracji tf.distribute.Strategy API do tf.keras , jedyną zmianą będzie zrobić, aby rozpowszechniać szkolenie z wieloma pracownikami jest otaczający budynek modelu i model.compile() wywołanie wewnątrz strategy.scope() . Dyktuje zakres strategii dystrybucji w jaki sposób i gdzie zmienne są tworzone, aw przypadku MultiWorkerMirroredStrategy zmienne tworzone są MirroredVariable s, a oni są replikowane na każdym z pracowników.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Aby rzeczywiście uruchomić z MultiWorkerMirroredStrategy trzeba uruchomić procesy robocze i zdać TF_CONFIG do nich.

Podobnie jak mnist.py pliku pisemnej wcześniej, tutaj jest main.py , że każdy z pracowników będzie działać:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

W powyższym fragmencie kodu nocie że global_batch_size , który zostanie przekazany do Dataset.batch , jest ustawiony na per_worker_batch_size * num_workers . Gwarantuje to, że każdy pracownik przetwarza partie per_worker_batch_size przykładów niezależnie od liczby pracowników.

Bieżący katalog zawiera teraz oba pliki Pythona:

ls *.py
main.py
mnist.py

Więc JSON serializacji TF_CONFIG i dodać je do zmiennych środowiskowych:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz można uruchomić proces roboczy, który będzie działał na main.py i korzystania z TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:

  1. Używa %%bash który jest notebook „magia” , aby uruchomić niektórych poleceń bash.
  2. Używa --bg flagę uruchomić bash proces w tle, ponieważ pracownik nie zakończy. Czeka na wszystkich pracowników, zanim się zacznie.

Będącego w tle proces roboczy nie będzie drukować dane wyjściowe do tego notebooka, a więc &> przekierowuje jego wyjście do pliku, dzięki czemu można sprawdzić, co się stało w pliku dziennika później.

Poczekaj kilka sekund na uruchomienie procesu:

import time
time.sleep(10)

Teraz sprawdź, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

Ostatni wiersz pliku logu powinien powiedzieć: Started server with target: grpc://localhost:12345 . Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.

Więc zaktualizować tf_config przez proces drugiego pracownika odebrać:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Uruchom drugiego pracownika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

Jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, dowiesz się, że uczestniczył on w szkoleniu tego modelu:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Nic dziwnego, że ten pobiegł wolniej niż próba uruchomienia na początku tego tutoriala.

Praca wielu pracowników na jednej maszynie tylko zwiększa koszty.

Celem tutaj nie było poprawienie czasu szkolenia, a jedynie podanie przykładu szkolenia wielopracowniczego.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Dogłębne szkolenie dla wielu pracowników

Do tej pory nauczyłeś się, jak wykonać podstawową konfigurację dla wielu pracowników.

W dalszej części samouczka szczegółowo poznasz inne czynniki, które mogą być przydatne lub ważne w rzeczywistych przypadkach użycia.

Fragmentowanie zbioru danych

W treningu wielu pracowników, potrzebny jest zestaw danych sharding w celu zapewnienia spójności i wydajności.

Przykład w poprzedniej części opiera się na domyślnym autosharding dostarczonych przez tf.distribute.Strategy API. Można kontrolować sharding ustawiając tf.data.experimental.AutoShardPolicy z tf.data.experimental.DistributeOptions .

Aby dowiedzieć się więcej o auto-sharding, patrz Ukazuje przewodnika wejściowego .

Oto krótki przykład jak włączyć auto sharding wyłączony, tak że każda replika przetwarza każdy przykład (nie zalecane):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Ocena

Jeśli zdasz validation_data do Model.fit będzie naprzemiennie szkolenia i oceny dla każdej epoki. Ocenę biorąc validation_data jest rozłożone na tym samym zestawie pracowników, a wyniki oceny są sumowane i dostępny dla wszystkich pracowników.

Podobnie jak w przypadku szkolenia, zestaw danych walidacji jest automatycznie dzielony na fragmenty na poziomie pliku. Trzeba ustawić globalną wielkość wsadu w zbiorze walidacji i ustawić validation_steps .

Do oceny zaleca się również powtórny zbiór danych.

Alternatywnie można również utworzyć inne zadanie, które okresowo odczytuje punkty kontrolne i uruchamia ocenę. To właśnie robi Estymator. Nie jest to jednak zalecany sposób wykonywania ewaluacji, przez co pomijane są jego szczegóły.

Wydajność

Teraz masz model Keras że wszystko jest uruchamiane w wielu robotników z MultiWorkerMirroredStrategy .

Aby poprawić wydajność szkolenia wieloosobowego, możesz wypróbować następujące rozwiązania:

  • tf.distribute.MultiWorkerMirroredStrategy zapewnia wiele implementacji komunikacji zbiorowej :

    • RING Kolektywy narzędzia pierścienia za pomocą programu gRPC jako warstwę komunikacyjną przekroju gospodarza.
    • NCCL używa Collective Biblioteka komunikacji NVIDIA wdrożyć kolektywów.
    • AUTO odracza wybór na starcie.

    Najlepszy wybór implementacji zbiorowej zależy od liczby procesorów GPU, typu procesorów GPU i połączenia sieciowego w klastrze. Aby zastąpić automatycznego wyboru określ communication_options parametr MultiWorkerMirroredStrategy konstruktora „s. Na przykład:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Obsada zmienne do tf.float jeśli to możliwe:

    • Oficjalna modelu ResNet zawiera przykład tego, jak można to zrobić.

Tolerancja błędów

W szkoleniu synchronicznym klaster ulegnie awarii, jeśli jeden z procesów roboczych ulegnie awarii i nie istnieje mechanizm odzyskiwania po awarii.

Korzystanie Keras z tf.distribute.Strategy pochodzi z przewagą odporność na uszkodzenia w przypadku, gdy pracownicy umierają lub są w inny sposób niestabilny. Można to zrobić, zachowując stan uczący w wybranym rozproszonym systemie plików, tak aby po ponownym uruchomieniu instancji, która wcześniej nie powiodła się lub została wywłaszczona, stan uczący zostanie odzyskany.

Gdy pracownik stanie się niedostępny, inni pracownicy przestaną działać (prawdopodobnie po przekroczeniu limitu czasu). W takich przypadkach niedostępny pracownik musi zostać ponownie uruchomiony, a także inni pracownicy, których nie powiodło się.

Odwołanie zwrotne do punktu kontrolnego modelu

ModelCheckpoint zwrotna nie zapewnia funkcjonalność tolerancji błędu, skorzystaj BackupAndRestore zwrotnego zamiast.

ModelCheckpoint zwrotna może być nadal używany do zapisywania punktów kontrolnych. Ale dzięki temu, jeśli szkolenie zostało przerwane lub pomyślnie zakończone, aby kontynuować szkolenie z punktu kontrolnego, użytkownik jest odpowiedzialny za ręczne załadowanie modelu.

Opcjonalnie użytkownik może wybrać, aby zapisać i przywrócić modelu / ciężary poza ModelCheckpoint zwrotnego.

Zapisywanie i ładowanie modelu

Aby zapisać swój model używając model.save lub tf.saved_model.save , potrzeby oszczędzania docelowe być różne dla każdego pracownika.

  • W przypadku pracowników niegłównych konieczne będzie zapisanie modelu w katalogu tymczasowym.
  • Dla szefa będziesz musiał zapisać w podanym katalogu modeli.

Katalogi tymczasowe w procesie roboczym muszą być unikatowe, aby zapobiec błędom wynikającym z wielu procesów roboczych próbujących pisać w tej samej lokalizacji.

Model zapisany we wszystkich katalogach jest identyczny i zazwyczaj tylko model zapisany przez szefa powinien być przywoływany w celu przywrócenia lub udostępnienia.

Powinieneś mieć pewną logikę czyszczenia, która usuwa katalogi tymczasowe utworzone przez pracowników po zakończeniu szkolenia.

Powodem jednoczesnego oszczędzania na szefie i pracownikach jest możliwość agregowania zmiennych podczas tworzenia punktów kontrolnych, co wymaga udziału zarówno szefa, jak i pracowników w protokole komunikacyjnym allreduce. Z drugiej strony zezwolenie szefowi i pracownikom na zapisywanie w tym samym katalogu modelu spowoduje błędy z powodu rywalizacji.

Korzystanie z MultiWorkerMirroredStrategy , program jest uruchamiany na każdego pracownika, a także w celu ustalenia, czy pracownik jest obecny szef, to korzysta z obiektu przelicznika klastra, który ma atrybuty task_type i task_id :

  • task_type informuje, jaka jest obecna praca jest (na przykład 'worker' ).
  • task_id informuje identyfikator pracownika.
  • Pracownik z task_id == 0 jest oznaczony jako główny pracownika.

W Fragment kodu poniżej, write_filepath funkcja zapewnia ścieżkę pliku do pisania, która zależy od pracownika task_id :

  • Dla głównego pracownika (z task_id == 0 ), to pisze do oryginalnej ścieżki pliku.
  • W przypadku innych pracowników, tworzy tymczasowy directory- temp_dir -Z tym task_id w ścieżce katalogu pisać:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Dzięki temu możesz teraz oszczędzać:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Jak opisano powyżej, później model powinien być ładowany tylko z zapisanej ścieżki głównej, więc usuńmy tymczasowe zapisane przez pracowników niegłównych:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Teraz, kiedy nadszedł czas, aby obciążenia, użyjmy dogodnym tf.keras.models.load_model API i kontynuować dalszej pracy.

Tutaj tylko przypuszczać, używając jednego pracownika do obciążenia i kontynuować szkolenie, w którym to przypadku nie nazywaj tf.keras.models.load_model w innym strategy.scope() (zauważmy, że strategy = tf.distribute.MultiWorkerMirroredStrategy() , jak zdefiniowano wcześniej ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Zapisywanie i przywracanie punktu kontrolnego

Z drugiej strony, checkpointy pozwalają na zapisywanie wag modelu i przywracanie ich bez konieczności zapisywania całego modelu.

Tutaj będziesz tworzyć jedną tf.train.Checkpoint który śledzi model, który jest zarządzany przez tf.train.CheckpointManager , tak, że tylko ostatni checkpoint jest zachowana:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Po CheckpointManager jest ustawione, jesteś gotowy, aby zapisać i usunąć punkty kontrolne uratował niż główni pracownicy:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Teraz, gdy trzeba przywrócić modelu można znaleźć najnowsze checkpoint zapisaną za pomocą wygodnego tf.train.latest_checkpoint funkcję. Po przywróceniu punktu kontrolnego możesz kontynuować trening.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

Wywołanie zwrotne kopii zapasowej i przywracania

tf.keras.callbacks.experimental.BackupAndRestore zwrotna zapewnia funkcjonalność odporność na uszkodzenia przez wykonanie kopii zapasowej model i numer aktualnej epoki w pliku tymczasowego punktu kontrolnego pod backup_dir argument BackupAndRestore . Odbywa się to na końcu każdej epoki.

Po przerwaniu i ponownym uruchomieniu zadań wywołanie zwrotne przywraca ostatni punkt kontrolny, a szkolenie jest kontynuowane od początku przerwanej epoki. Wszelkie częściowe szkolenia, które zostały już wykonane w niedokończonej epoce przed przerwaniem, zostaną odrzucone, aby nie wpłynęły na ostateczny stan modelu.

Aby go użyć, zapewniają wystąpienie tf.keras.callbacks.experimental.BackupAndRestore na Model.fit rozmowy.

Z MultiWorkerMirroredStrategy , jeżeli pracownik zostaje przerwany, cała gromada wstrzymuje aż przerwany pracownik zostaje wznowiona. Inne procesy robocze również zostaną ponownie uruchomione, a przerwany proces roboczy ponownie dołączy do klastra. Następnie każdy pracownik odczytuje wcześniej zapisany plik punktu kontrolnego i przywraca jego poprzedni stan, umożliwiając w ten sposób przywrócenie synchronizacji klastra. Następnie szkolenie trwa.

BackupAndRestore zwrotna wykorzystuje CheckpointManager aby zapisać i przywrócić stan treningowy, który generuje plik o nazwie checkpoint że utwory istniejące punkty kontrolne wraz z najnowszym jeden. Z tego powodu, backup_dir nie powinny być ponownie wykorzystane do przechowywania innych punktów kontrolnych w celu uniknięcia zderzenia nazwa.

Obecnie BackupAndRestore zwrotna obsługuje jeden pracownik bez strategii, MirroredStrategy i multi-pracownika z MultiWorkerMirroredStrategy. Poniżej znajdują się dwa przykłady zarówno szkolenia dla wielu pracowników, jak i szkolenia dla jednego pracownika.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Jeśli skontrolować katalogu backup_dir określonym w BackupAndRestore , można zauważyć pewne tymczasowo generowane pliki kontrolnego. Pliki te są niezbędne dla odzyskania utraconych wcześniej instancji, a zostaną one usunięte przez bibliotekę na koniec Model.fit po udanej wydostawaniu treningu.

Dodatkowe zasoby

  1. Ukazuje szkolenie w TensorFlow przewodnika zawiera przegląd dostępnych strategii dystrybucji.
  2. Pętla szkolenie klienta z Keras i MultiWorkerMirroredStrategy poradnik pokazuje, jak korzystać z MultiWorkerMirroredStrategy z Keras i pętlę zwyczaj treningowy.
  3. Sprawdź oficjalnych modeli , z których wiele może być skonfigurowany do uruchamiania wielu strategii dystrybucji.
  4. Lepsza wydajność z tf.function przewodnik zawiera informacje na temat innych strategii i narzędzi, takich jak TensorFlow Profiler można użyć, aby zoptymalizować wydajność swoich modeli TensorFlow.