RSVP для вашего местного мероприятия TensorFlow Everywhere сегодня!
Эта страница переведена с помощью Cloud Translation API.
Switch to English

Обучение нескольких рабочих с Керасом

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом руководстве демонстрируется распределенное обучение нескольких сотрудников с использованием модели tf.distribute.Strategy с использованием tf.distribute.Strategy API, в частности tf.distribute.MultiWorkerMirroredStrategy . С помощью этой стратегии модель Keras, которая была разработана для работы с одним рабочим, может беспрепятственно работать с несколькими рабочими с минимальным изменением кода.

Руководство по распределенному обучению в TensorFlow доступно для обзора стратегий распространения, поддерживаемых TensorFlow, для тех, кто заинтересован в более глубоком понимании API-интерфейсов tf.distribute.Strategy .

Настраивать

Во-первых, необходимый импорт.

import json
import os
import sys

Перед импортом TensorFlow внесите несколько изменений в среду.

Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие пытаются использовать один и тот же графический процессор. В реальном приложении каждый рабочий будет на разных машинах.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Сбросьте TF_CONFIG среды TF_CONFIG , вы узнаете об этом позже.

os.environ.pop('TF_CONFIG', None)

Убедитесь, что текущий каталог находится на пути Python. Это позволяет записной книжке позже импортировать файлы, записанные %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Теперь импортируйте TensorFlow.

import tensorflow as tf

Определение набора данных и модели

Затем создайте файл mnist.py с простой настройкой модели и набора данных. Этот файл python будет использоваться рабочими процессами в этом руководстве:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Попробуйте обучить модель небольшому количеству эпох и понаблюдайте за результатами одного рабочего, чтобы убедиться, что все работает правильно. По мере продвижения тренировки потери должны уменьшаться, а точность - увеличиваться.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236

<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

Многопользовательская конфигурация

Теперь давайте войдем в мир обучения нескольких сотрудников. В TF_CONFIG переменная среды TF_CONFIG требуется для обучения на нескольких машинах, каждая из которых, возможно, играет свою роль. TF_CONFIG - это строка JSON, используемая для указания конфигурации кластера для каждого рабочего, являющегося частью кластера.

Вот пример конфигурации:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Вот тот же TF_CONFIG сериализованный как строка JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG из двух компонентов: cluster и task .

  • cluster одинаков для всех рабочих и предоставляет информацию об обучающем кластере, который состоит из разных типов заданий, таких как worker . При обучении нескольких рабочих с помощью MultiWorkerMirroredStrategy обычно есть один worker который берет на себя немного больше ответственности, например, сохранение контрольной точки и написание сводного файла для TensorBoard в дополнение к тому, что делает обычный worker . Такой рабочий называется chief рабочим, и обычно worker с index 0 назначается главным worker (на самом деле так tf.distribute.Strategy ).

  • task предоставляет информацию о текущей задаче и отличается для каждого работника. Он определяет type и index этого рабочего.

В этом примере вы устанавливаете type задачи "worker" а index задачи - 0 . Эта машина является первым рабочим, который будет назначен главным рабочим и будет выполнять больше работы, чем другие. Обратите внимание, что на других машинах также должна быть TF_CONFIG переменная среды TF_CONFIG , и она должна иметь тот же cluster dict, но другой type задачи или index задачи в зависимости от того, каковы роли этих машин.

В целях иллюстрации в этом руководстве показано, как можно установить TF_CONFIG с двумя рабочими TF_CONFIG на localhost . На практике пользователи могут создать несколько рабочих TF_CONFIG на внешних IP-адресах / портах и ​​соответствующим образом установить TF_CONFIG для каждого рабочего.

В этом примере вы будете использовать 2 TF_CONFIG , TF_CONFIG первого TF_CONFIG показан выше. Для второго рабочего вы должны установить tf_config['task']['index']=1

Выше tf_config - это просто локальная переменная в python. Чтобы действительно использовать его для настройки обучения, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG .

Переменные среды и подпроцессы в записных книжках

Подпроцессы наследуют переменные среды от своего родителя. Итак, если вы установите переменную среды в этом jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Вы можете получить доступ к переменной среды из подпроцессов:

echo ${GREETINGS}
Hello TensorFlow!

