Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом руководстве демонстрируется обучение нескольких рабочих с помощью настраиваемого API цикла обучения, распространяемого через MultiWorkerMirroredStrategy, поэтому модель Keras, разработанная для работы с одним рабочим, может беспрепятственно работать с несколькими рабочими с минимальным изменением кода.

Мы используем настраиваемые циклы обучения для обучения нашей модели, потому что они дают нам гибкость и больший контроль над обучением. Кроме того, легче отлаживать модель и цикл обучения. Более подробная информация доступна в разделе Написание цикла обучения с нуля .

Если вы ищете, как использовать MultiWorkerMirroredStrategy с keras model.fit , обратитесь к этому руководству .

Руководство по распределенному обучению в TensorFlow доступно для обзора стратегий распространения, поддерживаемых TensorFlow, для тех, кто заинтересован в более глубоком понимании API-интерфейсов tf.distribute.Strategy .

Настраивать

Во-первых, необходимый импорт.

import json
import os
import sys

Перед импортом TensorFlow внесите несколько изменений в среду.

Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие пытаются использовать один и тот же графический процессор. В реальном приложении каждый рабочий будет на разных машинах.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Сбросьте TF_CONFIG среды TF_CONFIG , вы узнаете об этом позже.

os.environ.pop('TF_CONFIG', None)

Убедитесь, что текущий каталог находится на пути Python. Это позволяет записной книжке позже импортировать файлы, записанные %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Теперь импортируйте TensorFlow.

import tensorflow as tf

Набор данных и определение модели

Затем создайте файл mnist.py с простой настройкой модели и набора данных. Этот файл python будет использоваться рабочими процессами в этом руководстве:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Многопользовательская конфигурация

Теперь давайте войдем в мир обучения нескольких сотрудников. В TF_CONFIG переменная среды TF_CONFIG требуется для обучения на нескольких машинах, каждая из которых, возможно, имеет разные роли. TF_CONFIG используемый ниже, представляет собой строку JSON, используемую для указания конфигурации кластера для каждого рабочего, являющегося частью кластера. Это метод по умолчанию для указания кластера с помощью cluster_resolver.TFConfigClusterResolver , но в модуле distribute.cluster_resolver доступны и другие параметры.

Опишите свой кластер

Вот пример конфигурации:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Вот тот же TF_CONFIG сериализованный как строка JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG из двух компонентов: cluster и task .

  • cluster является одинаковым для всех рабочих и предоставляет информацию об обучающем кластере, который состоит из разных типов заданий, таких как worker . При обучении нескольких сотрудников с помощью MultiWorkerMirroredStrategy обычно есть один worker который берет на себя немного больше ответственности, например, сохранение контрольной точки и написание сводного файла для TensorBoard в дополнение к тому, что делает обычный worker . Такой рабочий называется chief рабочим, и обычно worker с index 0 назначается главным worker (на самом деле так tf.distribute.Strategy ).

  • task предоставляет информацию о текущей задаче и отличается для каждого работника. Он определяет type и index этого рабочего.

В этом примере вы устанавливаете type задачи на "worker" и index задачи на 0 . Эта машина является первым рабочим, который будет назначен главным рабочим и будет выполнять больше работы, чем другие. Обратите внимание, что на других машинах также должна быть TF_CONFIG переменная среды TF_CONFIG , и у нее должен быть такой же cluster dict, но другой type задачи или index задачи в зависимости от того, каковы роли этих машин.

В целях иллюстрации в этом руководстве показано, как можно установить TF_CONFIG с двумя рабочими TF_CONFIG на localhost TF_CONFIG . На практике пользователи могут создать несколько рабочих TF_CONFIG на внешних IP-адресах / портах и ​​соответствующим образом установить TF_CONFIG для каждого рабочего.

В этом примере вы будете использовать 2 TF_CONFIG , TF_CONFIG первого TF_CONFIG показан выше. Для второго tf_config['task']['index']=1 вы должны установить tf_config['task']['index']=1

Выше tf_config - это просто локальная переменная в Python. Чтобы использовать его для настройки обучения, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG .

Переменные среды и подпроцессы в записных книжках

Подпроцессы наследуют переменные среды от своего родителя. Итак, если вы установите переменную среды в этом jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Вы можете получить доступ к переменной среды из подпроцессов:

