Обучение сервера параметров с помощью ParameterServerStrategy

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Обучение сервера Параметр является распространенным методом данных параллельно для расширения модели обучения на нескольких машинах.

Учебный кластер серверов параметров состоит из рабочих серверов и параметров. Переменные создаются на серверах параметров, и они читаются и обновляются рабочими на каждом этапе. По умолчанию рабочие читают и обновляют эти переменные независимо, без синхронизации друг с другом. Поэтому иногда параметр обучения сервера стиль называется асинхронным обучение.

В TensorFlow 2, обучение сервера параметров питаются от tf.distribute.experimental.ParameterServerStrategy класса, который распределяет учебные шаги в кластер , который масштабируется до тысяч рабочих ( в сопровождении серверов параметров).

Поддерживаемые методы обучения

Есть два основных поддерживаемых метода обучения:

Кластер с заданиями и задачами

Независимо от API выбора ( Model.fit или пользовательские учебного цикла), распределенное обучение в TensorFlow 2 включает в себя: а 'cluster' с несколькими 'jobs' , и каждый из рабочих мест может иметь один или несколько 'tasks' .

При использовании обучения сервера параметров рекомендуется иметь:

  • Один координатор работы (который имеет название вакансии chief )
  • Несколько рабочих места рабочих (имя задания worker ); и
  • Несколько заданий сервера параметр (имя задания ps )

В то время как координатор создает ресурсы, депеши учебных задачи, пишет контрольные точки, и имеет дело с неудачами задач, рабочими и серверами параметров запуска tf.distribute.Server , что прослушивание запросов от координатора.

Обучение сервера параметров с Model.fit API

Сервер обучение параметров с Model.fit API требует координатора , чтобы использовать tf.distribute.experimental.ParameterServerStrategy объект, и tf.keras.utils.experimental.DatasetCreator в качестве входных данных. Подобно Model.fit использование без какой - либо стратегии, или с другими стратегиями, рабочий процесс включает в себя создание и составления модели, подготовки обратных вызовов, сопровождаемые Model.fit вызова.

Обучение сервера параметров с помощью настраиваемого цикла обучения

С пользовательских тренировочных циклов, то tf.distribute.experimental.coordinator.ClusterCoordinator класс является ключевым компонентом , используемым для координатора.

  • ClusterCoordinator класс должен работать совместно с tf.distribute.Strategy объекта.
  • Этот tf.distribute.Strategy объект необходим для предоставления информации кластера и используется для определения стадии подготовки, как показано на заказ обучения с tf.distribute.Strategy .
  • ClusterCoordinator объект затем отправляет выполнение этих учебных шагов для удаленных работников.
  • Для подготовки сервера параметров, то ClusterCoordinator нужно работать с tf.distribute.experimental.ParameterServerStrategy .

Наиболее важный API обеспечивается ClusterCoordinator объекта schedule :

  • schedule API ставит в очередь на tf.function и возвращает будущее-как RemoteValue немедленно.
  • В очереди функция будет отправлена в удаленные рабочие фоновых потоках и их RemoteValue s будет заполнен асинхронно.
  • Поскольку schedule не требует назначений рабочих, tf.function прошел в может быть выполнен на любом доступном работнике.
  • Если рабочий процесс, на котором она выполняется, становится недоступным до своего завершения, функция будет повторена на другом доступном работнике.
  • Из-за этого факта и того факта, что выполнение функции не является атомарным, функция может выполняться более одного раза.

В дополнение к диспетчеризации удаленных функций, то ClusterCoordinator также помогает создавать наборы данных на всех рабочих и восстановить эти наборы данных , когда работник восстанавливается после сбоя.

Настройка учебного пособия

Учебник будет разветвляться в Model.fit и пользовательских пути обучения петли, и вы можете выбрать тот , который соответствует вашим потребностям. Разделы, отличные от «Обучение с X», применимы к обоим путям.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Настройка кластера

