Посетите симпозиум «Женщины в машинном обучении» 7 декабря Зарегистрируйтесь сейчас

Федеративное обучение для классификации изображений

Оптимизируйте свои подборки Сохраняйте и классифицируйте контент в соответствии со своими настройками.

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом уроке, мы используем классический пример MNIST обучения ввести Федеративное Learning (FL) API слой TFF, tff.learning - набор высокоуровневых интерфейсов , которые могут быть использованы для выполнения общих типов федеративных задач обучения, такие как федеративное обучение на основе пользовательских моделей, реализованных в TensorFlow.

Это руководство и API федеративного обучения предназначены в первую очередь для пользователей, которые хотят подключить свои собственные модели TensorFlow к TFF, рассматривая последние в основном как черный ящик. Для понимания более углубленного TFF и как реализовать свои собственные алгоритмы обучения федеративных см учебники по FC Core API - Пользовательские Федеративные алгоритмы Часть 1 и Часть 2 .

Более подробную информацию о tff.learning , продолжайте Федеративные обучения для текста поколения , учебник , который в дополнение к покрытию повторяющихся моделей, а также демонстрирует загрузку предварительно обученной сериализован модель Keras для уточнения с федеративным обучения в сочетании с использованием оценки Keras.

Прежде, чем мы начнем

Прежде чем мы начнем, выполните следующее, чтобы убедиться, что ваша среда правильно настроена. Если вы не видите приветствие, пожалуйста , обратитесь к установке руководству для получения инструкций.

# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

Подготовка входных данных

Начнем с данных. Для федеративного обучения требуется объединенный набор данных, т. Е. Сбор данных от нескольких пользователей. Федеративные данные , как правило , не- н.о.р. , который представляет собой уникальный набор проблем.

Для того , чтобы облегчить экспериментирование, мы засевали репозиторий TFF с несколькими наборами данных, в том числе федеративной версии MNIST , который содержит версию оригинального NIST набора данных , который был повторно обработаны с использованием листьев так, чтобы данные введенные оригинальным писателем цифры. Поскольку у каждого модуля записи есть уникальный стиль, этот набор данных демонстрирует поведение без идентификаторов, ожидаемое от федеративных наборов данных.

Вот как мы можем его загрузить.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Наборы данных , возвращаемые load_data() являются экземплярами tff.simulation.ClientData , интерфейс , который позволяет перечислять множество пользователей, чтобы построить tf.data.Dataset , представляющий данные конкретного пользователя, и для запроса состав отдельных элементов. Вот как вы можете использовать этот интерфейс для изучения содержимого набора данных. Имейте в виду, что хотя этот интерфейс позволяет вам перебирать идентификаторы клиентов, это только функция данных моделирования. Как вы вскоре увидите, идентификационные данные клиентов не используются фреймворком федеративного обучения - их единственная цель - позволить вам выбрать подмножества данных для моделирования.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

PNG

Изучение неоднородности объединенных данных

Федеративные данные , как правило , не- н.о.р. , пользователи , как правило , имеют разные распределения данных в зависимости от характера использования. У некоторых клиентов может быть меньше обучающих примеров на устройстве из-за нехватки данных локально, в то время как у некоторых клиентов будет более чем достаточно обучающих примеров. Давайте исследуем эту концепцию неоднородности данных, типичную для федеративной системы, с данными EMNIST, которые у нас есть. Важно отметить, что этот глубокий анализ данных клиента доступен только нам, потому что это среда моделирования, в которой все данные доступны нам локально. В реальной производственной интегрированной среде вы не сможете проверять данные одного клиента.

Во-первых, давайте возьмем выборку данных одного клиента, чтобы получить представление о примерах на одном смоделированном устройстве. Поскольку набор данных, который мы используем, был запрограммирован уникальным писателем, данные одного клиента представляют собой почерк одного человека для выборки цифр от 0 до 9, имитируя уникальный «шаблон использования» одного пользователя.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

PNG

Теперь давайте визуализируем количество примеров на каждом клиенте для каждой цифровой метки MNIST. В федеративной среде количество примеров на каждом клиенте может незначительно варьироваться в зависимости от поведения пользователя.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

