Сохраните дату! Google I / O возвращается 18-20 мая Зарегистрируйтесь сейчас
Эта страница переведена с помощью Cloud Translation API.
Switch to English

Обзор

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Прежде, чем мы начнем

Прежде чем мы начнем, выполните следующее, чтобы убедиться, что ваша среда правильно настроена. Если вы не видите приветствия, обратитесь к руководству по установке за инструкциями.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
import collections
import attr
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

В руководствах по классификации изображений и генерации текста мы узнали, как настроить конвейеры моделей и данных для федеративного обучения (FL), а также выполнили федеративное обучение через слой tff.learning API TFF.

Когда дело доходит до исследований ФЛ, это только верхушка айсберга. В этом руководстве мы обсудим, как реализовать алгоритмы федеративного обучения, не полагаясь на API tff.learning . Мы стремимся к следующему:

Цели:

  • Понять общую структуру алгоритмов федеративного обучения.
  • Изучите федеративное ядро TFF.
  • Используйте федеративное ядро ​​для непосредственной реализации федеративного усреднения.

Хотя это руководство является самодостаточным, мы рекомендуем сначала прочитать уроки по классификации изображений и созданию текста .

Подготовка входных данных

Сначала мы загружаем и предварительно обрабатываем набор данных EMNIST, включенный в TFF. Дополнительные сведения см. В руководстве по классификации изображений .

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Чтобы передать набор данных в нашу модель, мы сглаживаем данные и преобразуем каждый пример в кортеж формы (flattened_image_vector, label) .

NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

Теперь мы отбираем небольшое количество клиентов и применяем предварительную обработку к их наборам данных.

client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False)

federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

Подготовка модели

Мы используем ту же модель, что и в руководстве по классификации изображений . Эта модель (реализованная через tf.keras ) имеет единственный скрытый слой, за которым следует слой softmax.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

Чтобы использовать эту модель в TFF, мы оборачиваем модель tff.learning.Model как tff.learning.Model . Это позволяет нам выполнять прямой проход модели в TFF и извлекать выходные данные модели . Для получения дополнительных сведений см. Также руководство по классификации изображений .

def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Хотя мы использовали tf.keras для создания tff.learning.Model , TFF поддерживает гораздо более общие модели. Эти модели имеют следующие релевантные атрибуты, фиксирующие веса модели:

  • trainable_variables : итерация тензоров, соответствующих обучаемым слоям.
  • non_trainable_variables : итерация тензоров, соответствующих необучаемым слоям.

Для наших целей мы будем использовать только trainable_variables . (так как в нашей модели они есть только!).

Создание собственного алгоритма федеративного обучения

Хотя API tff.learning позволяет создавать множество вариантов федеративного усреднения, существуют и другие федеративные алгоритмы, которые не вписываются в эту структуру. Например, вы можете захотеть добавить регуляризацию, отсечение или более сложные алгоритмы, такие как федеративное обучение GAN . Вместо этого вам может быть интересна федеративная аналитика .

Для этих более сложных алгоритмов нам придется написать собственный алгоритм с использованием TFF. Во многих случаях объединенные алгоритмы состоят из 4 основных компонентов:

  1. Шаг трансляции от сервера к клиенту.
  2. Шаг обновления локального клиента.
  3. Шаг загрузки от клиента к серверу.
  4. Шаг обновления сервера.

В TFF мы обычно представляем объединенные алгоритмы как tff.templates.IterativeProcess (который мы tff.templates.IterativeProcess просто IterativeProcess ). Это класс, который содержит функции initialize и next . Здесь initialize используется для инициализации сервера, а next выполняется один цикл связи объединенного алгоритма. Напишем скелет того, как должен выглядеть наш итеративный процесс для FedAvg.

Во-первых, у нас есть функция инициализации, которая просто создает tff.learning.Model и возвращает его обучаемые веса.

def initialize_fn():
  model = model_fn()
  return model.trainable_variables

Эта функция выглядит неплохо, но, как мы увидим позже, нам нужно будет сделать небольшую модификацию, чтобы сделать ее «вычислением TFF».

Мы также хотим набросать next_fn .

def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

Мы сосредоточимся на реализации этих четырех компонентов по отдельности. Сначала мы сосредоточимся на частях, которые могут быть реализованы в чистом TensorFlow, а именно на этапах обновления клиента и сервера.

Блоки TensorFlow

Обновление клиента

Мы будем использовать нашу tff.learning.Model для обучения клиентов по существу так же, как вы обучаете модель TensorFlow. В частности, мы будем использоватьtf.GradientTape для вычисления градиента для пакетов данных, а затем применить этот градиент с помощью client_optimizer . Мы ориентируемся только на веса, которые можно тренировать.

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