Как уже упоминалось выше, учебный сервер параметров кластера требует задача координатора , который работает программу тренировок, один или несколько рабочих и задач сервера параметров , которые работают TensorFlow Серверы- tf.distribute.Server -И , возможно , дополнительная задача оценки , которая по оценке работает бок автомобиля (см. раздел оценки коляски ниже). Требования для их установки:

  • Задача координатора должна знать адреса и порты всех других серверов TensorFlow, кроме оценщика.
  • Рабочие и серверы параметров должны знать, какой порт им нужно слушать. Для простоты вы обычно можете передавать полную информацию о кластере при создании серверов TensorFlow для этих задач.
  • Задаче оценщика не обязательно знать настройку обучающего кластера. Если это так, он не должен пытаться подключиться к обучающему кластеру.
  • Рабочие и серверы параметров должны иметь типы задач , как "worker" и "ps" соответственно. Координатор должен использовать "chief" в качестве типа задачи унаследованных причин.

В этом руководстве вы создадите внутрипроцессный кластер, чтобы все обучение сервера параметров можно было запустить в Colab. Вы узнаете , как создать реальные кластеры в следующем разделе.

Внутрипроцессный кластер

Вы начнете с создания нескольких серверов TensorFlow заранее и подключитесь к ним позже. Обратите внимание , что это только с целью демонстрации этого обучающей программы , и в режиме реального обучения сервера будут запущены на "worker" и "ps" машины.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

Установка кластера в-процесс часто используется в модульном тестировании, например, здесь .

Еще один вариант для локального тестирования для запуска процессов на локальном компьютер-заканчивал несколько рабочих тренировки с Keras для примера такого подхода.

Создание экземпляра ParameterServerStrategy

Перед тем, как погрузиться в учебном код, давайте экземпляр ParameterServerStrategy объекту. Обратите внимание , что это необходимо , независимо от того , вы приступить Model.fit или пользовательского учебного цикла. variable_partitioner аргумент будет объяснен в разделе Variable сегментирования .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

Чтобы использовать графические процессоры для обучения, выделите графические процессоры, видимые каждому работнику. ParameterServerStrategy будет использовать все доступные графические процессоры на каждом работнике, с тем ограничением , что все работники должны иметь одинаковое количество графических процессоров , доступных.

Переменный сегмент

Переменная Sharding относится к расщеплению переменной на несколько меньших переменных, которые называются осколками. Переменное сегментирование может быть полезно для распределения сетевой нагрузки при доступе к этим сегментам. Также полезно распределить вычисление и хранение нормальной переменной между несколькими серверами параметров.

Чтобы включить переменный шардинг, вы можете перейти в variable_partitioner при построении ParameterServerStrategy объекта. variable_partitioner будет вызываться каждый раз , когда создается переменная , и , как ожидается, возвращает количество черепков по каждому измерению переменной. Некоторые из-из-коробки variable_partitioner s предоставляется таким как tf.distribute.experimental.partitioners.MinSizePartitioner . Рекомендуется использовать размер на основе , как редактирования разделов tf.distribute.experimental.partitioners.MinSizePartitioner , чтобы избежать разделения мелких переменных, которые могут оказать негативное влияние на скорость модели обучения.

Когда variable_partitioner передаются в и если вы создаете переменную непосредственно под strategy.scope() , он станет типом контейнера с variables свойством , которое обеспечивает доступ к списку осколков. В большинстве случаев этот контейнер будет автоматически преобразован в тензор путем объединения всех осколков. В результате его можно использовать как обычную переменную. С другой стороны, некоторые методы , такие как TensorFlow tf.nn.embedding_lookup обеспечивают эффективную реализацию для данного типа контейнера , и в этих методах автоматическая конкатенация будет избегать.

Пожалуйста , смотрите API документацию по tf.distribute.experimental.ParameterServerStrategy для более подробной информации.

Обучение с Model.fit