PNG

Теперь давайте визуализируем среднее изображение для каждого клиента для каждой метки MNIST. Этот код будет производить среднее значение каждого пикселя для всех пользовательских примеров для одной метки. Мы увидим, что среднее изображение одного клиента для цифры будет отличаться от среднего изображения другого клиента для той же цифры из-за уникального стиля почерка каждого человека. Мы можем размышлять о том, как каждый локальный раунд обучения будет подталкивать модель в разном направлении для каждого клиента, поскольку мы извлекаем уроки из собственных уникальных данных этого пользователя в этом локальном раунде. Позже в этом руководстве мы увидим, как мы можем принимать каждое обновление модели от всех клиентов и объединять их вместе в нашу новую глобальную модель, которая извлекается из собственных уникальных данных каждого нашего клиента.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

PNG

PNG

PNG

PNG

PNG

Пользовательские данные могут быть зашумлены и ненадежно помечены. Например, глядя на данные клиента №2 выше, мы видим, что для метки 2, возможно, были некоторые примеры с неправильной маркировкой, создающие более шумное среднее изображение.

Предварительная обработка входных данных

Так как данные уже tf.data.Dataset , предварительную обработку можно осуществить с использованием преобразования данных. Здесь мы сглаживать 28x28 изображений в 784 - элементные массивы, перетасовать отдельные примеры, организовать их в пакеты, и переименовывать функции из pixels и label для x и y для использования с Keras. Мы также бросить в repeat по множеству данных , чтобы запустить несколько эпох.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Давайте проверим, что это сработало.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [5],
       [7],
       [1],
       [7],
       [7],
       [1],
       [4],
       [7],
       [4],
       [2],
       [2],
       [5],
       [4],
       [1],
       [1],
       [0],
       [0],
       [9]], dtype=int32))])

У нас есть почти все строительные блоки для построения объединенных наборов данных.

Один из способов кормить федеративные данными для TFF в симуляции просто как список Python, с каждым элементом списка , удерживающий данные отдельного пользователя, будь то в виде списка или в виде tf.data.Dataset . Поскольку у нас уже есть интерфейс, обеспечивающий последнее, давайте воспользуемся им.

Вот простая вспомогательная функция, которая построит список наборов данных из заданного набора пользователей в качестве входных данных для цикла обучения или оценки.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

Теперь, как нам выбирать клиентов?

В типичном сценарии федеративного обучения мы имеем дело с потенциально очень большой группой пользовательских устройств, только часть из которых может быть доступна для обучения в данный момент времени. Это имеет место, например, когда клиентские устройства являются мобильными телефонами, которые участвуют в обучении только тогда, когда они подключены к источнику питания, отключены от сети с измерением и в противном случае находятся в режиме ожидания.

Конечно, мы находимся в среде моделирования, и все данные доступны локально. Обычно при запуске моделирования мы просто выбираем случайное подмножество клиентов, которые будут участвовать в каждом раунде обучения, как правило, разных в каждом раунде.

Тем не менее, как вы можете узнать, изучая документ о Усреднении Федеративного алгоритме достижение сходимости в системе с произвольно выбранными подмножествами клиентов в каждом раунде может занять некоторое время, и было бы нецелесообразно , чтобы запустить сотни раундов это интерактивное руководство.

Вместо этого мы сделаем выборку набора клиентов один раз и повторно используем тот же набор в раундах, чтобы ускорить конвергенцию (намеренно подгоняя под данные этих нескольких пользователей). Мы оставляем читателю в качестве упражнения изменить это руководство для имитации случайной выборки - это довольно легко сделать (как только вы это сделаете, имейте в виду, что приведение модели к схождению может занять некоторое время).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Создание модели с Керасом

Если вы используете Keras, у вас, вероятно, уже есть код, создающий модель Keras. Вот пример простой модели, которая подойдет для наших нужд.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

