Обучение нескольких сотрудников с Keras

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом руководстве показано, как выполнять распределенное обучение с несколькими работниками с помощью модели Keras и API Model.fit с использованием API tf.distribute.Strategy , в частности класса tf.distribute.MultiWorkerMirroredStrategy . С помощью этой стратегии модель Keras, разработанная для работы с одним рабочим, может беспрепятственно работать на нескольких рабочих с минимальными изменениями кода.

Для тех, кто заинтересован в более глубоком понимании API tf.distribute.Strategy , доступно руководство « Распределенное обучение в TensorFlow » для обзора стратегий распространения, поддерживаемых TensorFlow.

Чтобы узнать, как использовать MultiWorkerMirroredStrategy с Keras и настраиваемым циклом обучения, см. Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy .

Обратите внимание, что целью этого руководства является демонстрация минимального примера с несколькими рабочими процессами с двумя рабочими процессами.

Настраивать

Начните с некоторых необходимых импортов:

import json
import os
import sys

Перед импортом TensorFlow внесите несколько изменений в среду:

  1. Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие процессы пытаются использовать один и тот же графический процессор. В реальном приложении каждый работник будет работать на отдельной машине.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Сбросьте переменную окружения TF_CONFIG (подробнее об этом вы узнаете позже):
os.environ.pop('TF_CONFIG', None)
  1. Убедитесь, что текущий каталог находится на пути Python — это позволит блокноту позже импортировать файлы, записанные %%writefile :
if '.' not in sys.path:
  sys.path.insert(0, '.')

Теперь импортируйте TensorFlow:

import tensorflow as tf

Набор данных и определение модели

Затем создайте файл mnist_setup.py с простой настройкой модели и набора данных. Этот файл Python будет использоваться рабочими процессами в этом руководстве:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Обучение модели на одном работнике

Попробуйте обучить модель на небольшом количестве эпох и понаблюдайте за результатами одного воркера , чтобы убедиться, что все работает правильно. По мере обучения потери должны снижаться, а точность увеличиваться.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Многопользовательская конфигурация

Теперь давайте войдем в мир обучения нескольких сотрудников.

Кластер с заданиями и задачами

В TensorFlow распределенное обучение включает: 'cluster' с несколькими заданиями, и каждое из заданий может иметь одну или несколько 'task' .

Вам понадобится переменная среды конфигурации TF_CONFIG для обучения на нескольких машинах, каждая из которых может иметь свою роль. TF_CONFIG — это строка JSON, используемая для указания конфигурации кластера для каждого рабочего процесса, являющегося частью кластера.

Переменная TF_CONFIG состоит из двух компонентов: 'cluster' и 'task' .

  • 'cluster' одинаков для всех работников и предоставляет информацию об обучающем кластере, который представляет собой список, состоящий из различных типов должностей, таких как 'worker' или 'chief' .

    • При обучении нескольких рабочих с tf.distribute.MultiWorkerMirroredStrategy обычно есть один 'worker' , который берет на себя обязанности, такие как сохранение контрольной точки и запись сводного файла для TensorBoard, в дополнение к тому, что делает обычный 'worker' . Такой 'worker' называется главным рабочим (с названием должности 'chief' ).
    • Обычно 'chief' назначается 'index' 0 (собственно, так реализован tf.distribute.Strategy ).
  • 'task' предоставляет информацию о текущей задаче и отличается для каждого работника. Он указывает 'type' и 'index' этого работника.

Ниже приведен пример конфигурации:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Вот тот же TF_CONFIG , сериализованный как строка JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Обратите внимание, что tf_config — это просто локальная переменная в Python. Чтобы иметь возможность использовать его для обучающей конфигурации, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG .

В приведенном выше примере конфигурации вы устанавливаете 'type' задачи на 'worker' а 'index' задачи — на 0 . Поэтому эта машина первая рабочая. Он будет назначен 'chief' работником и будет выполнять больше работы, чем другие.

В целях иллюстрации в этом руководстве показано, как можно настроить переменную TF_CONFIG с двумя рабочими процессами на localhost хосте.

На практике вы должны создать несколько воркеров на внешних IP-адресах/портах и ​​соответственно установить переменную TF_CONFIG для каждого воркера.

В этом уроке вы будете использовать двух рабочих:

  • TF_CONFIG первого («начального 'chief' ) TF_CONFIG показан выше.
  • Для второго воркера вы установите tf_config['task']['index']=1

Переменные среды и подпроцессы в блокнотах

Подпроцессы наследуют переменные среды от своего родителя.