В следующем разделе вы будете использовать это для передачи TF_CONFIG рабочим подпроцессам. Вы бы никогда не запустили свои рабочие места таким образом, но этого достаточно для целей данного руководства: продемонстрировать минимальный пример с несколькими рабочими.

Выбери правильную стратегию

В TensorFlow есть две основные формы распределенного обучения:

  • Синхронное обучение, при котором шаги обучения синхронизируются между рабочими и репликами, а также
  • Асинхронное обучение, при котором шаги обучения строго не синхронизируются.

MultiWorkerMirroredStrategy этом руководстве будет продемонстрирована стратегия MultiWorkerMirroredStrategy , которая является рекомендуемой стратегией для синхронного обучения нескольких сотрудников. Для обучения модели используйте экземпляр tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих MultiWorkerMirroredStrategy . Он использует CollectiveOps , операцию TensorFlow для коллективного общения, чтобы агрегировать градиенты и синхронизировать переменные. В руководстве tf.distribute.Strategy есть более подробная информация об этой стратегии.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy предоставляет несколько реализаций через параметр CommunicationOptions . RING реализует группы на основе кольца, используя gRPC в качестве уровня связи между хостами. NCCL использует NCCL Nvidia для реализации коллективов. AUTO откладывает выбор на время выполнения. Наилучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также от сетевого соединения в кластере.

Обучите модель

С интеграцией tf.distribute.Strategy API в tf.keras единственное изменение, которое вы сделаете для распространения обучения среди нескольких рабочих, - это включение построения модели и model.compile() в strategy.scope() . Объем стратегии распределения определяет, как и где создаются переменные, а в случае MultiWorkerMirroredStrategy создаваемые переменные являются MirroredVariable s, и они реплицируются на каждом из рабочих процессов.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Чтобы на самом деле работать с MultiWorkerMirroredStrategy вам необходимо запустить рабочие процессы и передать им TF_CONFIG .

Как и в mnist.py написанном файле mnist.py файл main.py который будет запускать каждый из рабочих процессов:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

В приведенном выше фрагменте кода обратите внимание, что global_batch_size , который передается в Dataset.batch , имеет значение per_worker_batch_size * num_workers . Это гарантирует, что каждый рабочий процесс обрабатывает партии примеров per_worker_batch_size независимо от количества рабочих.

Текущий каталог теперь содержит оба файла Python:

ls *.py
main.py
mnist.py

Итак, json-сериализуйте TF_CONFIG и добавьте его в переменные среды:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь вы можете запустить рабочий процесс, который будет запускать main.py и использовать TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

О приведенной выше команде следует отметить несколько моментов:

  1. Он использует %%bash который представляет собой «волшебство» записной книжки для выполнения некоторых команд bash.
  2. Он использует флаг --bg для запуска процесса bash в фоновом режиме, потому что этот рабочий процесс не завершится. Он ждет всех рабочих перед запуском.

Фоновый рабочий процесс не будет печатать вывод в этот блокнот, поэтому &> перенаправляет его вывод в файл, чтобы вы могли видеть, что произошло.

Итак, подождите несколько секунд, пока процесс запустится:

import time
time.sleep(10)

Теперь посмотрим, что уже было выведено в лог-файл рабочего:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

В последней строке файла журнала должно быть указано: Started server with target: grpc://localhost:12345 . Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить.

Итак, обновите tf_config чтобы его tf_config второй рабочий процесс:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь запускаем второй рабочий. Это запустит обучение, так как все рабочие активны (поэтому нет необходимости в фоновом режиме этого процесса):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

Теперь, если вы повторно проверите журналы, написанные первым воркером, вы увидите, что он участвовал в обучении этой модели:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

Неудивительно, что это работало медленнее, чем тестовый запуск в начале этого руководства. Запуск нескольких рабочих процессов на одной машине только увеличивает накладные расходы. Целью здесь было не сократить время обучения, а только показать пример обучения нескольких рабочих.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Углубленное обучение нескольких сотрудников

До сих пор это руководство демонстрировало базовую настройку для нескольких рабочих. Остальная часть этого документа подробно рассматривает другие факторы, которые могут быть полезны или важны для реальных случаев использования.

Шардинг набора данных

При обучении нескольких сотрудников сегментирование набора данных необходимо для обеспечения конвергенции и производительности.