echo ${GREETINGS}
Hello TensorFlow!

В следующем разделе вы будете использовать это, чтобы передать TF_CONFIG рабочим подпроцессам. Вы бы никогда не запустили свои рабочие места таким образом, но этого достаточно для целей данного руководства: продемонстрировать минимальный пример с несколькими рабочими.

MultiWorkerMirroredStrategy

Для обучения модели используйте экземпляр tf.distribute.MultiWorkerMirroredStrategy , который создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. В руководстве tf.distribute.Strategy есть более подробная информация об этой стратегии.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

Используйте tf.distribute.Strategy.scope чтобы указать, что стратегия должна использоваться при построении вашей модели. Это помещает вас в « контекст кросс-реплик » для этой стратегии, что означает, что стратегия получает контроль над такими вещами, как размещение переменных.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Автоматически распределяйте данные между воркерами

При обучении с несколькими работниками сегментирование набора данных не обязательно требуется, однако оно дает вам только один раз семантику, что делает большее обучение более воспроизводимым, то есть обучение нескольких рабочих должно быть таким же, как обучение одного рабочего. Примечание: в некоторых случаях производительность может снизиться.

См. distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Определите пользовательский цикл обучения и обучите модель

Укажите оптимизатор

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Определите шаг обучения с помощью tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Сохранение и восстановление контрольной точки

Реализация контрольной точки в пользовательском цикле обучения требует, чтобы пользователь обрабатывал ее вместо использования обратного вызова keras. Это позволяет сохранять веса модели и восстанавливать их без необходимости сохранять всю модель.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Здесь вы создадите одну tf.train.Checkpoint которая отслеживает модель, которой управляет tf.train.CheckpointManager так что tf.train.CheckpointManager только последняя контрольная точка.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Теперь, когда вам нужно восстановить, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

После восстановления контрольной точки вы можете продолжить обучение своему индивидуальному циклу обучения.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

Полная настройка кода на воркеры

Чтобы действительно работать с MultiWorkerMirroredStrategy вам нужно запустить рабочие процессы и передать им TF_CONFIG .

Как и файл mnist.py написанный ранее, вот main.py который содержит тот же код, который мы шаг за шагом прошли ранее в этом colab, мы просто записываем его в файл, чтобы каждый из рабочих запускал его:

Файл: main.py

Writing main.py

Тренируй и оценивай

Текущий каталог теперь содержит оба файла Python:

ls *.py
main.py
mnist.py

Итак, json-сериализуйте TF_CONFIG и добавьте его в переменные среды:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь вы можете запустить рабочий процесс, который будет запускать main.py и использовать TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

По поводу приведенной выше команды следует отметить несколько моментов:

  1. Он использует %%bash который представляет собой «волшебство» записной книжки для выполнения некоторых команд bash.
  2. Он использует флаг --bg для запуска процесса bash в фоновом режиме, потому что этот рабочий процесс не завершится. Перед запуском он ждет всех рабочих.

Фоновый рабочий процесс не будет печатать вывод в этот блокнот, поэтому &> перенаправляет его вывод в файл, чтобы вы могли видеть, что произошло.

Итак, подождите несколько секунд, пока процесс запустится:

import time
time.sleep(20)

Теперь посмотрим, что уже было выведено в лог-файл рабочего:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

В последней строке файла журнала должно быть указано: Started server with target: grpc://localhost:12345 . Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить.

Итак, обновите tf_config чтобы его tf_config второй рабочий процесс:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь запускаем второго рабочего. Это запустит обучение, поскольку все рабочие активны (поэтому нет необходимости в фоновом режиме этого процесса):

python main.py > /dev/null 2>&1

Теперь, если вы перепроверите журналы, написанные первым воркером, вы увидите, что он участвовал в обучении этой модели:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Углубленное обучение нескольких сотрудников

В этом руководстве был продемонстрирован рабочий процесс Custom Training Loop для многопользовательской настройки. Подробное описание других тем доступно в руководстве model.fit's guide настройке нескольких рабочих и применимо к CTL.

Смотрите также

  1. Руководство по распределенному обучению в TensorFlow предоставляет обзор доступных стратегий распространения.
  2. Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.
  3. В разделе « Производительность» руководства представлена ​​информация о других стратегиях и инструментах, которые вы можете использовать для оптимизации производительности ваших моделей TensorFlow.