Сохраните дату! Google I / O возвращается 18-20 мая Зарегистрируйтесь сейчас
Эта страница переведена с помощью Cloud Translation API.
Switch to English

Реализация настраиваемых агрегатов

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом руководстве мы объясняем принципы проектирования, лежащие в tff.aggregators модуля tff.aggregators и передовые методы реализации настраиваемого агрегирования значений от клиентов к серверу.

Предпосылки. В этом руководстве предполагается, что вы уже знакомы с основными концепциями Federated Core, такими как размещения ( tff.SERVER , tff.CLIENTS ), как TFF представляет вычисления ( tff.tf_computation , tff.federated_computation ) и их типовые сигнатуры.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Резюме дизайна

В TFF «агрегация» относится к перемещению набора значений на tff.CLIENTS для создания агрегированного значения того же типа на tff.SERVER . То есть не обязательно иметь ценность каждого отдельного клиента. Например, при федеративном обучении обновления клиентской модели усредняются, чтобы получить агрегированное обновление модели, которое будет применяться к глобальной модели на сервере.

В дополнение к операторам, выполняющим эту задачу, таким как tff.federated_sum , TFF предоставляет tff.templates.AggregationProcess ( процесс с tff.templates.AggregationProcess состояния ), который формализует сигнатуру типа для вычисления агрегирования, чтобы его можно было обобщить на более сложные формы, чем простая сумма.

Основными компонентами модуля tff.aggregators являются фабрики для создания AggregationProcess , которые в целом предназначены для использования в качестве заменяемых строительных блоков TFF в двух аспектах:

  1. Параметризованные вычисления. Агрегация - это независимый строительный блок, который можно подключить к другим модулям TFF, предназначенным для работы с tff.aggregators чтобы параметризовать их необходимое агрегирование.

Пример:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Агрегационный состав. Строительный блок агрегирования может быть составлен с другими строительными блоками агрегирования для создания более сложных составных агрегатов.

Пример:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Остальная часть этого руководства объясняет, как достигаются эти две цели.

Процесс агрегирования

Сначала мы резюмируем tff.templates.AggregationProcess и следуем шаблону фабрики для его создания.

tff.templates.AggregationProcess - это tff.templates.MeasuredProcess с сигнатурами типов, указанными для агрегирования. В частности, функции initialize и next имеют следующие сигнатуры типов:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Состояние (типа state_type ) должно быть размещено на сервере. next функция принимает в качестве входного аргумента состояние и значение, которое нужно агрегировать (типа value_type ), размещенное у клиентов. * Означает необязательные другие входные аргументы, например веса в средневзвешенном значении. Он возвращает обновленный объект состояния, агрегированное значение того же типа, размещенное на сервере, и некоторые измерения.

Обратите внимание, что как состояние, которое должно передаваться между выполнениями next функции, так и сообщаемые измерения, предназначенные для сообщения любой информации в зависимости от конкретного выполнения next функции, могут быть пустыми. Тем не менее, они должны быть четко указаны, чтобы другие части TFF имели четкий контракт, которому нужно следовать.

tff.learning , что другие модули TFF, например обновления модели в tff.learning , будут использовать tff.templates.AggregationProcess для параметризации агрегирования значений. Однако то, что именно агрегируются значения и каковы их типовые сигнатуры, зависит от других деталей обучаемой модели и алгоритма обучения, используемого для этого.

Чтобы сделать агрегацию независимой от других аспектов вычислений, мы используем шаблон фабрики - мы создаем соответствующий tff.templates.AggregationProcess когда становятся доступными соответствующие сигнатуры типов объектов, которые будут агрегированы, путем вызова метода create фабрики. Таким образом, непосредственное управление процессом агрегирования необходимо только авторам библиотеки, которые несут ответственность за это создание.

Заводы по переработке агрегации

Есть два абстрактных базовых фабричных класса для невзвешенного и взвешенного агрегирования. Их метод create принимает типовые сигнатуры значений для агрегирования и возвращает tff.templates.AggregationProcess для агрегирования таких значений.

Процесс, созданный tff.aggregators.UnweightedAggregationFactory принимает два входных аргумента: (1) состояние на сервере и (2) значение указанного типа value_type .

Пример реализации - tff.aggregators.SumFactory .

Процесс, созданный tff.aggregators.WeightedAggregationFactory принимает три входных аргумента: (1) состояние на сервере, (2) значение указанного типа value_type и (3) вес типа weight_type , как указано пользователем фабрики при вызове его метода create .

Пример реализации - tff.aggregators.MeanFactory который вычисляет средневзвешенное значение.

