Сохраните дату! Google I / O возвращается 18-20 мая Зарегистрируйтесь сейчас
Эта страница переведена с помощью Cloud Translation API.
Switch to English

Обучение работе с сервером параметров

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Обучение параметрического сервера - это распространенный метод параллельной обработки данных для расширения обучения модели на нескольких машинах. Обучающий кластер сервера параметров состоит из рабочих и серверов параметров. Переменные создаются на серверах параметров, и они читаются и обновляются рабочими на каждом этапе. По умолчанию рабочие читают и обновляют эти переменные независимо, без синхронизации друг с другом. Вот почему иногда обучение параметрическому серверу называется асинхронным обучением.

В обучении сервера параметров TensorFlow 2 используется центральный координатор через класс tf.distribute.experimental.coordinator.ClusterCoordinator .

В этой реализации worker задачи и задачи parameter server запускают tf.distribute.Server которые прослушивают запросы от координатора. Координатор создает ресурсы, отправляет обучающие задания, записывает контрольные точки и устраняет сбои задач.

Мы считаем, что эта архитектура и новый класс ClusterCoordinator обеспечивают более гибкую и простую модель программирования.

ClusterCoordinator

Класс ClusterCoordinator должен работать вместе с объектом tf.distribute.Strategy . Этот объект tf.distribute.Strategy необходим для передачи информации о кластере и используется для определения шага обучения, как мы видели в настраиваемом обучении с MirroredStrategy . Затем объект ClusterCoordinator отправляет выполнение этих шагов обучения удаленным работникам. В настоящее время ClusterCoordinator работает только с tf.distribute.experimental.ParameterServerStrategy .

Самый важный API, предоставляемый объектом ClusterCoordinator - это schedule . API schedule ставит tf.function в tf.function и сразу же возвращает RemoteValue аналогичное RemoteValue . Функции в очереди будут отправлены удаленным работникам в фоновых потоках, а их RemoteValue будут заполнены асинхронно. Поскольку schedule не требует назначения tf.function переданная tf.function может быть выполнена на любом доступном tf.function . Если рабочий процесс, на котором она выполняется, становится недоступным до ее завершения, функция будет повторена на другом доступном работнике. Из-за этого факта и того факта, что выполнение функции не является атомарным, функция может выполняться более одного раза.

Помимо диспетчеризации удаленных функций, ClusterCoordinator также помогает создавать наборы данных для всех рабочих ClusterCoordinator и восстанавливать эти наборы данных, когда работник восстанавливается после сбоя.

Учебное пособие по настройке

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Настройка кластера

Как упоминалось выше, для обучающего кластера сервера параметров требуется задача координатора, которая запускает вашу программу обучения, один или несколько рабочих tf.distribute.Server и задачи сервера параметров, которые запускают серверы tf.distribute.Server , то есть tf.distribute.Server , и, возможно, дополнительную задачу оценки, которая выполняет вспомогательную машину. оценка (см. раздел оценки коляски ниже). Требования для их установки:

  • Задача координатора должна знать адреса и порты всех других серверов TensorFlow, кроме оценщика.
  • Рабочие и серверы параметров должны знать, какой порт им нужно слушать. Для простоты мы обычно передаем полную информацию о кластере при создании серверов TensorFlow для этих задач.
  • Задаче оценщика не обязательно знать настройку обучающего кластера. Если это так, он не должен пытаться подключиться к обучающему кластеру.
  • Рабочие и серверы параметров должны иметь типы задач «worker» и «ps» соответственно. Координатор должен использовать «руководитель» в качестве типа задачи по устаревшим причинам.

В этом руководстве мы создадим внутрипроцессный кластер, чтобы все обучение сервера параметров можно было запустить в colab. В следующем разделе мы расскажем, как создавать настоящие кластеры .

Внутрипроцессный кластер

В этом руководстве мы заранее запустим несколько серверов TensorFlow и подключимся к ним позже:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Обучение с помощью настраиваемого цикла обучения

Пользовательский цикл обучения с tf.distribute.Strategy обеспечивает большую гибкость для определения циклов обучения. В настоящее время для обучения сервера параметров в TensorFlow 2 поддерживается только настраиваемый цикл обучения. Здесь мы используем ParameterServerStrategy для определения шага обучения, а затем используем ClusterCoordinator для отправки выполнения шагов обучения удаленным работникам.

