Многопрофильное обучение с Керасом

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом учебнике показано , как выполнять мульти-мастер , распределенное обучение с моделью Keras и Model.fit API с использованием tf.distribute.Strategy API-специфически tf.distribute.MultiWorkerMirroredStrategy класс. С помощью этой стратегии модель Keras, которая была разработана для работы с одним воркером, может беспрепятственно работать с несколькими воркерами с минимальными изменениями кода.

Для тех , кто заинтересован в более глубоком понимании tf.distribute.Strategy API - интерфейсах, то Распределенное обучение в TensorFlow руководстве доступно для обзора стратегий распределения TensorFlow опор.

Чтобы узнать , как использовать MultiWorkerMirroredStrategy с Keras и пользовательские учебным циклом, обратитесь к изготовленному на заказ цикла обучения с Keras и MultiWorkerMirroredStrategy .

Обратите внимание, что цель этого руководства - продемонстрировать минимальный пример с несколькими рабочими с двумя рабочими.

Настраивать

Начнем с необходимого импорта:

import json
import os
import sys

Перед импортом TensorFlow внесите несколько изменений в среду:

  1. Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие пытаются использовать один и тот же графический процессор. В реальном приложении каждый рабочий будет на разных машинах.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Сбросить TF_CONFIG переменного окружения (вы узнаете об этом позже):
os.environ.pop('TF_CONFIG', None)
  1. Убедитесь , что текущий каталог на Пайтон пути, это позволяет ноутбуку импортировать файлы , написанные %%writefile позже:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Теперь импортируйте TensorFlow:

import tensorflow as tf

Набор данных и определение модели

Затем создайте mnist.py файл с моделью простой и набором данных настройки. Этот файл Python будет использоваться рабочими процессами в этом руководстве:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Модельное обучение на одного работника

Попробуйте обучение модели для небольшого числа эпох и наблюдать результаты одного работника , чтобы правильно убедиться , что все работает. По мере развития тренировки потери должны уменьшаться, а точность - увеличиваться.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Многопользовательская конфигурация

Теперь давайте войдем в мир обучения нескольких сотрудников.

Кластер с заданиями и задачами

В TensorFlow, распределенное обучение включает в себя: а 'cluster' с нескольких рабочих мест, и каждый из рабочих мест может иметь один или несколько 'task' с.

Вам потребуются TF_CONFIG переменного окружения конфигурации для обучения на нескольких машинах, каждый из которых , возможно , имеет другую роль. TF_CONFIG является JSON строка используется для задания конфигурации кластера для каждого работника , который является частью кластера.

Есть два компонента TF_CONFIG переменный: 'cluster' и 'task' .

  • A 'cluster' является одинаковым для всех работников и предоставляет информацию о тренировочной группе, которая представляет собой ДИКТ , состоящий из различных видов работ, таких как 'worker' или 'chief' .

    • В нескольких рабочих тренировках с tf.distribute.MultiWorkerMirroredStrategy , есть обычно один 'worker' , который берет на себя ответственность, такие как сохранение контрольных точек и писать итоговый файл для TensorBoard, в дополнении к тому , что обычный 'worker' делает. Такие 'worker' упоминаются в качестве главного работника (с именем задания 'chief' ).
    • Это обычное для 'chief' иметь 'index' 0 назначается на (на самом деле, это как tf.distribute.Strategy реализуется).
  • A 'task' содержит информацию о текущей задаче и отличаются для каждого работника. Он определяет 'type' и 'index' этого работника.

Ниже приведен пример конфигурации:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Здесь же TF_CONFIG сериализовать в виде строки JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Обратите внимание , что tf_config только локальная переменная в Python. Для того, чтобы иметь возможность использовать его для конфигурации обучения, это ДИКТ необходимо сериализовать как JSON и помещен в TF_CONFIG переменной окружения.

В приведенном примере выше, вы поставили задачу 'type' к 'worker' и задачам 'index' на 0 . Таким образом, эта машина является первым рабочим. Он будет назначен 'chief' рабочим и делать больше работы , чем другие.

Для иллюстрации, это руководство показывает , как вы можете создать TF_CONFIG переменного с двумя рабочими на localhost .

На практике, вы можете создать несколько рабочих на внешних IP - адресов / портов и установить TF_CONFIG переменную на каждого работника соответственно.

В этом уроке вы будете использовать двух воркеров:

  • В первом ( 'chief' ) работника TF_CONFIG показан выше.
  • Для второго работника, вы будете устанавливать tf_config['task']['index']=1

Переменные среды и подпроцессы в записных книжках

Подпроцессы наследуют переменные среды от своего родителя.