Keras обеспечивает простые в использовании обучения API с помощью Model.fit , который обрабатывает цикл обучения под капотом, с гибкостью Overridable train_step , и обратные вызовы, которые обеспечивают функциональные возможности, такие как сохранение контрольных точек или резюме спасительное для TensorBoard. С Model.fit , тот же код обучения может быть использован для других стратегий с помощью простого обмена объекта стратегии.

Входные данные

Model.fit с обучением сервера параметра требует , чтобы входные данные были представлены в вызываемом , которая принимает один аргумент типа tf.distribute.InputContext и возвращает tf.data.Dataset . Затем создайте tf.keras.utils.experimental.DatasetCreator объект , который принимает такие callable и необязательный tf.distribute.InputOptions объект с помощью input_options аргумента.

Обратите внимание , что рекомендуется перемешать и повторить данные с сервером подготовкой параметров, а также указать steps_per_epoch в fit звонка , поэтому библиотека знает границы эпох.

Пожалуйста , обратитесь к децентрализованному вводу учебника для получения дополнительной информации о InputContext аргументе.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Код в dataset_fn будет вызван на устройство ввода, которое обычно ЦП, на каждом из рабочих машин.

Построение и компиляция модели

Теперь вы создадите tf.keras.Model -a тривиального tf.keras.models.Sequential модели для демонстрационных целей-сопровождаемых Model.compile вызова включать компоненты, такие как оптимизатор, метрику или параметры , такие как steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Обратные звонки и обучение

Перед тем, как позвонить model.fit для фактического обучения, давайте подготовить необходимые обратные вызовы для общих задач, таких как:

  • ModelCheckpoint : для сохранения веса модели.
  • BackupAndRestore : чтобы убедиться , что обучение прогресс автоматически резервное копирование и восстановить , если кластер опыт недоступность (например, преждевременное прекращение или упреждение); или
  • TensorBoard : сохранять отчеты о ходе работы в сводные файлы, которые получают визуализированных в TensorBoard инструмента.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

Прямое использование с ClusterCoordinator ( по желанию)

Даже если вы выбираете Model.fit учебный путь, вы можете дополнительно создать экземпляр tf.distribute.experimental.coordinator.ClusterCoordinator объекта планировать другие функции , которые вы хотели бы выполнить на рабочих. Смотрите обучение с тренировки петлевой пользовательского раздела для более подробной информации и примеров.

Обучение с индивидуальным циклом обучения

Использование пользовательских учебных циклов с tf.distribute.Strategy обеспечивает большую гибкость для определения учебных циклов. С ParameterServerStrategy определенно выше (как strategy ), вы будете использовать tf.distribute.experimental.coordinator.ClusterCoordinator направить на выполнение учебных шагов для удаленных работников.

Затем вы создадите модель, определить набор данных и ступенчатую функцию, как вы сделали в тренировочном цикле с другим tf.distribute.Strategy s. Вы можете найти более подробную информацию в пользовательской тренировке с tf.distribute.Strategy учебником.

Для обеспечения эффективных набора данных предзапросов, использовать рекомендованный распределенную набора данных API для создания упомянутого в учебных шагах диспетчерских к удаленным работникам разделу ниже. Кроме того , убедитесь , чтобы позвонить Strategy.run внутри worker_fn , чтобы в полной мере воспользоваться графическими процессорами , выделенных рабочих. Остальные шаги одинаковы для обучения с графическим процессором или без него.

Давайте создадим эти компоненты, выполнив следующие шаги:

Настроить данные

Во- первых, написать функцию , которая создает набор данных , который включает в себя предварительную обработку , логику , реализованный Keras Preprocessing слоев .

Вы будете создавать эти слои за пределами dataset_fn но применить преобразование внутри dataset_fn , так как вы будете обернуть dataset_fn в tf.function , которая не позволяет переменные , которые будут созданы внутри него.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Сгенерируйте примеры игрушек в наборе данных:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Затем создать обучающий набор данных , завернутые в dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Построить модель