Создайте ParameterServerStrategy

Чтобы написать шаг обучения в настраиваемом цикле обучения, первым делом необходимо создать ParameterServerStrategy . Мы объясним variable_partitioner позже.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

Затем вы создадите модель, определите набор данных и пошаговую функцию, как мы видели в цикле обучения с другими tf.distribute.Strategy . Вы можете найти более подробную информацию в этом руководстве . Давайте создадим эти компоненты, выполнив следующие шаги:

Настроить данные

Сначала напишите функцию, которая создает набор данных, который включает логику предварительной обработки, реализованную слоями предварительной обработки Keras. Мы создадим эти слои вне dataset_fn но применим преобразование внутри dataset_fn поскольку вы dataset_fn в tf.function которая не позволяет создавать переменные внутри него.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Сгенерируйте примеры игрушек в наборе данных:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Затем мы создаем набор обучающих данных, завернутый в dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Построить модель

Во-вторых, мы создаем модель и другие объекты. Обязательно создайте все переменные в strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Определите шаг обучения

В-третьих, создайте шаг обучения, завернутый в tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

В приведенной выше функции шага вызов strategy.run и strategy.reduce в step_fn полезен для поддержки графических процессоров или нескольких рабочих реплик в будущем, хотя на данный момент они имеют тривиальную реализацию.

Отправка инструкций по обучению удаленным сотрудникам

После того, как все вычисления будут определены с помощью ParameterServerStrategy , мы будем использовать класс ClusterCoordinator для создания ресурсов и распространения шагов обучения для удаленных сотрудников.

Давайте сначала создадим объект ClusterCoordinator и передадим объект стратегии:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Затем мы создаем набор данных для каждого рабочего и итератор. В per_worker_dataset_fn ниже per_worker_dataset_fn обернуть dataset_fn в strategy.distribute_datasets_from_function необязательно, но это позволит беспрепятственно поддерживать эффективную предварительную выборку для графических процессоров в будущем, когда графические процессоры поддерживаются ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Последний шаг - распределить вычисления между удаленными работниками по schedule . Метод schedule помещает в очередь tf.function и tf.function возвращает RemoteValue аналогичное RemoteValue . Функции в очереди будут отправлены удаленным работникам в фоновых потоках, а RemoteValue будет заполняться асинхронно. Метод join можно использовать для ожидания выполнения всех запланированных функций.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Вот как вы можете получить результат RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

Как вариант, вы можете запустить все шаги и что-то сделать, ожидая завершения:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Полный рабочий процесс обучения и обслуживания для этого конкретного примера см. В этом тесте .

Подробнее о создании набора данных

Набор данных в приведенном выше коде создается с использованием API create_per_worker_dataset . Он создает один набор данных для каждого рабочего и возвращает объект-контейнер. Вы можете вызвать для него метод iter чтобы создать итератор для каждого рабочего. Итератор для каждого рабочего содержит один итератор для каждого рабочего, и соответствующий фрагмент рабочего будет подставлен во входной аргумент функции, переданной методу schedule до того, как функция будет выполнена на конкретном работнике.

В настоящее время метод schedule предполагает, что воркеры эквивалентны, и, следовательно, предполагает, что наборы данных для разных воркеров одинаковы, за исключением того, что они могут перетасовываться по-разному, если они содержат операцию dataset.shuffle . Из-за этого мы также рекомендуем повторять наборы данных бесконечно и планировать конечное количество шагов вместо того, чтобы полагаться на OutOfRangeError из набора данных.

Еще одно важное замечание: tf.data данных tf.data не поддерживают неявную сериализацию и десериализацию через границы задач. Поэтому важно создать весь набор данных внутри функции, переданной в create_per_worker_dataset .

Переменный сегмент

Шардинг переменных относится к разделению переменной на несколько меньших переменных. Мы называем эти меньшие переменные осколком s. Переменное сегментирование может быть полезно для распределения сетевой нагрузки при доступе к этим сегментам. Также полезно распределить вычисление и хранение нормальной переменной между несколькими серверами параметров.