Например, вы можете установить переменную среды в этом процессе Jupyter Notebook следующим образом:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Затем вы можете получить доступ к переменной среды из подпроцессов:

echo ${GREETINGS}
Hello TensorFlow!

В следующем разделе вы будете использовать подобный метод для передачи TF_CONFIG на рабочих подпроцессы. В реальном сценарии вы бы не запускали свои задания таким образом, но в этом примере этого достаточно.

Выбери правильную стратегию

В TensorFlow есть две основные формы распределенного обучения:

  • Синхронное обучение, где этапы подготовки синхронизируются по рабочим и репликами, и
  • Асинхронное обучение, где учебные шаги не строго синхронизированы (например, обучение сервера параметров ).

В этом учебнике показано , как выполнить синхронное обучение мульти-мастер , используя экземпляр tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy создает копии всех переменных в слоях модели на каждом устройстве во всех рабочих. Он использует CollectiveOps , а TensorFlow оп для коллективного общения, агрегатных градиентов и сохранить переменные в синхронизации. tf.distribute.Strategy руководство имеет более подробную информацию об этой стратегии.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy обеспечивает несколько реализаций через CommunicationOptions параметр: 1) RING орудия кольца на основе коллективов , использующих КПГРЫ в качестве слоя связи кросса-хозяине; 2) NCCL использует NVIDIA коллективной коммуникации библиотека для реализации коллективов; и 3) AUTO отсрочивает выбор для выполнения. Наилучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также от сетевого соединения в кластере.

Обучите модель

С интеграцией tf.distribute.Strategy API в tf.keras , единственное изменение , которое вы будете делать , чтобы распространять обучение множественных рабочих ограждающих модель здания и model.compile() вызов внутри strategy.scope() . Область применения диктует стратегию распространения в том , как и где создаются переменные, и в случае MultiWorkerMirroredStrategy , переменные создаваемые MirroredVariable s, и они реплицируются на каждом из рабочих.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Для того, чтобы реально работать с MultiWorkerMirroredStrategy вам необходимо запускать рабочие процессы и передать TF_CONFIG им.

Как mnist.py файла , написанного ранее, здесь main.py , что каждый из рабочих будет работать:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

В фрагменте кода выше , отмечают , что global_batch_size , который переходит в руки Dataset.batch , устанавливается в per_worker_batch_size * num_workers . Это гарантирует , что каждый работник обрабатывает партию per_worker_batch_size примеров , независимо от количества работников.

Текущий каталог теперь содержит оба файла Python:

ls *.py
main.py
mnist.py

Так JSON-сериализации TF_CONFIG и добавить его в переменных окружения:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь вы можете запустить рабочий процесс , который будет запускать main.py и использовать TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

По поводу приведенной выше команды следует отметить несколько моментов:

  1. Он использует %%bash , который является ноутбук «магией» для запуска некоторых команд Баша.
  2. Он использует --bg флаг для запуска bash процесса в фоновом режиме, поскольку этот работник не будет прекращен. Перед запуском он ждет всех рабочих.

Фоновый рабочий процесс не будет печатать вывод в этом ноутбуке, так &> перенаправляет его вывод в файл , так что вы можете проверить , что произошло в лог - файл позже.

Итак, подождите несколько секунд, пока процесс запустится:

import time
time.sleep(10)

Теперь проверьте, что уже было выведено в файл журнала рабочего:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

В последней строке файла журнала должен сказать: Started server with target: grpc://localhost:12345 . Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить.

Таким образом , обновление tf_config для процесса второго работника , чтобы забрать:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Запускаем второго рабочего. Это запустит обучение, поскольку все рабочие активны (поэтому нет необходимости в фоновом режиме этого процесса):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

Если вы еще раз проверите журналы, написанные первым воркером, вы узнаете, что он участвовал в обучении этой модели:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Неудивительно, что это побежал медленнее , чем тест запуска в начале этого урока.

Запуск нескольких рабочих процессов на одной машине только увеличивает накладные расходы.

Цель здесь заключалась не в сокращении времени обучения, а только в том, чтобы показать пример обучения нескольких рабочих.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Углубленное обучение нескольких сотрудников

Итак, вы узнали, как выполнить базовую настройку с несколькими рабочими.

В оставшейся части руководства вы подробно узнаете о других факторах, которые могут быть полезны или важны для реальных случаев использования.

Шардинг набора данных

В обучении нескольких работников, набор данных Sharding необходим для обеспечения сходимости и производительности.

Пример , приведенный в предыдущем разделе , опирается на autosharding по умолчанию , предоставленного tf.distribute.Strategy API. Вы можете управлять шардингом, установив tf.data.experimental.AutoShardPolicy из tf.data.experimental.DistributeOptions .