Для того , чтобы использовать любую модель с TFF, она должна быть завернута в экземпляре tff.learning.Model интерфейса, который предоставляет методы штамповать вперед мимо модель, метаданные свойств и т.д., аналогично Keras, но также вводит дополнительным элементы, такие как способы управления процессом вычисления объединенных метрик. Давайте пока не будем об этом беспокоиться; если у вас есть модель Keras , как тот , который мы только что определено выше, вы можете иметь ПТФ обернуть его для вас, вызывая tff.learning.from_keras_model , передавая модель и образец пакет данных в качестве аргументов, как показано ниже.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Обучение модели на объединенных данных

Теперь, когда у нас есть модель , завернутая в tff.learning.Model для использования с TFF, мы можем позволить ПТФУ построить алгоритм усреднения Федеративного, вызывая вспомогательную функцию tff.learning.build_federated_averaging_process следующим образом .

Имейте в виде , что аргумент должен быть конструктор (например, model_fn выше), а не уже построенный экземпляр, так что построение модели может произойти в контексте контролируемого TFF (если вам интересно , о причинах это, мы рекомендуем вам ознакомиться с последующем учебником по пользовательским алгоритмам ).

Одним из наиболее важного замечания по алгоритму усреднения Федеративного ниже, есть 2 оптимизаторы: а _client оптимизатор и _SERVER оптимизатор. _Client оптимизатор используется только для вычисления локального обновления модели на каждый клиенте. _SERVER оптимизатор применяет усредненное обновление глобальной модели на сервере. В частности, это означает, что выбор оптимизатора и используемой скорости обучения может отличаться от тех, которые вы использовали для обучения модели на стандартном наборе данных iid. Мы рекомендуем начинать с обычного SGD, возможно, с меньшей скоростью обучения, чем обычно. Скорость обучения, которую мы используем, не была тщательно настроена, не стесняйтесь экспериментировать.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Что только что произошло? ПТФ построил пару объединенных вычислений и упаковывает их в tff.templates.IterativeProcess , в котором эти вычисления доступны в виде пара свойств initialize и в next .

В двух словах, федеративные вычисления являются программы на внутреннем языке ПТФ, что может выражать различные федеративные алгоритмы (вы можете найти больше об этом в пользовательских алгоритмов учебник). В этом случае два вычисления генерироваться и упаковывают в iterative_process реализации федеративного Усреднения .

Целью TFF является определение вычислений таким образом, чтобы они могли выполняться в реальных настройках федеративного обучения, но в настоящее время реализована только среда выполнения моделирования локального выполнения. Чтобы выполнить вычисление в симуляторе, вы просто вызываете его как функцию Python. Эта интерпретируемая среда по умолчанию не предназначена для обеспечения высокой производительности, но ее будет достаточно для этого руководства; мы ожидаем предоставить более производительные среды выполнения моделирования, чтобы облегчить более масштабные исследования в будущих выпусках.

Давайте начнем с initialize вычисления. Как и в случае со всеми объединенными вычислениями, вы можете думать об этом как о функции. Вычисление не принимает аргументов и возвращает один результат - представление состояния процесса федеративного усреднения на сервере. Хотя мы не хотим углубляться в детали TFF, может быть поучительно посмотреть, как это состояние выглядит. Вы можете представить это следующим образом.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

В то время как выше тип подписи может показаться на первый взгляд немного загадочным, вы можете признать , что состояние сервера состоит из model (начальные параметры модели для MNIST , которые будут распространяться на все устройства), и optimizer_state (дополнительная информация поддерживается сервером, например, количество раундов для использования в расписаниях гиперпараметров и т. д.).

Давайте Призовите initialize вычисления для построения состояния сервера.

state = iterative_process.initialize()

Вторая пара федеративных вычислений, next , представляет собой один раунд Федеративного усреднения, который состоит из толкая состояния сервера (включая параметры модели) для клиентов, на устройстве обучение на свои локальных данных, сбор и обновление усреднения модели , и производство новой обновленной модели на сервере.

Концептуально, вы можете думать о next , как с функциональной сигнатуру , которая выглядит следующим образом .

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

В частности, следует думать о next() не как функция , которая работает на сервере, а быть декларативным функциональным представлением всего децентрализованного вычисления - некоторые из входов обеспечивается сервер ( SERVER_STATE ), но каждый участвующим устройство предоставляет свой собственный локальный набор данных.