Далее создайте модель и другие объекты. Убедитесь в том , чтобы создать все переменные под strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Давай подтвердить , что использование FixedShardsPartitioner разделить все переменные на два осколки и каждый осколок был назначен к различным серверам параметров:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Определите шаг обучения

В- третьих, создать шаг обучения , обернутый в tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

В приведенном выше обучение ступенчатой функции, вызывая Strategy.run и Strategy.reduce в step_fn может поддерживать несколько графических процессоров на одного работника. Если рабочие графические процессоры выделяются, Strategy.run будет распространять наборы данных на нескольких репликах.

Рассылка инструкций по обучению удаленным сотрудникам

После того, как все вычисления определяется ParameterServerStrategy , вы будете использовать tf.distribute.experimental.coordinator.ClusterCoordinator класс для создания ресурсов и распространять учебные шаги для удаленных работников.

Давайте сначала создадим ClusterCoordinator объект и передать в объект стратегии:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Затем создайте набор данных для каждого работника и итератор. В per_worker_dataset_fn ниже, обертывание dataset_fn в strategy.distribute_datasets_from_function рекомендуется обеспечить эффективное предзапросы для графических процессоров бесшовно.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Заключительный шаг , чтобы распределить вычисления для удаленных сотрудников , использующих ClusterCoordinator.schedule :

  • schedule метод ставит в очередь на tf.function и возвращает будущее-как RemoteValue немедленно. В очереди функция будет отправлена в удаленные рабочий в фоновых потоках и RemoteValue будет заполнен асинхронно.
  • join метод ( ClusterCoordinator.join ) можно подождать , пока все запланированные функции не выполняются.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Вот как вы можете получить результат в RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Как вариант, вы можете запустить все шаги и что-то сделать, ожидая завершения:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Для полного обучения и обслуживающего процесса для данного конкретного примера, пожалуйста , проверьте этот тест .

Подробнее о создании набора данных

Набор данных в приведенном выше коде создается с помощью ClusterCoordinator.create_per_worker_dataset API). Он создает один набор данных для каждого рабочего и возвращает объект-контейнер. Вы можете вызвать iter метод на него , чтобы создать в-рабочего итератор. За рабочий итератор содержит один итератор на одного работника и соответствующий срез работника будет заменен во входном аргументе функции переданного ClusterCoordinator.schedule метода до того , как функция выполняются на определенный работнике.

В настоящее время ClusterCoordinator.schedule метод предполагает рабочие эквивалентны и , таким образом , предполагает , что наборы данных на разных рабочих одни и те же , за исключением они могут быть перемешиваются по- разному , если они содержат Dataset.shuffle операцию. Из - за этого, рекомендуется также , что наборы данных повторяться бесконечно и запланировать конечное число шагов , вместо того , чтобы полагаться на OutOfRangeError из набора данных.

Еще одно важное замечание, что tf.data наборы данных не поддерживают неявной сериализации и десериализации через границы задач. Поэтому важно , чтобы создать целый набор данных внутри функции передается ClusterCoordinator.create_per_worker_dataset .

Оценка

Существует несколько способов определения и запуска цикла оценки в распределенном обучении. У каждого есть свои плюсы и минусы, как описано ниже. Если у вас нет предпочтений, рекомендуется использовать встроенный метод оценки.

Встроенная оценка

В этом методе координатор чередует подготовки и оценки и , таким образом , она называется его инлайн оценки.

Встроенная оценка дает несколько преимуществ. Например:

  • Он может поддерживать большие модели оценки и наборы оценочных данных, которые не может вместить одна задача.
  • Результаты оценки могут быть использованы для принятия решений для обучения следующей эпохе.

Есть два способа реализовать встроенную оценку: прямая оценка и распределенная оценка.

  • Прямая оценка: Для небольших моделей и наборов данных оценки, координатор может запустить оценку непосредственно на распределенной модели с набором данных об оценке координатора:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Распределенная оценка: Для больших моделей или наборов данных, которые неосуществимы для работы непосредственно на координаторе, координатор задача может распределять задачи оценки для рабочих через ClusterCoordinator.schedule / ClusterCoordinator.join методы:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Оценка коляски