Например, вы можете установить переменную среды в этом процессе Jupyter Notebook следующим образом:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Затем вы можете получить доступ к переменной среды из подпроцессов:

echo ${GREETINGS}
Hello TensorFlow!

В следующем разделе вы будете использовать аналогичный метод для передачи TF_CONFIG рабочим подпроцессам. В реальном сценарии вы бы не запускали свои задания таким образом, но в этом примере этого достаточно.

Выберите правильную стратегию

В TensorFlow есть две основные формы распределенного обучения:

  • Синхронное обучение , при котором этапы обучения синхронизируются между рабочими процессами и репликами, а также
  • Асинхронное обучение , где этапы обучения строго не синхронизированы (например, обучение сервера параметров ).

В этом руководстве показано, как выполнять синхронное обучение нескольких сотрудников с использованием экземпляра tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. Он использует CollectiveOps , операцию TensorFlow для коллективной коммуникации, для объединения градиентов и синхронизации переменных. Руководство tf.distribute.Strategy содержит более подробную информацию об этой стратегии.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy предоставляет несколько реализаций с помощью параметра tf.distribute.experimental.CommunicationOptions : 1) RING реализует коллективы на основе кольца, используя gRPC в качестве уровня связи между узлами; 2) NCCL использует библиотеку коллективных коммуникаций NVIDIA для реализации коллективов; и 3) AUTO откладывает выбор до среды выполнения. Лучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также сетевого соединения в кластере.

Обучите модель

С интеграцией tf.distribute.Strategy API в tf.keras единственное изменение, которое вы сделаете для распространения обучения среди нескольких сотрудников, — это включение построения модели и model.compile() внутри strategy.scope() . Область действия стратегии распределения определяет, как и где создаются переменные, а в случае MultiWorkerMirroredStrategy создаваемые переменные являются MirroredVariable s, и они реплицируются на каждом из рабочих процессов.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Чтобы на самом деле работать с MultiWorkerMirroredStrategy , вам нужно запустить рабочие процессы и передать им TF_CONFIG .

Как и файл mnist_setup.py , написанный ранее, вот main.py , который будет запускать каждый из рабочих процессов:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

В приведенном выше фрагменте кода обратите внимание, что global_batch_size , который передается в Dataset.batch , имеет значение per_worker_batch_size * num_workers . Это гарантирует, что каждый рабочий процесс обрабатывает пакеты примеров per_worker_batch_size независимо от количества рабочих процессов.

Текущий каталог теперь содержит оба файла Python:

ls *.py
main.py
mnist_setup.py

Итак, json-сериализуйте TF_CONFIG и добавьте его в переменные среды:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь вы можете запустить рабочий процесс, который будет запускать main.py и использовать TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Есть несколько замечаний по поводу приведенной выше команды:

  1. Он использует %%bash , который является «волшебством» ноутбука для запуска некоторых команд bash.
  2. Он использует флаг --bg для запуска процесса bash в фоновом режиме, потому что этот рабочий процесс не завершится. Он ждет всех рабочих перед тем, как начать.

Фоновый рабочий процесс не будет печатать выходные данные в этот блокнот, поэтому &> перенаправляет свои выходные данные в файл, чтобы вы могли позже проверить, что произошло, в файле журнала.

Итак, подождите несколько секунд, пока процесс запустится:

import time
time.sleep(10)

Теперь проверьте, что было выведено в файл журнала рабочего процесса:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

В последней строке файла журнала должно быть написано: Started server with target: grpc://localhost:12345 . Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить работу.

Итак, обновите tf_config для второго рабочего процесса:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Запустите второго рабочего. Это запустит обучение, так как все рабочие активны (поэтому нет необходимости фонировать этот процесс):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Если вы перепроверите журналы, написанные первым воркером, вы узнаете, что он участвовал в обучении этой модели:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Неудивительно, что это работало медленнее , чем тестовый запуск в начале этого руководства.

Запуск нескольких воркеров на одной машине только увеличивает накладные расходы.

Цель здесь заключалась не в том, чтобы улучшить время обучения, а только в том, чтобы привести пример обучения нескольких работников.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Углубленное обучение нескольких сотрудников

Итак, вы узнали, как выполнить базовую настройку нескольких рабочих.

В оставшейся части руководства вы подробно узнаете о других факторах, которые могут быть полезны или важны для реальных случаев использования.

Разделение набора данных

При обучении нескольких сотрудников сегментирование набора данных необходимо для обеспечения конвергенции и производительности.