Давайте проведем один раунд обучения и визуализируем результаты. Мы можем использовать объединенные данные, которые мы уже создали выше, для выборки пользователей.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Давайте проведем еще несколько раундов. Как отмечалось ранее, обычно на этом этапе вы выбираете подмножество данных моделирования из новой случайно выбранной выборки пользователей для каждого раунда, чтобы смоделировать реалистичное развертывание, в котором пользователи постоянно приходят и уходят, но в этой интерактивной записной книжке для ради демонстрации мы просто повторно используем одних и тех же пользователей, чтобы система быстро сходилась.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834728)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.19917695), ('loss', 2.6146567)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21975309), ('loss', 2.529761)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2409465), ('loss', 2.4053504)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2611111), ('loss', 2.315389)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.30823046), ('loss', 2.1240263)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.33312756), ('loss', 2.1164262)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Потери в обучении уменьшаются после каждого раунда федеративного обучения, что указывает на сближение модели. Есть несколько важных предостережений с этими учебными показателями, однако, смотрите раздел по оценке позже в этом руководстве.

Отображение метрик модели в TensorBoard

Затем давайте визуализируем метрики этих объединенных вычислений с помощью Tensorboard.

Начнем с создания каталога и соответствующего средства записи сводки для записи показателей.

logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

Постройте соответствующие скалярные метрики с тем же составителем сводки.

with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

Запустите TensorBoard с указанным выше корневым каталогом журналов. Загрузка данных может занять несколько секунд.

!ls {logdir}
%tensorboard --logdir {logdir} --port=0
events.out.tfevents.1629557449.ebe6e776479e64ea-4903924a278.borgtask.google.com.458912.1.v2
Launching TensorBoard...
Reusing TensorBoard on port 50681 (pid 292785), started 0:30:30 ago. (Use '!kill 292785' to kill it.)
<IPython.core.display.Javascript at 0x7fd6617e02d0>
# Uncomment and run this this cell to clean your directory of old output for
# future graphs from this directory. We don't run it by default so that if 
# you do a "Runtime > Run all" you don't lose your results.

# !rm -R /tmp/logs/scalars/*

Чтобы таким же образом просматривать метрики оценки, вы можете создать отдельную папку eval, например «logs / scalars / eval», для записи в TensorBoard.

Настройка реализации модели

Keras является рекомендуемым высокого уровня модели API для TensorFlow , и мы рекомендуем при помощи моделей Keras (через tff.learning.from_keras_model ) в TFF всякий раз , когда это возможно.

Тем не менее, tff.learning обеспечивает интерфейс модели более низкого уровня, tff.learning.Model , который предоставляет минимальную функциональность , необходимую для использования модели для объединенного обучения. Непосредственно реализующий этот интерфейс (возможно , до сих пор используют строительные блоки , как tf.keras.layers ) обеспечивает максимальную настройку без изменения внутренностей федеративных алгоритмов обучения.

Так что давайте сделаем все заново с нуля.

Определение переменных модели, прямого прохода и показателей

Первый шаг - определить переменные TensorFlow, с которыми мы собираемся работать. Чтобы сделать следующий код более разборчивым, давайте определим структуру данных для представления всего набора. Это будет включать в себя такие переменные, как weights и bias , что мы будем обучать, а также переменные , которые будут содержать различную сводную статистику и счетчик мы будем обновлять в процессе обучения, такие как loss_sum , accuracy_sum и num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

Вот метод, который создает переменные. Для простоты, мы представляем все статистические данные , как tf.float32 , так как это позволит устранить необходимость в преобразовании типа на более позднем этапе. Обертывание инициализаторы переменной , как лямбды является требованием введенного переменными ресурсов .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Имея переменные для параметров модели и совокупную статистику, теперь мы можем определить метод прямого прохода, который вычисляет потери, генерирует прогнозы и обновляет совокупную статистику для одного пакета входных данных, как показано ниже.

def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

Затем мы определяем функцию, которая возвращает набор локальных метрик, снова используя TensorFlow. Это значения (в дополнение к обновлениям модели, которые обрабатываются автоматически), которые могут быть агрегированы на сервере в процессе федеративного обучения или оценки.