Обновление сервера

Обновление сервера для FedAvg проще, чем обновление клиента. Мы реализуем «стандартное» федеративное усреднение, в котором мы просто заменяем веса модели сервера на среднее значение весов модели клиента. Опять же, мы сосредотачиваемся только на тренировочных весах.

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

Фрагмент можно упростить, просто вернув mean_client_weights . Однако более продвинутые реализации федеративного усреднения используют mean_client_weights с более сложными методами, такими как импульс или адаптивность.

Задача : реализовать версию server_update которая обновляет веса сервера до среднего значения model_weights и mean_client_weights. (Примечание. Такой подход к «средней точке» аналогичен недавней работе над оптимизатором Lookahead !).

Пока что мы написали только чистый код TensorFlow. Это сделано намеренно, поскольку TFF позволяет использовать большую часть кода TensorFlow, с которым вы уже знакомы. Однако теперь мы должны указать логику оркестровки , то есть логику, которая диктует, что сервер передает клиенту, а что клиент загружает на сервер.

Для этого потребуется Федеративное ядро TFF.

Введение в федеративное ядро

Федеративное ядро ​​(FC) - это набор интерфейсов нижнего уровня, служащих основой для tff.learning API. Однако эти интерфейсы не ограничиваются обучением. Фактически, их можно использовать для аналитики и многих других вычислений над распределенными данными.

На высоком уровне федеративное ядро ​​- это среда разработки, которая позволяет компактно выраженной программной логике комбинировать код TensorFlow с операторами распределенной связи (такими как распределенные суммы и широковещательные рассылки). Цель состоит в том, чтобы дать исследователям и практикам возможность контролировать распределенную связь в своих системах, не требуя деталей реализации системы (таких как определение двухточечного обмена сетевыми сообщениями).

Одним из ключевых моментов является то, что TFF разработан для сохранения конфиденциальности. Следовательно, он позволяет явно контролировать, где находятся данные, чтобы предотвратить нежелательное накопление данных на централизованном сервере.

Федеративные данные

Ключевым понятием в TFF являются «объединенные данные», которые относятся к набору элементов данных, размещенных на группе устройств в распределенной системе (например, наборы данных клиента или веса модели сервера). Мы моделируем всю коллекцию элементов данных на всех устройствах как единое объединенное значение .

Например, предположим, что у нас есть клиентские устройства, каждое из которых имеет поплавок, представляющий температуру датчика. Мы могли бы представить его как федеративный поплавок с помощью

federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

Объединенные типы определяются типом T входящих в него компонентов (например, tf.float32 ) и группой G устройств. Мы сосредоточимся на тех случаях, когда G - это либо tff.CLIENTS либо tff.SERVER . Такой федеративный тип представлен как {T}@G , как показано ниже.

str(federated_float_on_clients)
'{float32}@CLIENTS'

Почему нас так волнуют места размещения? Ключевая цель TFF - дать возможность писать код, который можно было бы развернуть в реальной распределенной системе. Это означает, что важно выяснить, какие подмножества устройств какой код выполняют и где находятся различные фрагменты данных.

ПТФ фокусируется на трех вещах: данные, где размещены данные, и как данные трансформируются. Первые два инкапсулированы в объединенные типы, а последний - в объединенные вычисления .

Федеративные вычисления

TFF - это строго типизированная среда функционального программирования, базовыми единицами которой являются объединенные вычисления . Это элементы логики, которые принимают объединенные значения в качестве входных и возвращают объединенные значения в качестве выходных.

Например, предположим, что мы хотим усреднить температуры на наших клиентских датчиках. Мы могли бы определить следующее (с помощью нашего федеративного числа с плавающей запятой):

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

Вы можете спросить, чем он отличается от декоратора tf.function в TensorFlow? Ключевой ответ заключается в том, что код, сгенерированный tff.federated_computation является ни TensorFlow, ни кодом Python; Это спецификация распределенной системы на внутреннем платформо-независимом связующем языке .

Хотя это может показаться сложным, вы можете думать о вычислениях TFF как о функциях с четко определенными сигнатурами типов. Эти сигнатуры типа можно запросить напрямую.

str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'

Эта tff.federated_computation принимает аргументы федеративного типа {float32}@CLIENTS и возвращает значения федеративного типа {float32}@SERVER . Объединенные вычисления также могут передаваться от сервера к клиенту, от клиента к клиенту или от сервера к серверу. Объединенные вычисления также могут быть составлены как обычные функции, если их типовые сигнатуры совпадают.