В примере из предыдущего раздела используется автоматическое разбиение по умолчанию, предоставляемое API tf.distribute.Strategy . Вы можете управлять сегментированием, установив tf.data.experimental.AutoShardPolicy в tf.data.experimental.DistributeOptions .

Чтобы узнать больше об автошардинге , обратитесь к руководству по распределенному вводу .

Вот краткий пример того, как отключить автоматическое сегментирование, чтобы каждая реплика обрабатывала каждый пример ( не рекомендуется ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Оценка

Если вы передадите validation_data в Model.fit , он будет чередоваться между обучением и оценкой для каждой эпохи. Оценка, использующая validation_data , распределяется между одним и тем же набором рабочих, а результаты оценки объединяются и доступны для всех рабочих.

Как и в случае с обучением, набор проверочных данных автоматически сегментируется на уровне файла. Вам нужно установить глобальный размер пакета в наборе данных проверки и установить validation_steps .

Повторный набор данных также рекомендуется для оценки.

Кроме того, вы также можете создать другую задачу, которая периодически считывает контрольные точки и запускает оценку. Это то, что делает оценщик. Но это не рекомендуемый способ выполнения оценки, поэтому его детали опущены.

Представление

Теперь у вас есть модель Keras, настроенная для запуска в нескольких рабочих процессах с помощью MultiWorkerMirroredStrategy .

Чтобы настроить производительность обучения нескольких сотрудников, вы можете попробовать следующее:

  • tf.distribute.MultiWorkerMirroredStrategy предоставляет несколько реализаций коллективной коммуникации :

    • RING реализует коллективы на основе кольца, используя gRPC в качестве уровня связи между узлами.
    • NCCL использует библиотеку коллективных коммуникаций NVIDIA для реализации коллективов.
    • AUTO откладывает выбор до среды выполнения.

    Лучший выбор коллективной реализации зависит от количества графических процессоров, типа графических процессоров и сетевого соединения в кластере. Чтобы переопределить автоматический выбор, укажите параметр communication_options конструктора MultiWorkerMirroredStrategy . Например:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Приведите переменные к tf.float , если это возможно:

    • Официальная модель ResNet включает пример того, как это можно сделать.

Отказоустойчивость

При синхронном обучении кластер выйдет из строя, если один из рабочих выйдет из строя, а механизма восстановления после сбоя не существует.

Использование Keras с tf.distribute.Strategy преимущество отказоустойчивости в случаях, когда рабочие умирают или иным образом нестабильны. Это можно сделать, сохранив состояние обучения в распределенной файловой системе по вашему выбору, чтобы после перезапуска экземпляра, который ранее дал сбой или был вытеснен, состояние обучения восстанавливалось.

Когда рабочий становится недоступным, другие рабочие перестают работать (возможно, после тайм-аута). В таких случаях необходимо перезапустить недоступный воркер, а также другие воркеры, вышедшие из строя.

Обратный вызов ModelCheckpoint

Обратный вызов ModelCheckpoint больше не обеспечивает отказоустойчивость. Вместо этого используйте обратный вызов BackupAndRestore .

Обратный вызов ModelCheckpoint по-прежнему можно использовать для сохранения контрольных точек. Но при этом, если обучение было прервано или успешно завершено, для продолжения обучения с контрольной точки пользователь должен загрузить модель вручную.

При желании пользователь может сохранить и восстановить модель/веса вне обратного вызова ModelCheckpoint .

Сохранение и загрузка модели

Чтобы сохранить вашу модель с помощью model.save или tf.saved_model.save , место сохранения должно быть разным для каждого работника.

  • Для неглавных работников вам нужно будет сохранить модель во временный каталог.
  • Для начальника вам нужно будет сохранить в указанный каталог модели.

Временные каталоги рабочего процесса должны быть уникальными, чтобы предотвратить ошибки, возникающие из-за того, что несколько рабочих процессов пытаются выполнить запись в одно и то же место.

Модель, сохраненная во всех каталогах, идентична, и обычно для восстановления или обслуживания следует ссылаться только на модель, сохраненную руководителем.

У вас должна быть некоторая логика очистки, которая удаляет временные каталоги, созданные рабочими процессами после завершения обучения.

Причина экономии на начальнике и рабочих одновременно заключается в том, что вы можете агрегировать переменные во время контрольной точки, что требует участия начальника и рабочих в протоколе связи allreduce. С другой стороны, разрешение руководителю и рабочим сохранять файлы в одном и том же каталоге модели приведет к ошибкам из-за конкуренции.

Используя MultiWorkerMirroredStrategy , программа запускается на каждом воркере, и чтобы узнать, является ли текущий воркер главным, она использует объект распознавателя кластера, который имеет атрибуты task_type и task_id :

  • task_type сообщает вам, что такое текущая работа (например 'worker' ).
  • task_id сообщает вам идентификатор работника.
  • Воркер с task_id == 0 назначается главным воркером.

В приведенном ниже фрагменте кода функция write_filepath предоставляет путь к файлу для записи, который зависит от task_id :

  • Для главного воркера (с task_id == 0 ) пишет в исходный путь к файлу.
  • Для других воркеров он создает временный каталог — temp_dir — с task_id в пути к каталогу для записи:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Теперь вы готовы экономить:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Как описано выше, в дальнейшем модель должна загружаться только из пути, в котором сохранен главный, поэтому давайте удалим временные, сохраненные неглавными работниками:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Теперь, когда пришло время загрузки, воспользуемся удобным API tf.keras.models.load_model и продолжим дальнейшую работу.

Здесь предположим, что для загрузки и продолжения обучения используется только один рабочий процесс, и в этом случае вы не вызываете tf.keras.models.load_model в другом strategy.scope() (обратите внимание, что strategy = tf.distribute.MultiWorkerMirroredStrategy() , как определено ранее ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Сохранение и восстановление контрольной точки

С другой стороны, контрольные точки позволяют сохранять веса вашей модели и восстанавливать их без сохранения всей модели.

Здесь вы создадите одну tf.train.Checkpoint , которая отслеживает модель, которой управляет tf.train.CheckpointManager , так что сохраняется только последняя контрольная точка:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

После настройки CheckpointManager вы готовы сохранять и удалять контрольные точки, сохраненные неглавными работниками:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Теперь, когда вам нужно восстановить модель, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint . После восстановления чекпоинта можно продолжить обучение.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Обратный вызов BackupAndRestore

Обратный вызов tf.keras.callbacks.BackupAndRestore обеспечивает функциональность отказоустойчивости путем резервного копирования модели и текущего номера эпохи во временном файле контрольной точки с аргументом backup_dir для BackupAndRestore . Это делается в конце каждой эпохи.

Как только задания прерываются и перезапускаются, обратный вызов восстанавливает последнюю контрольную точку, и обучение продолжается с начала прерванной эпохи. Любое частичное обучение, уже выполненное в незавершенную эпоху до прерывания, будет отброшено, чтобы оно не повлияло на конечное состояние модели.

Чтобы использовать его, предоставьте экземпляр tf.keras.callbacks.BackupAndRestore при вызове Model.fit .

С MultiWorkerMirroredStrategy , если рабочий процесс прерывается, весь кластер приостанавливается до тех пор, пока прерванный рабочий процесс не будет перезапущен. Другие рабочие процессы также будут перезапущены, а прерванный рабочий процесс снова присоединится к кластеру. Затем каждый рабочий процесс считывает ранее сохраненный файл контрольной точки и восстанавливает свое прежнее состояние, тем самым позволяя кластеру снова синхронизироваться. Затем обучение продолжается.

Обратный вызов BackupAndRestore использует CheckpointManager для сохранения и восстановления состояния обучения, который создает файл с именем контрольная точка, который отслеживает существующие контрольные точки вместе с последней. По этой причине backup_dir не следует повторно использовать для хранения других контрольных точек, чтобы избежать конфликта имен.

В настоящее время обратный вызов BackupAndRestore поддерживает обучение одного работника без стратегии — MirroredStrategy — и обучение нескольких работников с помощью MultiWorkerMirroredStrategy .

Ниже приведены два примера как для обучения нескольких работников, так и для обучения одного работника:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Если вы проверите каталог backup_dir , который вы указали в BackupAndRestore , вы можете заметить некоторые временно сгенерированные файлы контрольных точек. Эти файлы необходимы для восстановления ранее потерянных экземпляров, и они будут удалены библиотекой в ​​конце Model.fit после успешного завершения вашего обучения.

Дополнительные ресурсы

  1. Руководство по распределенному обучению в TensorFlow содержит обзор доступных стратегий распространения.
  2. В учебном пособии « Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy» показано, как использовать MultiWorkerMirroredStrategy с Keras и пользовательским циклом обучения.
  3. Ознакомьтесь с официальными моделями , многие из которых можно настроить для запуска нескольких стратегий распространения.
  4. Руководство Повышение производительности с помощью tf.function содержит информацию о других стратегиях и инструментах, таких как профилировщик TensorFlow , который вы можете использовать для оптимизации производительности своих моделей TensorFlow.