Здесь мы просто возвращаем среднюю loss и accuracy , а также num_examples , которые мы должны правильно взвешивать вклады от различных пользователей при расчете федеративных агрегатов.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

Наконец, мы должны определить , как агрегировать локальные показатели , испускаемых каждым устройством с помощью get_local_mnist_metrics . Это только часть кода, не написано в TensorFlow - это федеративное вычисление выражается в TFF. Если вы хотите копать глубже, обезжиренный над пользовательскими алгоритмами учебник, но в большинстве случаев, вы не будете действительно нужны; вариантов рисунка, показанного ниже, должно хватить. Вот как это выглядит:

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

Входные metrics аргумент соответствует OrderedDict возвращенного get_local_mnist_metrics выше, но критически значение уже не является tf.Tensors - они «боксовый» , как tff.Value s, чтобы сделать его очищает вы можете больше не манипулировать ими с помощью TensorFlow, но только с помощью интегрированных операторов ПТФ как tff.federated_mean и tff.federated_sum . Возвращенный словарь глобальных агрегатов определяет набор показателей, которые будут доступны на сервере.

Построение экземпляра tff.learning.Model

Имея все вышеперечисленное, мы готовы построить представление модели для использования с TFF, подобное тому, которое создается для вас, когда вы позволяете TFF принимать модель Keras.

from typing import Callable, List, OrderedDict

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def predict_on_batch(self, x, training=True):
    del training
    return predict_on_batch(self._variables, x)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

  @tf.function
  def report_local_unfinalized_metrics(
      self) -> OrderedDict[str, List[tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to unfinalized values."""
    return collections.OrderedDict(
        num_examples=[self._variables.num_examples],
        loss=[self._variables.loss_sum, self._variables.num_examples],
        accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

  def metric_finalizers(
      self) -> OrderedDict[str, Callable[[List[tf.Tensor]], tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to finalizers."""
    return collections.OrderedDict(
        num_examples=tf.function(func=lambda x: x[0]),
        loss=tf.function(func=lambda x: x[0] / x[1]),
        accuracy=tf.function(func=lambda x: x[0] / x[1]))

Как вы можете видеть, абстрактные методы и свойства , определяемые tff.learning.Model соответствует фрагменты кода в предыдущем разделе , который ввел переменные и определили потери и статистические данные.

Вот несколько моментов, которые стоит выделить:

  • Все утверждают , что ваша модель будет использовать , должны быть захвачена в качестве переменных TensorFlow как ПТФ не использует Python во время выполнения (помните , код должны быть написано таким образом, что он может быть развернут на мобильные устройства, см пользовательских алгоритмов учебника для более углубленного комментарий к причинам).
  • Ваша модель должна описывать , какую форму она принимает данных ( input_spec ), как и в целом, ПТФ является сильно типизированной средой и хочет , чтобы определить сигнатуры типов для всех компонентов. Объявление формата входных данных вашей модели - важная часть этого.
  • Хотя технически не требуется, мы рекомендуем оборачивать все TensorFlow логики (вперед проход, метрические расчеты и т.д.) как tf.function s, так как это помогает обеспечить TensorFlow можно сериализовать, и устраняет необходимость в зависимости явного управления.

Вышеупомянутого достаточно для оценки и таких алгоритмов, как Federated SGD. Однако для федеративного усреднения нам нужно указать, как модель должна обучаться локально для каждого пакета. Мы укажем локальный оптимизатор при построении алгоритма федеративного усреднения.

Моделирование федеративного обучения с помощью новой модели

Когда все вышеперечисленное выполнено, оставшаяся часть процесса выглядит так, как мы уже видели - просто замените конструктор модели конструктором нашего нового класса модели и используйте два объединенных вычисления в итеративном процессе, который вы создали для циклического прохождения тренировочные раунды.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.0708053), ('accuracy', 0.12777779)])), ('stat', OrderedDict([('num_examples', 4860)]))])
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.011699), ('accuracy', 0.13024691)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7408307), ('accuracy', 0.15576132)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.6761012), ('accuracy', 0.17921811)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.675567), ('accuracy', 0.1855967)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.5664043), ('accuracy', 0.20329218)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4179392), ('accuracy', 0.24382716)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.3237286), ('accuracy', 0.26687244)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.1861682), ('accuracy', 0.28209877)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.046388), ('accuracy', 0.32037038)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Чтобы увидеть эти метрики в TensorBoard, обратитесь к шагам, перечисленным выше в разделе «Отображение метрик модели в TensorBoard».