Пример в предыдущем разделе основан на автошардинге по умолчанию, предоставляемом tf.distribute.Strategy API. Вы можете управлять сегментированием, задав tf.data.experimental.AutoShardPolicy для tf.data.experimental.DistributeOptions . Чтобы узнать больше об автоматическом сегментировании, см. Руководство по распределенному вводу .

Вот быстрый пример того, как отключить автоматическое разделение, чтобы каждая реплика обрабатывала каждый пример (не рекомендуется):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Оценка

Если вы передадите validation_data в model.fit , он будет чередовать обучение и оценку для каждой эпохи. Оценка, принимающая validation_data , распределяется между одним и тем же набором рабочих, а результаты оценки агрегируются и доступны для всех рабочих. Подобно обучению, набор данных проверки автоматически сегментируется на уровне файла. Вам необходимо установить глобальный размер пакета в наборе данных проверки и установить validation_steps . Для оценки также рекомендуется повторный набор данных.

В качестве альтернативы вы также можете создать другую задачу, которая периодически считывает контрольные точки и запускает оценку. Это то, что делает Оценщик. Но это не рекомендуемый способ выполнения оценки, поэтому его подробности опускаются.

Спектакль

Теперь у вас есть модель MultiWorkerMirroredStrategy настроенная для работы в нескольких рабочих MultiWorkerMirroredStrategy с помощью MultiWorkerMirroredStrategy . Вы можете попробовать следующие методы, чтобы настроить производительность обучения нескольких сотрудников с помощью MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy предоставляет несколько реализаций коллективного взаимодействия . RING реализует группы на основе кольца, используя gRPC в качестве уровня связи между хостами. NCCL использует NCCL Nvidia для реализации коллективов. AUTO откладывает выбор на время выполнения. Наилучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также от сетевого соединения в кластере. Чтобы отменить автоматический выбор, указать communication_options параметр MultiWorkerMirroredStrategy конструктора «s, например communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • Если возможно, tf.float переменные в tf.float . Официальная модель ResNet включает пример того, как это можно сделать.

Отказоустойчивость

При синхронном обучении кластер выйдет из строя, если один из рабочих выйдет из строя и не существует механизма восстановления после сбоя. Использование tf.distribute.Strategy с tf.distribute.Strategy преимущество отказоустойчивости в случаях, когда рабочие умирают или работают нестабильно. Вы делаете это, сохраняя состояние обучения в распределенной файловой системе по вашему выбору, так что при перезапуске экземпляра, который ранее не удался или был вытеснен, состояние обучения восстанавливается.

Когда рабочий становится недоступным, другие рабочие терпят неудачу (возможно, после тайм-аута). В таких случаях необходимо перезапустить недоступный работник, а также другие вышедшие из строя рабочие.

Обратный вызов ModelCheckpoint

ModelCheckpoint вызов ModelCheckpoint больше не обеспечивает функциональность отказоустойчивости, используйте вместо BackupAndRestore обратный вызов BackupAndRestore .

ModelCheckpoint вызов ModelCheckpoint прежнему можно использовать для сохранения контрольных точек. Но при этом, если обучение было прервано или успешно завершено, чтобы продолжить обучение с контрольной точки, пользователь должен загрузить модель вручную.

При желании пользователь может выбрать сохранение и восстановление модели / весов вне ModelCheckpoint обратного вызова ModelCheckpoint .

Сохранение и загрузка модели

Чтобы сохранить модель с помощью model.save или tf.saved_model.save , место сохранения должно быть различным для каждого рабочего. На нештатных рабочих вам нужно будет сохранить модель во временный каталог, а на начальнике вам нужно будет сохранить в предоставленный каталог модели. Временные каталоги на рабочем сервере должны быть уникальными, чтобы предотвратить ошибки, возникающие из-за того, что несколько рабочих процессов пытаются писать в одно и то же место. Модели, сохраненные во всех каталогах, идентичны, и обычно для восстановления или обслуживания следует ссылаться только на модель, сохраненную руководителем. У вас должна быть некоторая логика очистки, которая удаляет временные каталоги, созданные рабочими, после завершения вашего обучения.

Причина, по которой вам нужно сэкономить на начальнике и рабочих одновременно, заключается в том, что вы можете агрегировать переменные во время контрольной точки, что требует, чтобы и руководитель, и рабочие участвовали в протоколе связи allreduce. С другой стороны, разрешение начальнику и рабочим сохранять в одном каталоге модели приведет к ошибкам из-за разногласий.