Чтобы узнать больше о авто-сегментировании, обратитесь к распределенному вводу руководству .

Вот краткий пример того , как включить автоматическое сегментирование выключен, так что каждая реплика обрабатывает каждый пример (не рекомендуется):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Оценка

Если вы передаете validation_data в Model.fit , она будет чередоваться между обучением и оценкой для каждой эпохи. Оценка принимая validation_data распределяются по тому же набору рабочих и результаты оценки объединяется и становится доступным для всех работников.

Подобно обучению, набор данных проверки автоматически сегментируется на уровне файла. Вы должны установить глобальный размер пакета в наборе данных проверки и установить validation_steps .

Для оценки также рекомендуется повторный набор данных.

В качестве альтернативы вы также можете создать другую задачу, которая периодически считывает контрольные точки и запускает оценку. Это то, что делает Оценщик. Но это не рекомендуемый способ выполнения оценки, поэтому его подробности опускаются.

Представление

Теперь у вас есть модель Keras , что все готово для запуска на нескольких рабочих с MultiWorkerMirroredStrategy .

Чтобы настроить производительность обучения с несколькими сотрудниками, вы можете попробовать следующее:

  • tf.distribute.MultiWorkerMirroredStrategy предоставляет множество реализаций коллективного общения :

    • RING реализует кольца на основе коллектива с использованием КПГР в качестве слоя связи кросс-хост.
    • NCCL использует NVIDIA коллективной библиотеку связи для реализации коллективов.
    • AUTO отсрочивает выбор для выполнения.

    Наилучший выбор коллективной реализации зависит от количества графических процессоров, типа графических процессоров и сетевого соединения в кластере. Для отмены автоматического выбора, указать communication_options параметр MultiWorkerMirroredStrategy конструктора «s. Например:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Отливается переменные tf.float , если это возможно:

    • Официальная модель RESNET включает пример того , как это может быть сделано.

Отказоустойчивость

При синхронном обучении кластер выйдет из строя, если один из рабочих выйдет из строя и не существует механизма восстановления после сбоя.

Использование Keras с tf.distribute.Strategy приходит с преимуществом отказоустойчивости в тех случаях , когда рабочие умирают или иначе нестабильные. Вы можете сделать это, сохранив состояние обучения в распределенной файловой системе по вашему выбору, так что при перезапуске экземпляра, который ранее не удался или был прерван, состояние обучения восстанавливается.

Когда рабочий становится недоступным, другие рабочие терпят неудачу (возможно, после тайм-аута). В таких случаях необходимо перезапустить недоступный работник, а также другие вышедшие из строя рабочие.

Обратный вызов ModelCheckpoint

ModelCheckpoint обратного вызова больше не предоставляет функциональные возможности отказоустойчивости, пожалуйста , не используйте BackupAndRestore вместо обратного вызова.

ModelCheckpoint обратный вызов по- прежнему может быть использован для сохранения контрольно - пропускных пунктов. Но при этом, если обучение было прервано или успешно завершено, чтобы продолжить обучение с контрольной точки, пользователь должен загрузить модель вручную.

При желании пользователь может выбрать для сохранения и восстановления модели / веса за пределами ModelCheckpoint обратного вызова.

Сохранение и загрузка модели

Для сохранения модели с помощью model.save или tf.saved_model.save , потребность экономии места назначения , чтобы быть различной для каждого работника.

  • Если вы не являетесь руководителем, вам нужно будет сохранить модель во временном каталоге.
  • Для шефа вам нужно будет сохранить в предоставленный каталог модели.

Временные каталоги на воркере должны быть уникальными, чтобы предотвратить ошибки, возникающие в результате того, что несколько воркеров пытаются писать в одно и то же место.

Модель, сохраненная во всех каталогах, идентична, и обычно только модель, сохраненная руководителем, должна использоваться для восстановления или обслуживания.

У вас должна быть некоторая логика очистки, которая удаляет временные каталоги, созданные рабочими, после завершения вашего обучения.

Причина одновременной экономии на начальнике и рабочих заключается в том, что вы можете агрегировать переменные во время контрольной точки, что требует, чтобы и руководитель, и рабочие участвовали в протоколе связи allreduce. С другой стороны, разрешение начальнику и рабочим сохранять в один и тот же каталог модели приведет к ошибкам из-за разногласий.

Использование MultiWorkerMirroredStrategy , программа запускается на каждом работнике, и для того , чтобы знать , является ли главным текущим рабочим, он использует кластер объекта распознавателя , который имеет атрибуты task_type и task_id :

  • task_type говорит вам , что текущая работа (например , 'worker' ).
  • task_id говорит вам идентификатор работника.
  • Работник с task_id == 0 обозначается как главный работник.