Другой метод называется оценка со стороны автомобиля , где вы создаете специальную задачу оценщик , который неоднократно читает контрольно - пропускные пункты и проходит оценку на последней контрольной точки. Это позволяет вашей тренировочной программе завершиться раньше, если вам не нужно менять тренировочный цикл на основе результатов оценки. Однако для запуска оценки требуется дополнительная задача оценщика и периодические контрольные точки. Ниже приведен возможный цикл оценки коляски:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Кластеры в реальном мире

В реальной производственной среде вы будете запускать все задачи в разных процессах на разных машинах. Самый простой способ настроить кластер информации по каждой задаче , чтобы установить "TF_CONFIG" переменные среды и использовать tf.distribute.cluster_resolver.TFConfigClusterResolver для разбора "TF_CONFIG" .

Для общего описания о "TF_CONFIG" переменных сред, относится к распределенному учебному руководству.

Если начать подготовку задачи , используя Kubernetes или другие шаблоны конфигурации, то весьма вероятно , что эти шаблоны уже установили “TF_CONFIG" для вас.

Установите "TF_CONFIG" переменную окружения

Предположим , у вас есть 3 рабочих и 2 сервера параметров, то "TF_CONFIG" работника 1 может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" оценщика может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" часть в указанной выше "TF_CONFIG" строка для оценщика не является обязательным.

Если вы используете один и тот же двоичный файл для всех задач

Если вы предпочитаете запускать все эти задачи, используя один двоичный файл, вам нужно будет в самом начале разрешить вашей программе разветвляться на разные роли:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Следующий код запускает сервер TensorFlow и ждет:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Обработка сбоя задачи

Сбой рабочего

tf.distribute.experimental.coordinator.ClusterCoordinator или Model.fit обеспечивают встроенную отказоустойчивость для отказа работника. После восстановления работника, ранее при условии , функция набора данных (либо ClusterCoordinator.create_per_worker_dataset для пользовательского цикла обучения, или tf.keras.utils.experimental.DatasetCreator для Model.fit ) будет вызвана на рабочих , чтобы заново создать наборы данных.

Ошибка сервера параметров или координатора

Тем не менее, когда координатор видит ошибку сервера параметра, он будет поднимать UnavailableError или AbortedError немедленно. В этом случае вы можете перезапустить координатор. Сам координатор тоже может стать недоступным. Поэтому рекомендуется использовать определенные инструменты, чтобы не потерять прогресс в обучении:

  • Для Model.fit , вы должны использовать BackupAndRestore обратный вызов, который обрабатывает сохранение и восстановление прогресса автоматически. См возвраты и учебный раздел выше для примера.

  • Для пользовательского цикла обучения вы должны периодически проверять переменные модели и загружать переменные модели из контрольной точки, если таковые имеются, до начала обучения. Обучение прогресс может быть выведено примерно от optimizer.iterations если оптимизатор контрольная точка:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Извлечение RemoteValue

Извлечение RemoteValue гарантированно успешной , если функция выполнена успешно. Это связано с тем, что в настоящее время возвращаемое значение сразу же копируется в координатор после выполнения функции. Если во время копирования произойдет сбой какого-либо рабочего, функция будет повторена на другом доступном работнике. Поэтому, если вы хотите оптимизировать производительность, вы можете запланировать функции без возвращаемого значения.

Отчет об ошибках

После того , как координатор видит ошибку , такую как UnavailableError из серверов параметров или других ошибок приложений , таких как InvalidArgument от tf.debugging.check_numerics , он отменит все ожидающие и поставленные в очередь функции , прежде чем поднять ошибку. Получение их соответствующий RemoteValue s поднимет CancelledError .

После возникновения ошибки координатор не будет вызывать ту же ошибку или какую-либо ошибку из отмененных функций.

