Создание собственного алгоритма федеративного обучения

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Прежде чем мы начнем

Прежде чем мы начнем, выполните следующее, чтобы убедиться, что ваша среда настроена правильно. Если вы не видите приветствие, пожалуйста , обратитесь к установке руководству для получения инструкций.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
import tensorflow as tf
import tensorflow_federated as tff

В классификации изображений и текста поколения учебников, мы узнали , как создать модель и данные трубопроводов для федеративного обучения (FL), и выполняется федеративного обучение через tff.learning API слой TFF.

Это только верхушка айсберга, когда дело доходит до исследований FL. В этом уроке мы рассмотрим , как реализовать федеративных алгоритмы обучения без откладывая на tff.learning API. Мы стремимся выполнить следующее:

Цели:

  • Понимать общую структуру алгоритмов федеративного обучения.
  • Исследуйте Федеративное ядро TFF.
  • Используйте Federated Core для прямой реализации Federated Averaging.

В то время как этот учебник является самодостаточным, мы рекомендуем первое чтение классификации изображений и генерацию текста учебники.

Подготовка входных данных

Сначала мы загружаем и предварительно обрабатываем набор данных EMNIST, включенный в TFF. Для получения более подробной информации см классификации изображений учебника.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Для того , чтобы подать набор данных в нашей модели мы выравниваться данные и преобразовать каждый пример в кортеж вида (flattened_image_vector, label) .

NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

Теперь мы выбираем небольшое количество клиентов и применяем описанную выше предварительную обработку к их наборам данных.

client_ids = sorted(emnist_train.client_ids)[:NUM_CLIENTS]
federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

Подготовка модели

Мы используем ту же модель , как и в классификации изображений учебника. Эта модель (реализуется через tf.keras ) имеет один скрытый слой, за которым следует SoftMax слоем.

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])

Для того , чтобы использовать эту модель в TFF, мы оборачиваем модель Keras как tff.learning.Model . Это позволяет нам выполнять модели вперед проход в TFF и извлечение модели выходов . Для получения более подробной информации, а также увидеть классификации изображений учебника.

def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

В то время как мы использовали tf.keras создать tff.learning.Model , ПТФ поддерживает гораздо более общие модели. Эти модели имеют следующие соответствующие атрибуты, отражающие вес модели:

  • trainable_variables : итерируемые тензоров , соответствующих обучаемые слои.
  • non_trainable_variables : итерируемые тензоров , соответствующие не-обучаемых слои.

Для наших целей мы будем использовать только trainable_variables . (поскольку у нашей модели только они!).

Создание собственного алгоритма федеративного обучения

В то время как tff.learning API позволяет создавать множество вариантов Федеративного усреднения, существует и другие федеративные алгоритмы , которые не вписываются в эти рамки. Например, вы можете захотеть добавить упорядочению, вырезку или более сложные алгоритмы , такие как федеративное обучение ГАНЫ . Вы также можете быть вместо быть заинтересованы в федеративной аналитике .

Для этих более сложных алгоритмов нам придется написать собственный алгоритм с использованием TFF. Во многих случаях федеративные алгоритмы состоят из 4 основных компонентов:

  1. Шаг широковещательной рассылки от сервера к клиенту.
  2. Шаг обновления локального клиента.
  3. Этап загрузки клиент-сервер.
  4. Шаг обновления сервера.

В TFF, мы обычно представляем собой федеративные алгоритмы как tff.templates.IterativeProcess (которые мы называем просто IterativeProcess по всему). Это класс , который содержит initialize и next функции. Здесь initialize используется для инициализации сервера, а next будет выполнять один раунд связи федеративного алгоритма. Давайте напишем схему того, как должен выглядеть наш итеративный процесс для FedAvg.

Во- первых, мы имеем функцию инициализации , которая просто создает tff.learning.Model , и возвращает его обучаемый веса.

def initialize_fn():
  model = model_fn()
  return model.trainable_variables

Эта функция выглядит хорошо, но, как мы увидим позже, нам нужно будет сделать небольшую модификацию, чтобы сделать ее «вычислением TFF».

Мы также хотим обрисовать next_fn .

def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

Мы сосредоточимся на реализации этих четырех компонентов по отдельности. Сначала мы сосредоточимся на частях, которые можно реализовать в чистом TensorFlow, а именно на этапах обновления клиента и сервера.

Блоки TensorFlow

Обновление клиента