Фабричный паттерн - это то, как мы достигаем первой цели, указанной выше; эта агрегация является независимым строительным блоком. Например, при изменении обучаемых переменных модели сложная агрегация не обязательно должна изменяться; представляющая его фабрика будет вызываться с другой сигнатурой типа при использовании таким методом, как tff.learning.build_federated_averaging_process .

Композиции

Напомним, что общий процесс агрегации может инкапсулировать (а) некоторую предварительную обработку значений на клиентах, (б) перемещение значений от клиента к серверу и (в) некоторую постобработку агрегированного значения на сервере. Вторая цель, указанная выше, композиция агрегации, реализуется внутри модуля tff.aggregators путем структурирования реализации фабрик агрегации таким образом, чтобы часть (b) могла быть делегирована другой фабрике агрегации.

Вместо того, чтобы реализовывать всю необходимую логику в рамках одного фабричного класса, реализации по умолчанию сосредоточены на одном аспекте, важном для агрегации. При необходимости этот шаблон затем позволяет нам заменять строительные блоки по одному.

Примером может служить взвешенный tff.aggregators.MeanFactory . Его реализация умножает предоставленные значения и веса на клиентах, затем суммирует взвешенные значения и веса независимо, а затем делит сумму взвешенных значений на сумму весов на сервере. Вместо реализации суммирования с помощью прямого использования оператора tff.federated_sum суммирование делегируется двум экземплярам tff.aggregators.SumFactory .

Такая структура позволяет заменять два суммирования по умолчанию разными фабриками, которые реализуют сумму по-разному. Например, tff.aggregators.SecureSumFactory или пользовательская реализация tff.aggregators.UnweightedAggregationFactory . И наоборот, время tff.aggregators.MeanFactory может само быть внутренним агрегатом другой фабрики, например tff.aggregators.clipping_factory , если значения должны быть обрезаны перед усреднением.

См. Предыдущее руководство по настройке агрегатов, рекомендованное для ознакомления с рекомендуемыми вариантами использования механизма композиции с использованием существующих фабрик в модуле tff.aggregators .

Лучшие практики на примере

Мы собираемся подробно проиллюстрировать концепции tff.aggregators , реализуя простой пример задачи, и постепенно делать ее более общей. Еще один способ научиться - это посмотреть на реализацию существующих фабрик.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Вместо суммирования value примерная задача состоит в том, чтобы суммировать value * 2.0 а затем разделить сумму на 2.0 . Таким образом, результат агрегации математически эквивалентен прямому суммированию value и может рассматриваться как состоящий из трех частей: (1) масштабирование на клиентах (2) суммирование по клиентам (3) масштабирование на сервере.

Следуя схеме, описанной выше, логика будет реализована как подкласс tff.aggregators.UnweightedAggregationFactory , который создает соответствующий tff.templates.AggregationProcess при value_type для агрегирования:

Минимальная реализация

Для примера задачи необходимые вычисления всегда одни и те же, поэтому нет необходимости использовать состояние. Таким образом, он пуст и представлен как tff.federated_value((), tff.SERVER) . То же самое относится и к измерениям, пока.

Таким образом, минимальная реализация задачи следующая:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Все ли работает должным образом, можно проверить с помощью следующего кода:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Состояние и измерения

Сохранение состояния широко используется в TFF для представления вычислений, которые, как ожидается, будут выполняться итеративно и изменяться с каждой итерацией. Например, состояние обучающего вычисления содержит веса обучаемой модели.

Чтобы проиллюстрировать, как использовать состояние в вычислении агрегирования, мы изменим пример задачи. Вместо умножения value на 2.0 мы умножаем его на индекс итерации - количество раз, когда агрегация выполнялась.

Для этого нам нужен способ отслеживать индекс итерации, что достигается с помощью концепции состояния. В initialize_fn вместо создания пустого состояния мы инициализируем состояние скалярным нулем. Затем состояние можно использовать в next_fn в три этапа: (1) увеличение на 1.0 , (2) использование для умножения value и (3) возврат в качестве нового обновленного состояния.

Как только это будет сделано, вы можете заметить: Но точно такой же код, как указано выше, можно использовать для проверки того, что все работает должным образом. Как я узнаю, что что-то действительно изменилось?

Хороший вопрос! Здесь становится полезной концепция измерений. Как правило, измерения могут сообщать любое значение, относящееся к однократному выполнению next функции, которая может использоваться для мониторинга. В данном случае это может быть summed_value из предыдущего примера. То есть значение до шага «масштабирования», которое должно зависеть от индекса итерации. Опять же, это не обязательно полезно на практике, но иллюстрирует соответствующий механизм.