Для поддержки разработки TFF позволяет вам вызывать tff.federated_computation как функцию Python. Например, мы можем позвонить

get_average_temperature([68.5, 70.3, 69.8])
69.53334

Нежелательные вычисления и TensorFlow

Следует помнить о двух ключевых ограничениях. Во-первых, когда интерпретатор Python встречает декоратор tff.federated_computation , функция трассируется один раз и сериализуется для использования в будущем. Из-за децентрализованного характера федеративного обучения это будущее использование может произойти где-то еще, например, в среде удаленного выполнения. Следовательно, вычисления TFF принципиально неинтересны . Это поведение в некоторой степени аналогично tf.function декоратора tf.function в TensorFlow.

Во-вторых, федеративное вычисление может состоять только из федеративных операторов (таких как tff.federated_mean ), они не могут содержать операции TensorFlow. Код TensorFlow должен быть ограничен блоками, украшенными tff.tf_computation . Самый обычный код TensorFlow можно декорировать напрямую, например, следующую функцию, которая принимает число и добавляет к нему 0.5 .

@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

У них также есть типовые подписи, но без размещения . Например, мы можем позвонить

str(add_half.type_signature)
'(float32 -> float32)'

Здесь мы видим важное различие между tff.federated_computation и tff.tf_computation . У первого есть явные места размещения, а у второго - нет.

Мы можем использовать блоки tff.tf_computation в федеративных вычислениях, указав места размещения. Давайте создадим функцию, которая добавляет половину, но только к объединенным плавающим объектам на клиентах. Мы можем сделать это, используя tff.federated_map , который применяет заданное tff.tf_computation , сохраняя при этом размещение.

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

Эта функция почти идентична add_half , за исключением того, что она принимает значения только с размещением в tff.CLIENTS и возвращает значения с таким же размещением. Мы можем видеть это в сигнатуре его типа:

str(add_half_on_clients.type_signature)
'({float32}@CLIENTS -> {float32}@CLIENTS)'

В итоге:

  • TFF работает с федеративными ценностями.
  • Каждое объединенное значение имеет объединенный тип с типом (например, tf.float32 ) и местом размещения (например, tff.CLIENTS ).
  • Объединенные значения можно преобразовать с помощью объединенных вычислений , которые должны быть украшены tff.federated_computation и подписью объединенного типа.
  • Код tff.tf_computation должен содержаться в блоках с декораторами tff.tf_computation .
  • Эти блоки затем могут быть включены в объединенные вычисления.

Снова о создании собственного алгоритма федеративного обучения

Теперь, когда мы получили представление о Федеративном ядре, мы можем создать наш собственный алгоритм федеративного обучения. Помните, что выше мы определили initialize_fn и next_fn для нашего алгоритма. next_fn будет использовать client_update и server_update мы определили с помощью чистого кода server_update .

Однако для того, чтобы сделать наш алгоритм объединенным вычислением, нам понадобятся как next_fn и initialize_fn чтобы каждый был tff.federated_computation .

Федеративные блоки TensorFlow

Создание вычисления инициализации

Функция инициализации будет довольно простой: мы создадим модель, используя model_fn . Однако помните, что мы должны отделить наш код tff.tf_computation с помощью tff.tf_computation .

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

Затем мы можем передать это напрямую в объединенное вычисление, используя tff.federated_value .

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

Создание next_fn

Теперь мы используем наш код обновления клиента и сервера, чтобы написать фактический алгоритм. Сначала мы превратим наш client_update в tff.tf_computation который принимает наборы данных клиента и веса сервера и выводит обновленный тензор весов клиента.

Нам понадобятся соответствующие типы, чтобы правильно оформить нашу функцию. К счастью, тип весов серверов можно извлечь непосредственно из нашей модели.

dummy_model = model_fn()
tf_dataset_type = tff.SequenceType(dummy_model.input_spec)

Давайте посмотрим на сигнатуру типа набора данных. Помните, что мы взяли 28 на 28 изображений (с целыми метками) и сгладили их.

str(tf_dataset_type)
'<float32[?,784],int32[?,1]>*'

Мы также можем извлечь тип весов модели, используя нашу функцию server_init выше.

model_weights_type = server_init.type_signature.result

Изучив подпись типа, мы сможем увидеть архитектуру нашей модели!

str(model_weights_type)
'<float32[784,10],float32[10]>'

Теперь мы можем создать наш tff.tf_computation для обновления клиента.

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

Версия обновления сервера tff.tf_computation может быть определена аналогичным образом, используя типы, которые мы уже извлекли.

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