Мы будем использовать наш tff.learning.Model сделать обучение клиента по существу так же , как вы бы обучать модель TensorFlow. В частности, мы будем использовать tf.GradientTape для вычисления градиента на пакетах данных, а затем применить этот градиент , используя client_optimizer . Мы ориентируемся только на тренируемые веса.

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

Обновление сервера

Обновление сервера для FedAvg проще, чем обновление клиента. Мы реализуем «ванильное» федеративное усреднение, в котором мы просто заменяем веса модели сервера средним значением весов модели клиента. Опять же, мы сосредоточимся только на тренируемых весах.

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

Фрагмент может быть упрощена путем простого возвращения mean_client_weights . Тем не менее, более продвинутые реализации Федеративного использование Усреднения mean_client_weights с более сложными методами, такие как импульс или адаптивность.

Задача: Реализовать версию server_update , которая обновляет веса серверов , чтобы быть серединой model_weights и mean_client_weights. (Примечание: Это своего рода «средней точки» подход аналогичен недавней работе на оптимизатора LOOKAHEAD !).

До сих пор мы писали только чистый код TensorFlow. Так задумано, поскольку TFF позволяет вам использовать большую часть кода TensorFlow, с которым вы уже знакомы. Тем не менее, теперь мы должны определить логику оркестровки, то есть, логика , которая диктует , что вещает сервер клиенту, и то , что клиент загрузки на сервер.

Это потребует Федеративное ядро TFF.

Введение в федеративное ядро

Федеративные Ядро (ФК) представляет собой набор нижних уровня интерфейсов , которые служат в качестве основы для tff.learning API. Однако эти интерфейсы не ограничиваются обучением. На самом деле их можно использовать для аналитики и многих других вычислений над распределенными данными.

На высоком уровне федеративное ядро ​​— это среда разработки, которая позволяет компактно выражать программную логику для объединения кода TensorFlow с операторами распределенной связи (такими как распределенные суммы и широковещательные рассылки). Цель состоит в том, чтобы дать исследователям и практикам явный контроль над распределенной связью в их системах, не требуя деталей реализации системы (таких как указание обмена сетевыми сообщениями точка-точка).

Одним из ключевых моментов является то, что TFF предназначен для сохранения конфиденциальности. Таким образом, он позволяет явно контролировать местонахождение данных, чтобы предотвратить нежелательное накопление данных на централизованном сервере.

Федеративные данные

Ключевым понятием в TFF являются «федеративные данные», которые относятся к набору элементов данных, размещенных на группе устройств в распределенной системе (например, клиентские наборы данных или веса модели сервера). Мы моделируем всю совокупность элементов данных во всех устройствах в качестве единого федеративного значения.

Например, предположим, что у нас есть клиентские устройства, каждое из которых имеет число с плавающей запятой, представляющее температуру датчика. Мы могли бы представить его в качестве федеративного проплывают

federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

Типы объединения определяются типа T его составляющих членов (например. tf.float32 ) и группа G устройств. Мы сосредоточимся на тех случаях , когда G является либо tff.CLIENTS или tff.SERVER . Такой федеративный тип представлен в виде {T}@G , как показано ниже.

str(federated_float_on_clients)
'{float32}@CLIENTS'

Почему мы так заботимся о местах размещения? Основная цель TFF — дать возможность писать код, который можно развернуть в реальной распределенной системе. Это означает, что очень важно понять, какие подмножества устройств выполняют какой код и где находятся различные части данных.

ПТФ фокусируется на трех вещах: данные, где размещены данные, и как данные трансформируются. Первые два инкапсулируется в федеративных типах, в то время как последний воплощен в федеративных вычислениях.

Федеративные вычисления

ПТФ является сильно типизированным функциональным программированием среды чья основных единиц федеративных вычисления. Это части логики, которые принимают объединенные значения в качестве входных данных и возвращают объединенные значения в качестве выходных данных.

Например, предположим, что мы хотим усреднить температуру на наших клиентских датчиках. Мы могли бы определить следующее (используя наш федеративный поплавок):

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

Вы можете спросить, как это отличается от tf.function декоратора в TensorFlow? Ключевой ответ , что код , сгенерированный tff.federated_computation ни TensorFlow , ни Python кода; Это спецификация распределенной системы во внутреннем независимой от платформы языка клея.

Хотя это может показаться сложным, вы можете думать о вычислениях TFF как о функциях с четко определенными сигнатурами типов. Эти сигнатуры типов могут быть запрошены напрямую.

str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'

Этот tff.federated_computation принимает аргументы федеративного типа {float32}@CLIENTS и возвращают значения федеративного типа {float32}@SERVER . Федеративные вычисления могут также идти от сервера к клиенту, от клиента к клиенту или от сервера к серверу. Федеративные вычисления также могут быть составлены как обычные функции, если их сигнатуры типов совпадают.