Оценка

Все наши эксперименты до сих пор представляли только федеративные показатели обучения - средние показатели по всем пакетам данных, обученным для всех клиентов в раунде. Это вызывает обычные опасения по поводу переобучения, тем более что для простоты мы использовали один и тот же набор клиентов в каждом раунде, но есть дополнительное понятие переобучения в показателях обучения, специфичных для алгоритма федеративного усреднения. Это легче всего увидеть, если представить, что у каждого клиента есть один пакет данных, и мы тренируемся на этом пакете для многих итераций (эпох). В этом случае локальная модель будет быстро точно соответствовать этому одному пакету, и поэтому показатель локальной точности, который мы усредняем, приблизится к 1,0. Таким образом, эти показатели обучения можно рассматривать как признак того, что обучение прогрессирует, но не более того.

Для выполнения оценки на федеративных данных, вы можете построить другое федеративное вычисление , предназначенное только для этой цели, используя tff.learning.build_federated_evaluation функции, и переходя в вашей модели конструктора в качестве аргумента. Обратите внимание , что в отличие от Федеративного усреднении, где мы использовали MnistTrainableModel , достаточно пройти MnistModel . Оценка не выполняет градиентный спуск, и нет необходимости создавать оптимизаторы.

Для экспериментов и исследований, когда централизованный тест набор данные доступны, Федеративные обучения для текста поколение демонстрирует еще один вариант оценки: принимая тренированные веса от федеративного обучения, применяя их к стандартной модели Keras, а затем просто призывая tf.keras.models.Model.evaluate() на централизованном наборе данных.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

Вы можете проверить сигнатуру абстрактного типа функции оценки следующим образом.

str(evaluation.type_signature)
'(<server_model_weights=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,federated_dataset={<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <eval=<num_examples=float32,loss=float32,accuracy=float32>,stat=<num_examples=int64>>@SERVER)'

Нет необходимости беспокоиться о деталях на данный момент, просто надо знать , что она имеет следующий общий вид, аналогичный tff.templates.IterativeProcess.next , но с двумя важными отличиями. Во-первых, мы не возвращаем состояние сервера, поскольку оценка не изменяет модель или какой-либо другой аспект состояния - вы можете думать об этом как об отсутствии состояния. Во-вторых, для оценки нужна только модель и не требуется никакой другой части состояния сервера, которая может быть связана с обучением, например переменных оптимизатора.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

Давайте вызовем оценку последнего состояния, к которому мы пришли во время обучения. Для того , чтобы извлечь последнюю обученную модель от состояния сервера, вы просто получите доступ к .model элемента следующим образом .

train_metrics = evaluation(state.model, federated_train_data)

Вот что мы получаем. Обратите внимание, что цифры выглядят немного лучше, чем то, что было сообщено в предыдущем раунде обучения выше. По соглашению, показатели обучения, сообщаемые процессом итеративного обучения, обычно отражают производительность модели в начале цикла обучения, поэтому показатели оценки всегда будут на шаг впереди.

str(train_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 4860.0), ('loss', 1.7510437), ('accuracy', 0.2788066)])), ('stat', OrderedDict([('num_examples', 4860)]))])"

Теперь давайте скомпилируем тестовую выборку объединенных данных и повторно запустим оценку на тестовых данных. Данные будут поступать из той же выборки реальных пользователей, но из отдельного набора данных.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 580.0), ('loss', 1.8361608), ('accuracy', 0.2413793)])), ('stat', OrderedDict([('num_examples', 580)]))])"

На этом урок завершен. Мы рекомендуем вам поиграть с параметрами (например, размеры пакетов, количество пользователей, эпохи, скорость обучения и т. Д.), Изменить приведенный выше код, чтобы имитировать обучение на случайных выборках пользователей в каждом раунде, и изучить другие руководства. мы разработали.