Pomoc chronić Wielkiej Rafy Koralowej z TensorFlow na Kaggle Dołącz Wyzwanie

Niestandardowa pętla treningowa z Keras i MultiWorkerMirroredStrategy

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Ten poradnik pokazuje, szkolenie pracowników z wielu pętli zwyczaj szkolenie API, rozproszonych poprzez MultiWorkerMirroredStrategy, więc model Keras zaprojektowany do pracy na pojedynczym pracownikiem mogą bezproblemowo pracę na wielu pracowników z minimalnym zmiany kodu.

Używamy niestandardowych pętli treningowych do trenowania naszego modelu, ponieważ zapewniają nam elastyczność i większą kontrolę nad treningiem. Co więcej, łatwiej jest debugować model i pętlę treningową. Bardziej szczegółowe informacje dostępne są w Pisanie pętlę szkolenia od podstaw .

Jeśli szukasz sposobu korzystania MultiWorkerMirroredStrategy z Keras model.fit , odnoszą się do tego poradnika zamiast.

Ukazuje Szkolenie w TensorFlow przewodnika jest dostępna za przegląd strategii dystrybucji TensorFlow podpory dla zainteresowanych głębszym zrozumieniu tf.distribute.Strategy API.

Ustawiać

Po pierwsze, niektóre niezbędne importy.

import json
import os
import sys

Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku.

Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W przypadku rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Zresetować TF_CONFIG zmienną środowiskową, zobaczysz więcej o tym później.

os.environ.pop('TF_CONFIG', None)

Upewnij się, że bieżący katalog znajduje się na ścieżce Pythona. Pozwala to na notebooka do importowania plików napisanych przez %%writefile później.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Teraz zaimportuj TensorFlow.

import tensorflow as tf

Zbiór danych i definicja modelu

Następnie tworzymy mnist.py plik za pomocą prostego modelu i konfiguracji zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Konfiguracja dla wielu pracowników

Teraz wejdźmy w świat szkoleń wieloosobowych. W TensorFlow The TF_CONFIG zmienna jest wymagana do szkolenia na wielu maszynach, z których każdy może ma inną rolę. TF_CONFIG stosowane poniżej, jest ciągiem JSON używany do określenia konfiguracji klastra na każdego pracownika, który jest częścią klastra. Jest to domyślna metoda określania klaster, używając cluster_resolver.TFConfigClusterResolver , ale istnieją inne opcje dostępne w distribute.cluster_resolver modułu.

Opisz swój klaster

Oto przykładowa konfiguracja:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Oto sam TF_CONFIG szeregowane jako ciąg JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Istnieją dwa składniki TF_CONFIG : cluster i task .

  • cluster jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest DICT składające się z różnych typów zadań, takich jak worker . W szkoleniu multi-pracownik MultiWorkerMirroredStrategy , jest zazwyczaj jeden worker , który bierze na siebie większą odpowiedzialność trochę jak oszczędność kontrolny i pisanie podsumowania pliku dla TensorBoard oprócz co regularny worker robi. Taki pracownik jest określany jako chief pracownika, i to jest w zwyczaju, że worker z index 0 został wyznaczony jako główny worker (w istocie jest to jak tf.distribute.Strategy jest realizowany).

  • task zawiera informacje o bieżącym zadaniu i różni się na każdego pracownika. To określa type i index tego pracownika.

W tym przykładzie użytkownik ustawia zadanie type dla "worker" i zadanie index do 0 . Ta maszyna jest pierwszym pracownikiem i zostanie wyznaczona jako główny pracownik i wykona więcej pracy niż inni. Należy pamiętać, że inne maszyny będą musiały mieć TF_CONFIG zestaw zmiennej środowiskowej, jak również, i powinien mieć taką samą cluster dict, ale różne zadania type lub zadanie index zależności od role tych maszyn są.

W celach ilustracyjnych, ten tutorial pokazuje jak można ustawić TF_CONFIG z 2 pracowników na localhost . W praktyce, użytkownik może stworzyć wiele robotników na zewnętrznych adresów IP / portów i ustaw TF_CONFIG na każdego pracownika w odpowiedni sposób.

W tym przykładzie można użyć 2 pracowników, pierwszego pracownika TF_CONFIG przedstawiono powyżej. Dla drugiego pracownika należy ustawić tf_config['task']['index']=1

Powyżej, tf_config jest tylko zmienna lokalna w Pythonie. Aby właściwie go używać do treningu configure, Słownik ten musi być szeregowane jako JSON, i umieszczone w TF_CONFIG zmiennej środowiskowej.

Zmienne środowiskowe i podprocesy w notebookach

Podprocesy dziedziczą zmienne środowiskowe po swoich rodzicach. Więc jeśli ustawić zmienną środowiskową w tej jupyter notebook procesu:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Dostęp do zmiennej środowiskowej można uzyskać z podprocesów:

echo ${GREETINGS}
Hello TensorFlow!

W następnej części, będziesz korzystać z tego zdać TF_CONFIG do podprocesów pracownika. Nigdy nie uruchamiałbyś swoich zadań w ten sposób, ale to wystarczy do celów tego samouczka: aby zademonstrować minimalny przykład wielu pracowników.