Для поддержки развития, ПТФ позволяет вызывать tff.federated_computation как функции Python. Например, мы можем назвать

get_average_temperature([68.5, 70.3, 69.8])
69.53334

Неторопливые вычисления и TensorFlow

Есть два ключевых ограничения, о которых нужно знать. Во- первых, когда интерпретатор Python встречает tff.federated_computation декоратора, функция прослежена раз и сериализовать для использования в будущем. Из-за децентрализованного характера федеративного обучения это будущее использование может происходить в другом месте, например, в среде удаленного выполнения. Поэтому TFF вычисления принципиально , не хотят. Такое поведение является несколько аналогично из tf.function декоратора в TensorFlow.

Во- вторых, федеративный вычисление может состоять только из объединенных операторов (например, tff.federated_mean ), они не могут содержать операции TensorFlow. TensorFlow код должен быть ограничен блоков , украшенных tff.tf_computation . Наиболее обычный код TensorFlow может быть непосредственно оформлены, например, следующая функция , которая принимает число и добавляет 0.5 к нему.

@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

Они также имеют подписи типа, но без размещения. Например, мы можем назвать

str(add_half.type_signature)
'(float32 -> float32)'

Здесь мы видим важное различие между tff.federated_computation и tff.tf_computation . У первого есть явные места размещения, а у второго нет.

Мы можем использовать tff.tf_computation блоки в федеративных вычислений путем определения мест размещения. Давайте создадим функцию, которая добавляет половину, но только к федеративным поплавкам на клиентах. Мы можем сделать это с помощью tff.federated_map , который применяет данную tff.tf_computation , сохраняя при этом размещение.

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

Эта функция почти идентична add_half , за исключение того, что она принимает только значение с размещением на tff.CLIENTS и значение возвращается с тем же размещением. Мы можем видеть это в его сигнатуре типа:

str(add_half_on_clients.type_signature)
'({float32}@CLIENTS -> {float32}@CLIENTS)'

В итоге:

  • TFF работает с федеративными значениями.
  • Каждое федеративное значение имеет федеративный тип, с типом (например. tf.float32 ) и размещением (например. tff.CLIENTS ).
  • Федеративные значения могут быть преобразованы с помощью федеративных вычислений, которые должны быть украшены tff.federated_computation и федеративного типа подписи.
  • TensorFlow код должен содержаться в блоках с tff.tf_computation декораторов.
  • Затем эти блоки могут быть включены в объединенные вычисления.

Новый взгляд на построение собственного алгоритма федеративного обучения

Теперь, когда мы получили представление о Federated Core, мы можем создать собственный алгоритм федеративного обучения. Помните , что выше, мы определили initialize_fn и next_fn для нашего алгоритма. next_fn будет использовать в client_update и server_update мы определили с использованием чистого кода TensorFlow.

Однако для того, чтобы сделать наш алгоритм федеративного вычисления, нам потребуются как next_fn и initialize_fn каждого быть tff.federated_computation .

Федеративные блоки TensorFlow

Создание вычисления инициализации

Функция инициализации будет довольно просто: мы создадим модель , используя model_fn . Тем не менее, помните , что мы должны разделить наш TensorFlow код , используя tff.tf_computation .

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

После этого мы можем передать это непосредственно в федеративное вычисление с использованием tff.federated_value .

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

Создание next_fn

Теперь мы используем наш код обновления клиента и сервера для написания фактического алгоритма. Сначала обратит client_update в tff.tf_computation , который принимает клиентские наборы данных и вес сервера и выводит обновленный тензор весов клиента.

Нам понадобятся соответствующие типы, чтобы правильно оформить нашу функцию. К счастью, тип весов сервера можно извлечь непосредственно из нашей модели.

whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

Давайте посмотрим на сигнатуру типа набора данных. Помните, что мы взяли изображения размером 28 на 28 (с целочисленными метками) и сгладили их.

str(tf_dataset_type)
'<float32[?,784],int32[?,1]>*'

Мы также можем извлечь тип модели весов с помощью нашей server_init функции выше.

model_weights_type = server_init.type_signature.result

Изучив сигнатуру типа, мы сможем увидеть архитектуру нашей модели!

str(model_weights_type)
'<float32[784,10],float32[10]>'

Теперь мы можем создать tff.tf_computation для обновления клиента.

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

tff.tf_computation версия обновления сервера можно определить аналогичным образом, с использованием типов мы уже извлечена.

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