С MultiWorkerMirroredStrategy программа запускается для каждого рабочего, и для того, чтобы узнать, является ли текущий работник главным, он использует объект task_type кластера, который имеет атрибуты task_type и task_id . task_type сообщает вам текущее задание (например, «worker»), а task_id сообщает вам идентификатор исполнителя. Рабочий с идентификатором 0 назначается старшим.

В приведенном ниже фрагменте кода write_filepath предоставляет путь к файлу для записи, который зависит от идентификатора рабочего стола. В случае руководителя (работника с идентификатором 0) он записывает в исходный путь к файлу; для других он создает временный каталог (с идентификатором в пути к каталогу) для записи:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Теперь вы готовы сохранить:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Как описано выше, позже модель должна загружаться только из пути, в который был сохранен руководитель, поэтому давайте удалим временные, которые сохранили неосновные рабочие:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Теперь, когда пора загружаться, воспользуемся удобным API tf.keras.models.load_model и продолжим дальнейшую работу. Здесь предположим, что для загрузки и продолжения обучения используется только один рабочий процесс, и в этом случае вы не вызываете tf.keras.models.load_model в другой strategy.scope() tf.keras.models.load_model strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023

<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

Сохранение и восстановление контрольной точки

С другой стороны, контрольные точки позволяют сохранять веса модели и восстанавливать их без необходимости сохранять всю модель. Здесь вы создадите одну tf.train.Checkpoint которая отслеживает модель, которой управляет tf.train.CheckpointManager так что tf.train.CheckpointManager только последняя контрольная точка.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

После настройки CheckpointManager вы готовы к сохранению и удалению сохраненных контрольных точек, не являющихся руководителями.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Теперь, когда вам нужно восстановить, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint . После восстановления КПП можно продолжить обучение.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896

<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

Обратный вызов BackupAndRestore

Обратный вызов BackupAndRestore обеспечивает отказоустойчивость путем резервного копирования модели и номера текущей эпохи во временном файле контрольной backup_dir аргументе BackupAndRestore для BackupAndRestore . Это делается в конце каждой эпохи.

Как только задания прерываются и перезапускаются, обратный вызов восстанавливает последнюю контрольную точку, и обучение продолжается с начала прерванной эпохи. Любое частичное обучение, уже выполненное в незавершенную эпоху до прерывания, будет отброшено, так что это не повлияет на окончательное состояние модели.

Чтобы использовать его, предоставьте экземпляр tf.keras.callbacks.experimental.BackupAndRestore при tf.keras.Model.fit() .

С MultiWorkerMirroredStrategy, если воркер прерывается, весь кластер приостанавливается, пока прерванный воркер не будет перезапущен. Другие воркеры также перезапустятся, и прерванный воркер снова присоединится к кластеру. Затем каждый рабочий читает файл контрольной точки, который был ранее сохранен, и выбирает его прежнее состояние, тем самым позволяя кластеру вернуться в синхронизацию. Затем обучение продолжается.

BackupAndRestore вызов BackupAndRestore использует CheckpointManager для сохранения и восстановления состояния обучения, который создает файл с именем контрольная точка, который отслеживает существующие контрольные точки вместе с последней. По этой причине backup_dir не следует повторно использовать для хранения других контрольных точек, чтобы избежать конфликта имен.

В настоящее время BackupAndRestore вызов BackupAndRestore поддерживает одного рабочего без стратегии, MirroredStrategy, и нескольких рабочих с MultiWorkerMirroredStrategy. Ниже приведены два примера обучения нескольких сотрудников и обучения одного сотрудника.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645

<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

Если вы проверите каталог backup_dir вы указали в BackupAndRestore , вы можете заметить некоторые временно созданные файлы контрольных точек. Эти файлы необходимы для восстановления ранее утерянных экземпляров, и они будут удалены библиотекой в ​​конце tf.keras.Model.fit() после успешного завершения вашего обучения.

Смотрите также

  1. Руководство по распределенному обучению в TensorFlow предоставляет обзор доступных стратегий распространения.
  2. Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.
  3. В разделе « Производительность» руководства представлена ​​информация о других стратегиях и инструментах, которые вы можете использовать для оптимизации производительности ваших моделей TensorFlow.