Чтобы включить сегментирование переменных, вы можете передать variable_partitioner при создании объекта ParameterServerStrategy . variable_partitioner будет вызываться каждый раз, когда создается переменная, и ожидается, что он вернет количество сегментов по каждому измерению переменной. tf.distribute.experimental.partitioners.FixedShardsPartitioner variable_partitioner , например tf.distribute.experimental.partitioners.FixedShardsPartitioner .

В приведенном выше примере мы используем FixedShardsPartitioner который разделит все переменные на два осколка, и каждый осколок будет назначен различным серверам параметров:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Когда передается variable_partitioner и если вы создаете переменную непосредственно в strategy.scope() , она станет типом контейнера со свойством variables которое обеспечивает доступ к списку шардов. В большинстве случаев этот контейнер будет автоматически преобразован в тензор путем объединения всех осколков. В результате его можно использовать как обычную переменную. С другой стороны, некоторые методы tf.nn.embedding_lookup такие как tf.nn.embedding_lookup обеспечивают эффективную реализацию для этого типа контейнера, и в этих методах будет исключено автоматическое объединение.

Дополнительные сведения см. В строке документации API ParameterServerStrategy .

Оценка

Существует несколько способов определить и запустить цикл оценки в распределенном обучении. У каждого есть свои плюсы и минусы, как описано ниже. Если у вас нет предпочтений, рекомендуется использовать встроенный метод оценки.

Встроенная оценка

В этом методе координатор чередует обучение и оценку, поэтому мы называем это встроенной оценкой. Есть несколько преимуществ встроенной оценки. Например, он может поддерживать большие модели оценки и наборы оценочных данных, которые не может вместить одна задача. В качестве другого примера результаты оценки могут быть использованы для принятия решений для обучения следующей эпохе.

Есть два способа реализовать встроенную оценку:

  • Прямая оценка - для небольших моделей и наборов данных оценки координатор может запустить оценку непосредственно в распределенной модели с набором данных оценки на координаторе:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Распределенная оценка - для больших моделей или наборов данных, которые невозможно запустить непосредственно на координаторе, задача координатора может распределять задачи оценки среди рабочих с помощью методов schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Оценка коляски

Другой метод называется оценкой боковой машины, который заключается в создании специальной задачи оценщика, которая многократно считывает контрольные точки и выполняет оценку на последней контрольной точке. Это позволяет вашей тренировочной программе завершиться раньше, если вам не нужно менять тренировочный цикл на основе результатов оценки. Однако для запуска оценки требуется дополнительная задача оценщика и периодические контрольные точки. Ниже приведен возможный цикл оценки коляски:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Кластеры в реальном мире

В реальной производственной среде вы будете запускать все задачи в разных процессах на разных машинах. Самый простой способ настроить информацию о кластере для каждой задачи - установить переменные среды «TF_CONFIG» и использовать TFConfigClusterResolver для анализа «TF_CONFIG». Общее описание переменных среды "TF_CONFIG" см. В распределенном учебном руководстве .

Если вы начинаете свои учебные задачи с помощью Kubernetes или других шаблонов конфигурации, весьма вероятно, что в этих шаблонах уже установлено значение «TF_CONFIG».

Установите переменную окружения «TF_CONFIG»

Предположим, у вас есть 3 рабочих и 2 сервера параметров, «TF_CONFIG» рабочего 1 может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

«TF_CONFIG» оценщика может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

Часть «кластер» в приведенной выше строке «TF_CONFIG» для оценщика является необязательной.

Если вы используете один и тот же двоичный файл для всех задач

Если вы предпочитаете запускать все эти задачи, используя один двоичный файл, вам нужно будет в самом начале разрешить вашей программе разветвляться на разные роли:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Следующий код запускает сервер TensorFlow и ждет:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Обработка сбоя задачи

Сбой рабочего

Как упоминалось выше, ClusterCoordinator имеет встроенную отказоустойчивость в случае сбоя работника. После восстановления работника соответствующий фрагмент наборов данных, созданных create_per_worker_dataset , которые все еще находятся в области видимости, будет воссоздан путем вызова его исходного dataset_fn переданного create_per_worker_dataset .

Ошибка сервера параметров или координатора