Таким образом, ответ на задачу с сохранением состояния выглядит следующим образом:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Обратите внимание, что state которое входит в next_fn качестве входных данных, размещается на сервере. Чтобы использовать его на клиентах, его сначала необходимо передать, что достигается с tff.federated_broadcast оператора tff.federated_broadcast .

Чтобы убедиться, что все работает так, как ожидалось, теперь мы можем посмотреть client_data measurements , которые должны отличаться в каждом раунде выполнения, даже если они выполняются с одними и client_data же client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Структурированные типы

Веса модели для модели, обученной федеративному обучению, обычно представлены в виде набора тензоров, а не одного тензора. В TFF это представлено как tff.StructType и обычно полезные фабрики агрегирования должны иметь возможность принимать структурированные типы.

Однако в приведенных выше примерах мы работали только с объектом tff.TensorType . Если мы попытаемся использовать предыдущую фабрику для создания процесса агрегации с tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , мы получим странную ошибку, потому что TensorFlow попытается перемножить tf.Tensor и list .

Проблема в том, что вместо умножения структуры тензоров на константу нам нужно умножить каждый тензор в структуре на константу. Обычное решение этой проблемы - использовать модуль tf.nest внутри созданных tff.tf_computation .

Таким образом, версия предыдущей ExampleTaskFactory совместимая со структурированными типами, выглядит следующим образом:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

В этом примере подчеркивается шаблон, которому может быть полезно следовать при структурировании кода TFF. Когда не tff.tf_computation очень простые операции, код становится более разборчивым, когда tff.tf_computation которые будут использоваться в качестве строительных блоков внутри tff.federated_computation , создаются в отдельном месте. Внутри tff.federated_computation эти строительные блоки соединяются только с помощью внутренних операторов.

Чтобы убедиться, что он работает должным образом:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Внутренние скопления

Последний шаг - необязательно разрешить делегирование фактического агрегирования другим фабрикам, чтобы упростить составление различных методов агрегирования.

Это достигается путем создания необязательного аргумента inner_factory в конструкторе нашей ExampleTaskFactory . Если не указано иное, используется tff.aggregators.SumFactory , который применяет оператор tff.federated_sum использованный непосредственно в предыдущем разделе.

Когда вызывается create , мы можем сначала вызвать create из inner_factory чтобы создать внутренний процесс агрегации с тем же значением value_type .

Состояние нашего процесса, возвращаемое initialize_fn представляет собой композицию из двух частей: состояния, созданного этим процессом, и состояния только что созданного внутреннего процесса.

Реализация next_fn отличается тем, что фактическая агрегация делегируется next функции внутреннего процесса, и тем, как составляется окончательный результат. Состояние снова состоит из «этого» и «внутреннего» состояния, а измерения составляются аналогично OrderedDict .

Ниже представлена ​​реализация такого шаблона.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

При делегировании функции inner_process.next структура возврата, которую мы получаем, представляет собой tff.templates.MeasuredProcessOutput с теми же тремя полями - state , result и measurements . При создании общей структуры возврата составного процесса агрегации поля state и measurements обычно должны быть составлены и возвращены вместе. Напротив, поле result соответствует агрегируемому значению и вместо этого "проходит через" составленное агрегирование.

Объект state следует рассматривать как деталь реализации фабрики, и, следовательно, композиция может иметь любую структуру. Однако measurements соответствуют значениям, которые в какой-то момент будут сообщены пользователю. Поэтому мы рекомендуем использовать OrderedDict с составным именованием, чтобы было ясно, откуда в композиции берется сообщаемая метрика.

Также обратите внимание на использование оператора tff.federated_zip . Объект state управляемый созданным процессом, должен иметь tff.FederatedType . Если бы вместо этого мы вернули (this_state, inner_state) в initialize_fn , его сигнатура типа возвращаемого значения была бы tff.StructType содержащей кортеж из tff.FederatedType s. Использование tff.federated_zip «поднимает» tff.FederatedType на верхний уровень. Это аналогично используется в next_fn при подготовке состояния и измерений к возврату.

Наконец, мы можем увидеть, как это можно использовать с внутренней агрегацией по умолчанию:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... и с другой внутренней агрегацией. Например, ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Резюме

В этом руководстве мы объяснили лучшие практики, которым нужно следовать, чтобы создать универсальный строительный блок агрегации, представленный как фабрика агрегации. Общность достигается за счет замысла дизайна двумя способами:

  1. Параметризованные вычисления. Агрегация - это независимый строительный блок, который можно подключить к другим модулям TFF, предназначенным для работы с tff.aggregators для параметризации их необходимой агрегации, например tff.learning.build_federated_averaging_process .
  2. Агрегационный состав. Строительный блок агрегирования может быть составлен с другими строительными блоками агрегирования для создания более сложных составных агрегатов.