И последнее, но не менее важное: нам нужно создать tff.federated_computation который объединит все это воедино. Эта функция принимает два объединенных значения , одно соответствует весам сервера (с размещением tff.SERVER ), а другое - наборам данных клиента (с размещением tff.CLIENTS ).

Обратите внимание, что оба этих типа были определены выше! Нам просто нужно правильно разместить их с помощью tff.FederatedType .

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

Помните 4 элемента алгоритма FL?

  1. Шаг трансляции от сервера к клиенту.
  2. Шаг обновления локального клиента.
  3. Шаг загрузки от клиента к серверу.
  4. Шаг обновления сервера.

Теперь, когда мы создали вышесказанное, каждую часть можно компактно представить как одну строку кода TFF. Из-за этой простоты нам пришлось проявить особую осторожность при указании таких вещей, как федеративные типы!

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))

  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

Теперь у нас есть tff.federated_computation как для инициализации алгоритма, так и для выполнения одного шага алгоритма. Чтобы завершить наш алгоритм, мы передаем их в tff.templates.IterativeProcess .

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

Давайте посмотрим на сигнатуру типа функций initialize и next нашего итеративного процесса.

str(federated_algorithm.initialize.type_signature)
'( -> <float32[784,10],float32[10]>@SERVER)'

Это отражает тот факт, что federated_algorithm.initialize - это функция без аргументов, которая возвращает однослойную модель (с матрицей весов 784 на 10 и 10 единицами смещения).

str(federated_algorithm.next.type_signature)
'(<<float32[784,10],float32[10]>@SERVER,{<float32[?,784],int32[?,1]>*}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

Здесь мы видим, что federated_algorithm.next принимает модель сервера и данные клиента и возвращает обновленную модель сервера.

Оценка алгоритма

Давайте проведем несколько раундов и посмотрим, как изменится проигрыш. Сначала мы определим функцию оценки, используя централизованный подход, описанный во втором руководстве.

Сначала мы создаем централизованный набор данных оценки, а затем применяем ту же предварительную обработку, которую мы использовали для данных обучения.

Обратите внимание , что мы только take первые 1000 элементов по причинам вычислительной эффективности, но , как правило , мы будем использовать весь тестовый набор данных.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000)
central_emnist_test = preprocess(central_emnist_test)

Затем мы пишем функцию, которая принимает состояние сервера и использует Keras для оценки тестового набора данных. Если вы знакомы с tf.Keras , все это tf.Keras вам знакомым, но обратите внимание на использование set_weights !

def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

Теперь давайте инициализируем наш алгоритм и оценим его на тестовом наборе.

server_state = federated_algorithm.initialize()
evaluate(server_state)
50/50 [==============================] - 0s 2ms/step - loss: 2.3026 - sparse_categorical_accuracy: 0.0910

Давайте потренируемся несколько раундов и посмотрим, не изменится ли что-нибудь.

for round in range(15):
  server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)
50/50 [==============================] - 0s 1ms/step - loss: 2.1706 - sparse_categorical_accuracy: 0.2440

Мы видим небольшое уменьшение функции потерь. Хотя скачок невелик, мы выполнили всего 10 тренировочных раундов и на небольшой группе клиентов. Чтобы увидеть лучшие результаты, нам, возможно, придется сделать сотни, если не тысячи раундов.

Модификация нашего алгоритма

На этом остановимся и подумаем о том, чего мы достигли. Мы реализовали федеративное усреднение напрямую, объединив чистый код TensorFlow (для обновлений клиента и сервера) с объединенными вычислениями из федеративного ядра TFF.

Чтобы выполнить более сложное обучение, мы можем просто изменить то, что у нас есть выше. В частности, редактируя чистый TF-код выше, мы можем изменить то, как клиент выполняет обучение или как сервер обновляет свою модель.

Задача: добавить отсечение градиента в функцию client_update .

Если бы мы хотели внести большие изменения, мы могли бы также сделать так, чтобы сервер сохранял и транслировал больше данных. Например, сервер может также сохранять скорость обучения клиента и снижать ее со временем! Обратите внимание, что для этого потребуется изменить сигнатуры типов, используемые в вызовах tff.tf_computation выше.

Более сложная задача: внедрить федеративное усреднение со снижением скорости обучения клиентов.

На этом этапе вы можете начать понимать, насколько гибко можно реализовать в этой структуре. Идеи (включая ответ на более сложную задачу, приведенную выше) вы можете найти в исходном коде tff.learning.build_federated_averaging_process или ознакомиться с различными исследовательскими проектами с использованием TFF.