Однако, когда координатор видит ошибку сервера параметров, он немедленно AbortedError UnavailableError или AbortedError . В этом случае вы можете перезапустить координатор. Сам координатор тоже может стать недоступным. Поэтому, чтобы не потерять значительную часть прогресса обучения, важно периодически проверять переменные модели и загружать переменные модели из контрольной точки, если таковые имеются, до начала обучения. Прогресс обучения можно приблизительно вывести из optimizer.iterations если оптимизатор установлен контрольной точкой.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Получение RemoteValue

Получение RemoteValue гарантированно будет успешным, если функция будет выполнена успешно. Это связано с тем, что в настоящее время возвращаемое значение сразу же копируется в координатор после выполнения функции. Если во время копирования произойдет сбой какого-либо рабочего, функция будет повторена на другом доступном работнике. Поэтому, если вы хотите оптимизировать производительность, вы можете запланировать функции без возвращаемого значения.

Отчет об ошибках

Как только координатор видит ошибку, такую ​​как UnavailableError от серверов параметров, или другие ошибки приложения, такие как InvalidArgument из tf.debugging.check_numerics , он отменяет все ожидающие и поставленные в очередь функции, прежде чем tf.debugging.check_numerics ошибку. Получение соответствующих RemoteValue s вызовет CancelledError .

После возникновения ошибки координатор не будет вызывать ту же ошибку или какую-либо ошибку из отмененных функций.

Улучшение производительности

Есть несколько возможных причин, по которым вы видите проблемы с производительностью при обучении с ParameterServerStrategy и ClusterResolver .

Одна из частых причин заключается в том, что серверы параметров имеют несбалансированную нагрузку, а некоторые сильно загруженные серверы параметров достигли предела емкости. Также может быть несколько основных причин. Вот несколько простых методов решения этой проблемы:

  1. сегментируйте ваши большие переменные модели, указав variable_partitioner при создании ParameterServerStrategy .
  2. по возможности избегайте создания переменной точки доступа, которая требуется для всех серверов параметров за один шаг. Например, используйте постоянную скорость обучения или подкласс tf.keras.optimizers.schedules.LearningRateSchedule в оптимизаторах, поскольку поведение по умолчанию tf.keras.optimizers.schedules.LearningRateSchedule что скорость обучения станет переменной, размещаемой на определенном сервере параметров и запрашиваемой всеми другими серверами параметров на каждом этапе. .
  3. перемешайте ваши большие словари, прежде чем передавать их слоям предварительной обработки Keras.

Еще одна возможная причина проблем с производительностью - координатор. Наша первая реализация schedule / join основана на Python и, следовательно, может иметь накладные расходы на потоки. Также может быть большая задержка между координатором и рабочими. В этом случае вы можете tf.function несколько шагов в одну tf.function :

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Мы продолжим оптимизацию координатора, и, надеюсь, большинству пользователей не придется вручную упаковывать шаги в будущем.

Кроме того, небольшой трюк для повышения производительности заключается в том, чтобы запланировать функции без возвращаемого значения, как объяснено в разделе обработки сбоя задачи выше.

Известные ограничения

Большинство известных ограничений описано в разделах выше. Вот краткое изложение:

  • os.environment["grpc_fail_fast"]="use_caller" необходим для каждой задачи, включая координатора, чтобы обеспечить правильную работу отказоустойчивости.
  • Рабочие GPU не поддерживаются.
  • Обучение синхронного сервера параметров не поддерживается.
  • ParameterServerStrategy не работает с API compile и fit Keras.
  • ClusterCoordinator.schedule не поддерживает гарантии посещения для набора данных.
  • Когда используется ClusterCoordinator.create_per_worker_dataset , весь набор данных должен быть создан внутри переданной ему функции.
  • Обычно для достижения оптимальной производительности необходимо объединить несколько шагов в одну функцию.
  • Не поддерживается загрузка сохраненной_модели через tf.saved_model.load содержащую сегментированные переменные. Обратите внимание, что загрузка такой сохраненной модели с использованием TensorFlow Serving должна работать.
  • Не поддерживается загрузка контрольной точки, содержащей сегментированные переменные слота оптимизатора, в другое количество сегментов.
  • Восстановление после сбоя сервера параметров без перезапуска задачи координатора не поддерживается.