И последнее, но не в последнюю очередь, нам нужно создать tff.federated_computation , который приносит все это вместе. Эта функция принимает два федеративных значение, один соответствующих веса серверов (с размещением tff.SERVER ), а другие соответствующим клиентскими наборами данных (с размещением tff.CLIENTS ).

Обратите внимание, что оба эти типа были определены выше! Мы просто должны дать им правильное размещение с помощью tff.FederatedType .

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

Помните 4 элемента алгоритма FL?

  1. Шаг широковещательной рассылки от сервера к клиенту.
  2. Шаг обновления локального клиента.
  3. Этап загрузки клиент-сервер.
  4. Шаг обновления сервера.

Теперь, когда мы построили все вышеперечисленное, каждую часть можно компактно представить в виде одной строки кода TFF. Именно из-за этой простоты нам пришлось особенно тщательно указывать такие вещи, как федеративные типы!

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))

  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

Теперь у нас есть tff.federated_computation как для инициализации алгоритма, так и для работы на один шаг алгоритма. Чтобы закончить наш алгоритм, мы передаем их в tff.templates.IterativeProcess .

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

Давайте посмотрим на тип подписи initialize и next функций нашего итерационного процесса.

str(federated_algorithm.initialize.type_signature)
'( -> <float32[784,10],float32[10]>@SERVER)'

Это отражает тот факт , что federated_algorithm.initialize является без аргументов функции , которая возвращает модель однослойной (с 784-по-10 весовой матрицей, и 10 смещением единиц).

str(federated_algorithm.next.type_signature)
'(<server_weights=<float32[784,10],float32[10]>@SERVER,federated_dataset={<float32[?,784],int32[?,1]>*}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

Здесь мы видим , что federated_algorithm.next принимает модель сервера и клиентских данных и возвращает обновленную модель сервера.

Оценка алгоритма

Давайте пробежим несколько раундов и посмотрим, как изменится проигрыш. Во- первых, мы определим функцию оценки , используя централизованный подход , описанный во втором учебнике.

Сначала мы создаем централизованный набор данных для оценки, а затем применяем ту же предварительную обработку, которую мы использовали для обучающих данных.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients()
central_emnist_test = preprocess(central_emnist_test)

Затем мы пишем функцию, которая принимает состояние сервера и использует Keras для оценки тестового набора данных. Если вы знакомы с tf.Keras , это будет все выглядеть знакомым, хотя обратите внимание на использование set_weights !

def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

Теперь давайте инициализируем наш алгоритм и оценим его на тестовом наборе.

server_state = federated_algorithm.initialize()
evaluate(server_state)
2042/2042 [==============================] - 2s 767us/step - loss: 2.8479 - sparse_categorical_accuracy: 0.1027

Давайте потренируемся в течение нескольких раундов и посмотрим, изменится ли что-нибудь.

for round in range(15):
  server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)
2042/2042 [==============================] - 2s 738us/step - loss: 2.5867 - sparse_categorical_accuracy: 0.0980

Мы видим небольшое уменьшение функции потерь. Хотя скачок невелик, мы провели только 15 тренировочных раундов и на небольшом подмножестве клиентов. Чтобы увидеть лучшие результаты, нам, возможно, придется сделать сотни, если не тысячи раундов.

Изменение нашего алгоритма

На этом этапе давайте остановимся и подумаем о том, чего мы достигли. Мы реализовали федеративное усреднение напрямую, объединив чистый код TensorFlow (для обновлений клиента и сервера) с федеративными вычислениями из федеративного ядра TFF.

Чтобы выполнить более сложное обучение, мы можем просто изменить то, что у нас есть выше. В частности, отредактировав приведенный выше чистый код TF, мы можем изменить то, как клиент выполняет обучение, или как сервер обновляет свою модель.

Задача: Добавить градиент вырезку в client_update функции.

Если бы мы хотели внести более крупные изменения, мы могли бы также хранить и транслировать больше данных на сервере. Например, сервер может также хранить скорость обучения клиента и заставлять ее уменьшаться со временем! Обратите внимание , что это потребует изменений в подписи типа , используемых в tff.tf_computation вызовов выше.

Harder Задача: Реализовать Федеративные Усреднение с изучением распада скорости на клиентах.

На этом этапе вы можете начать понимать, насколько гибкими могут быть возможности реализации в этой среде. Для идей ( в том числе ответа на вызов труднее выше) вы можете увидеть исходный код для tff.learning.build_federated_averaging_process или проверить различные исследовательские проекты с использованием TFF.