Улучшение производительности

Есть несколько возможных причин , если вы видите проблемы с производительностью , когда вы тренируетесь с ParameterServerStrategy и ClusterResolver .

Одна из частых причин заключается в том, что серверы параметров имеют несбалансированную нагрузку, а некоторые сильно загруженные серверы параметров достигли предела емкости. Также может быть несколько основных причин. Вот несколько простых методов решения этой проблемы:

  1. Осколок ваши большие модели переменные с помощью указания variable_partitioner при построении ParameterServerStrategy .
  2. По возможности избегайте создания переменной точки доступа, которая требуется всем серверам параметров за один шаг. Так , например, использовать константу скорость обучения или подкласс tf.keras.optimizers.schedules.LearningRateSchedule в оптимизаторах , поскольку поведение по умолчанию является то , что скорость обучения станет переменной помещается на определенном сервере параметров и просил все другими серверами параметров в каждом шаге .
  3. Перемешайте свои большие словари, прежде чем передавать их слоям предварительной обработки Keras.

Еще одна возможная причина проблем с производительностью - координатор. Ваша первая реализация schedule / join является Python на основе и , таким образом , возможно, нарезание резьбы над головой. Также может быть большая задержка между координатором и рабочими. Если это так,

  • Для Model.fit , вы можете установить steps_per_execution аргумент , предусмотренный на Model.compile до значения большего , чем 1.

  • Для пользовательского обучения петли, вы можете упаковать несколько шагов в одну tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

По мере дальнейшей оптимизации библиотеки, мы надеемся, что большинству пользователей в будущем не придется вручную упаковывать шаги.

Кроме того, небольшой трюк для повышения производительности заключается в том, чтобы запланировать функции без возвращаемого значения, как описано в разделе обработки сбоя задачи выше.

Известные ограничения

Большинство известных ограничений уже описано в приведенных выше разделах. В этом разделе содержится сводка.

ParameterServerStrategy вообще

  • os.environment["grpc_fail_fast"]="use_caller" необходим на каждой задаче , включая координатор, чтобы Отказоустойчивость работу должным образом.
  • Обучение синхронного сервера параметров не поддерживается.
  • Обычно для достижения оптимальной производительности необходимо объединить несколько шагов в одну функцию.
  • Она не поддерживается для загрузки saved_model с помощью tf.saved_model.load , содержащей sharded переменных. Обратите внимание, что загрузка такой сохраненной модели с использованием TensorFlow Serving должна работать.
  • Не поддерживается загрузка контрольной точки, содержащей сегментированные переменные слота оптимизатора, в другое количество сегментов.
  • Восстановление после сбоя сервера параметров без перезапуска задачи координатора не поддерживается.
  • Использование tf.lookup.StaticHashTable (который обычно используются некоторыми tf.keras.layers.experimental.preprocessing слоев, такие как IntegerLookup , StringLookup и TextVectorization ) приводит к ресурсам , размещенным на координаторе в это время с сервером обучением параметров. Это влияет на производительность при поиске RPC от рабочих к координатору. В настоящее время это одна из самых приоритетных задач.

Model.fit специфика

  • steps_per_epoch аргумент требуется Model.fit . Вы можете выбрать значение, обеспечивающее подходящие интервалы в эпоху.
  • ParameterServerStrategy не поддерживает пользовательские функции обратного вызова , которые имеют вызовы пакетного уровня для повышения производительности. Вы должны преобразовать эти вызовы в вызовы эпохи уровня с соответствующим выбрал steps_per_epoch , так что они называются каждым steps_per_epoch числа шагов. Встроенные обратные вызовы не затронуты: их вызовы на уровне пакета были изменены для повышения производительности. Поддержка вызовов пакетного уровня для ParameterServerStrategy планируется.
  • По той же причине, в отличие от других стратегий, индикатор выполнения и показатели регистрируются только на границах эпох.
  • run_eagerly не поддерживается.

Особенности пользовательского цикла обучения