MultiWorkerMirroredStrategy

Trenować model, należy wystąpienie tf.distribute.MultiWorkerMirroredStrategy , który tworzy kopie wszystkich zmiennych w warstwach tego modelu na każdym urządzeniu na wszystkich pracowników. tf.distribute.Strategy Podręcznik zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2021-11-23 02:29:16.957442: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:16.957748: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Zastosowanie tf.distribute.Strategy.scope aby określić, że strategia powinna być stosowana przy budowie modelu. To stawia Cię w „ kontekście cross-replika ” dla tej strategii, co oznacza, że strategia jest umieścić w kontroli rzeczy jak zmiennego rozmieszczenia.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Automatycznie dziel dane między pracownikami

W przypadku szkolenia z udziałem wielu pracowników dzielenie zestawu danych na fragmenty niekoniecznie jest konieczne, jednak zapewnia jednorazową semantykę, dzięki czemu szkolenie jest bardziej powtarzalne, tj. szkolenie na wielu pracownikach powinno być takie samo, jak szkolenie na jednym pracowniku. Uwaga: w niektórych przypadkach może to mieć wpływ na wydajność.

Zobacz: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step

Zdefiniuj niestandardową pętlę treningową i trenuj model

Określ optymalizator

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Zdefiniować etap treningowy z tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Zapisywanie i przywracanie punktu kontrolnego

Implementacja punktów kontrolnych w niestandardowej pętli szkoleniowej wymaga od użytkownika obsługi tego zamiast używania wywołania zwrotnego Keras. Pozwala na zapisywanie wag modelu i przywracanie ich bez konieczności zapisywania całego modelu.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Tutaj będziesz tworzyć jedną tf.train.Checkpoint że torach model, który jest zarządzany przez tf.train.CheckpointManager tak, że tylko ostatni checkpoint jest zachowana.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Teraz, gdy trzeba przywrócić, można znaleźć najnowsze checkpoint zapisaną za pomocą wygodnego tf.train.latest_checkpoint funkcję.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Po przywróceniu punktu kontrolnego możesz kontynuować trening własnej pętli treningowej.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2021-11-23 02:29:18.214294: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.826228, train_loss: 0.540131.
Epoch: 1, accuracy: 0.937946, train_loss: 0.207413.
Epoch: 2, accuracy: 0.960603, train_loss: 0.137420.

Pełna konfiguracja kodu dla pracowników

Aby rzeczywiście uruchomić z MultiWorkerMirroredStrategy trzeba uruchomić procesy robocze i zdać TF_CONFIG do nich.

Podobnie jak mnist.py pliku pisemnej wcześniej, tutaj jest main.py które zawierają ten sam kod szliśmy krok po kroku opisane w tym colab, jesteśmy tylko pisanie do pliku tak każdy z pracowników będzie go uruchomić:

Plik: main.py

Writing main.py

Trenuj i oceniaj

Bieżący katalog zawiera teraz oba pliki Pythona:

ls *.py
main.py
mnist.py

Więc JSON serializacji TF_CONFIG i dodać je do zmiennych środowiskowych:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz można uruchomić proces roboczy, który będzie działał na main.py i korzystania z TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:

  1. Używa %%bash który jest notebook „magia” , aby uruchomić niektórych poleceń bash.
  2. Używa --bg flagę uruchomić bash proces w tle, ponieważ pracownik nie zakończy. Czeka na wszystkich pracowników, zanim się zacznie.

Będącego w tle proces roboczy nie będzie drukować dane wyjściowe do tego notebooka, a więc &> przekierowuje jego wyjście do pliku, dzięki czemu można zobaczyć, co się stało.

Poczekaj więc kilka sekund, aż proces się uruchomi:

import time
time.sleep(20)

Teraz spójrz, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Ostatni wiersz pliku logu powinien powiedzieć: Started server with target: grpc://localhost:12345 . Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.

Więc zaktualizować tf_config przez proces drugiego pracownika odebrać:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz uruchom drugiego robotnika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):

python main.py > /dev/null 2>&1

Teraz, jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, zobaczysz, że uczestniczył on w szkoleniu tego modelu:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration
2021-11-23 02:29:50.709898: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.820424, train_loss: 0.575663.
Epoch: 1, accuracy: 0.927344, train_loss: 0.241324.
Epoch: 2, accuracy: 0.953237, train_loss: 0.154762.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Dogłębne szkolenie dla wielu pracowników

Ten poradnik wykazał Custom Training Loop workflow w konfiguracji multi-pracownik. Szczegółowy opis innych tematów jest dostępna w tego model.fit's guide w konfiguracji multi-pracowników i zastosowanie CTL.

Zobacz też

  1. Ukazuje Szkolenie w TensorFlow przewodnika zawiera przegląd dostępnych strategii dystrybucji.
  2. Oficjalne modele , z których wiele może być skonfigurowane do uruchamiania wielu strategii dystrybucji.
  3. Sekcja Wydajność w przewodniku dostarcza informacji na temat innych strategii i narzędzi można użyć, aby zoptymalizować wydajność swoich modeli TensorFlow.