В фрагменте кода ниже, write_filepath функция обеспечивает путь к файлу записи, который зависит от работника task_id :

  • Для главного работника (с task_id == 0 ), он пишет к оригинальному пути к файлу.
  • Для других работников, он создает временный СПРАВОЧНИК temp_dirtask_id в пути каталога для записи в:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Теперь вы готовы сохранить:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Как описано выше, позже модель должна загружаться только из пути, в который был сохранен руководитель, поэтому давайте удалим временные, которые сохранили не главные рабочие:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Теперь, когда пришло время , чтобы нагрузки, давайте использовать удобный tf.keras.models.load_model API, и продолжить дальнейшую работу.

Здесь, предположит , что только с помощью одного работника нагрузки и продолжить обучение, в этом случае вы не вызывает tf.keras.models.load_model внутри другого strategy.scope() (обратите внимание , что strategy = tf.distribute.MultiWorkerMirroredStrategy() , как определено ранее ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Сохранение и восстановление контрольной точки

С другой стороны, контрольные точки позволяют сохранять веса вашей модели и восстанавливать их без необходимости сохранять всю модель.

Здесь вы будете создавать один tf.train.Checkpoint , который отслеживает модель, которая управляется с помощью tf.train.CheckpointManager , так что сохраняется только последняя контрольная точка:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

После того , как CheckpointManager установлен, теперь вы готовы сохранить и удалить контрольные точки были сохранены не-главные рабочие:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Теперь, когда вам нужно восстановить модель, вы можете найти последнюю контрольную точку сохраненной с помощью удобного tf.train.latest_checkpoint функции. После восстановления КПП можно продолжить обучение.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

Обратный вызов BackupAndRestore

tf.keras.callbacks.experimental.BackupAndRestore обратного вызова обеспечивает функции отказоустойчивости путем резервного копирования модели и номер текущей эпохи в временный файл контрольной точки под backup_dir аргумента BackupAndRestore . Это делается в конце каждой эпохи.

Как только задания прерываются и перезапускаются, обратный вызов восстанавливает последнюю контрольную точку, и обучение продолжается с начала прерванной эпохи. Любое частичное обучение, уже выполненное в незавершенную эпоху до прерывания, будет отброшено, так что это не повлияет на окончательное состояние модели.

Для того, чтобы использовать его, предоставить экземпляр tf.keras.callbacks.experimental.BackupAndRestore на Model.fit вызова.

С MultiWorkerMirroredStrategy , если работник будет прерван, весь кластер останавливается до тех пор , прерванный работник не будет перезапущен. Другие воркеры также перезапустятся, и прерванный воркер снова присоединится к кластеру. Затем каждый рабочий читает файл контрольной точки, который был ранее сохранен, и выбирает его прежнее состояние, тем самым позволяя кластеру вернуться в синхронизацию. Затем обучение продолжается.

BackupAndRestore обратного вызова использует CheckpointManager для сохранения и восстановления состояния обучения, который генерирует файл с именем контрольной точки , который отслеживает существующие контрольно - пропускные пункты вместе с последними один. По этой причине, backup_dir не должен быть повторно использована для хранения других контрольно - пропускных пунктов для того , чтобы избежать столкновения имени.

В настоящее время BackupAndRestore обратного вызова поддерживает ни одного работника без каких - либо стратегии, MirroredStrategy и мульти-работника с MultiWorkerMirroredStrategy. Ниже приведены два примера обучения нескольких сотрудников и обучения одного сотрудника.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Если вы осмотрите каталог backup_dir , указанные в BackupAndRestore , вы можете заметить некоторые временные файлы контрольных точек. Эти файлы необходимы для восстановления ранее утраченных экземпляров, и они будут удалены библиотеками в конце Model.fit после успешной , выходящей вашего обучения.

Дополнительные ресурсы

  1. Распределенное обучение в TensorFlow руководстве дается обзор имеющихся стратегий распределения.
  2. Пользовательские тренировки петля с Keras и MultiWorkerMirroredStrategy учебник показывает , как использовать MultiWorkerMirroredStrategy с Keras и пользовательский тренировочный цикл.
  3. Проверьте официальные модели , многие из которых могут быть сконфигурированы для запуска нескольких стратегий распределения.
  4. Производительность Лучше с tf.function руководством содержит информацию о других стратегиях и инструментах, таких как TensorFlow Profiler можно использовать для оптимизации производительности ваших моделей TensorFlow.