Implementazione di aggregazioni personalizzate

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza la fonte su GitHub Scarica taccuino

In questo tutorial, spieghiamo i principi di progettazione dietro la tff.aggregators modulo e best practice per l'implementazione un'aggregazione personalizzata di valori dai client ai server.

Prerequisiti. Questo tutorial presume che già familiarità con i concetti di base della Federated Nucleo come tirocini ( tff.SERVER , tff.CLIENTS ), come TFF rappresenta calcoli ( tff.tf_computation , tff.federated_computation ) e le loro firme tipo.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Riepilogo del progetto

In TFF, "aggregazione" si riferisce al movimento di un insieme di valori su tff.CLIENTS per produrre un valore complessivo dello stesso tipo su tff.SERVER . Cioè, non è necessario che ogni singolo valore del cliente sia disponibile. Ad esempio, nell'apprendimento federato, viene calcolata la media degli aggiornamenti del modello client per ottenere un aggiornamento del modello aggregato da applicare al modello globale sul server.

Oltre agli operatori raggiungere tale obiettivo come tff.federated_sum , TFF fornisce tff.templates.AggregationProcess (un processo con stato ) che formalizza la firma tipo per aggregazione calcolo così può generalizzare a forme più complesse di una semplice somma.

I principali componenti dei tff.aggregators modulo sono fabbriche per la creazione del AggregationProcess , che sono progettati per essere blocchi generalmente utili e sostituibili costruzione di TFF in due aspetti:

  1. Calcoli parametrizzati. Aggregazione è un elemento indipendente che può essere collegato ad altri moduli TFF progettati per lavorare con tff.aggregators parametrizzare loro aggregazione necessario.

Esempio:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Composizione di aggregazione. Un building block di aggregazione può essere composto con altri building block di aggregazione per creare aggregazioni composite più complesse.

Esempio:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Il resto di questo tutorial spiega come vengono raggiunti questi due obiettivi.

Processo di aggregazione

In primo luogo abbiamo riassumiamo la tff.templates.AggregationProcess , e seguiamo con il modello di fabbrica per la sua creazione.

Il tff.templates.AggregationProcess è un tff.templates.MeasuredProcess con le firme di tipo specificati per l'aggregazione. In particolare, l' initialize e next funzioni hanno le firme di tipo seguente:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Lo stato (di tipo state_type ) deve essere posizionato sul server. La next funzione prende come parametro di input lo stato e un valore per essere aggregati (di tipo value_type ) posto al client. I * mezzi facoltativi altri argomenti di input, ad esempio i pesi in media pesata. Restituisce un oggetto di stato aggiornato, il valore aggregato dello stesso tipo posizionato sul server e alcune misurazioni.

Notare che sia lo stato da passare tra esecuzioni della next funzione, e le misure riportate destinato a segnalare qualsiasi informazione a seconda di una specifica esecuzione del next funzione, può essere vuoto. Tuttavia, devono essere specificati esplicitamente affinché altre parti di TFF abbiano un contratto chiaro da seguire.

Altri moduli TFF, per esempio gli aggiornamenti del modello nel tff.learning , sono tenuti a utilizzare il tff.templates.AggregationProcess di parametrizzare come i valori vengono aggregati. Tuttavia, quali sono esattamente i valori aggregati e quali sono le loro firme di tipo, dipende da altri dettagli del modello in fase di training e dall'algoritmo di apprendimento utilizzato per farlo.

Per rendere l'aggregazione indipendente dagli altri aspetti di calcoli, si usa il modello di fabbrica - creiamo l'appropriato tff.templates.AggregationProcess una volta le firme di tipo rilevanti di oggetti per essere aggregati sono disponibili, richiamando il create modalità di fabbrica. La gestione diretta del processo di aggregazione è quindi necessaria solo per gli autori della biblioteca, che sono responsabili di questa creazione.

Fabbriche di processo di aggregazione

Sono disponibili due classi di fabbrica di base astratte per l'aggregazione non ponderata e ponderata. Il loro create metodo prende tipo firme di valore per essere cumulati e restituisce un tff.templates.AggregationProcess per l'aggregazione di tali valori.

Il processo creato da tff.aggregators.UnweightedAggregationFactory prende due argomenti di input: (1) sul server di stato e (2) il valore del tipo specificato value_type .

Un esempio di implementazione è tff.aggregators.SumFactory .

Il processo creato da tff.aggregators.WeightedAggregationFactory accetta tre parametri di input: (1) lo stato sul server, (2) il valore del tipo specificato value_type e (3) il peso del tipo weight_type , come specificato dall'utente della fabbrica quando si richiama il suo create metodo.

Un esempio di implementazione è tff.aggregators.MeanFactory che calcola una media ponderata.

Il modello di fabbrica è il modo in cui raggiungiamo il primo obiettivo sopra indicato; quell'aggregazione è un blocco costitutivo indipendente. Ad esempio, quando si modificano le variabili del modello addestrabili, un'aggregazione complessa non deve necessariamente cambiare; la fabbrica rappresenta verrà richiamato con una firma di tipo diverso quando viene utilizzato da un metodo come tff.learning.build_federated_averaging_process .

Composizioni

Ricordiamo che un processo di aggregazione generale può incapsulare (a) un po' di pre-elaborazione dei valori sui client, (b) lo spostamento dei valori da client a server e (c) un po' di post-elaborazione del valore aggregato sul server. Il secondo obiettivo sopra indicato, la composizione aggregazione, si realizza all'interno delle tff.aggregators modulo strutturando l'attuazione delle fabbriche aggregazione tali che la parte (b) può essere delegato a un'altra fabbrica aggregazione.

Invece di implementare tutta la logica necessaria all'interno di una singola classe di fabbrica, le implementazioni sono focalizzate per impostazione predefinita su un singolo aspetto rilevante per l'aggregazione. Quando necessario, questo modello ci consente quindi di sostituire i blocchi costitutivi uno alla volta.

Un esempio è la ponderata tff.aggregators.MeanFactory . La sua implementazione moltiplica i valori ei pesi forniti sui client, quindi somma sia i valori ponderati che i pesi in modo indipendente, quindi divide la somma dei valori ponderati per la somma dei pesi sul server. Invece di implementare le sommatorie utilizzando direttamente l' tff.federated_sum operatore, la sommatoria è delegata a due istanze di tff.aggregators.SumFactory .

Tale struttura permette di sostituire le due somme di default da fabbriche diverse, che realizzano la somma in modo diverso. Ad esempio, un tff.aggregators.SecureSumFactory , o di un'implementazione personalizzata del tff.aggregators.UnweightedAggregationFactory . Viceversa, tempo, tff.aggregators.MeanFactory può essa stessa essere un'aggregazione interna di un'altra fabbrica come tff.aggregators.clipping_factory , se i valori devono essere fermato prima media.

Vedere la precedente sintonia raccomandato aggregazioni per l'apprendimento tutorial per usi receommended del meccanismo composizione utilizzando fabbriche esistenti nel tff.aggregators modulo.

Le migliori pratiche con l'esempio

Stiamo per illustrare i tff.aggregators concetti nel dettaglio l'attuazione di un compito semplice esempio, e rendere progressivamente più generale. Un altro modo per imparare è guardare all'implementazione delle fabbriche esistenti.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Invece di sommare value , il compito esempio per sommare value * 2.0 e quindi dividere la somma da 2.0 . Il risultato di aggregazione è quindi matematicamente equivalenti alla somma direttamente il value , e può essere pensato come composto da tre parti: (1) scala a clienti (2) sommando tutti i clienti (3) unscaling sul server.

Seguendo il disegno spiegato sopra, la logica viene implementato come una sottoclasse di tff.aggregators.UnweightedAggregationFactory , che crea opportuno tff.templates.AggregationProcess quando somministrato un value_type di aggregato:

Implementazione minima

Per l'attività di esempio, i calcoli necessari sono sempre gli stessi, quindi non è necessario utilizzare lo stato. Viene così vuotato e rappresentato come tff.federated_value((), tff.SERVER) . Lo stesso vale per le misurazioni, per ora.

L'attuazione minima del compito è quindi la seguente:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Se tutto funziona come previsto può essere verificato con il seguente codice:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Stato e misurazioni

Lo statefulness è ampiamente utilizzato in TFF per rappresentare calcoli che dovrebbero essere eseguiti in modo iterativo e che cambiano ad ogni iterazione. Ad esempio, lo stato di un calcolo di apprendimento contiene i pesi del modello in fase di apprendimento.

Per illustrare come utilizzare lo stato in un calcolo di aggregazione, modifichiamo l'attività di esempio. Invece di moltiplicare value per 2.0 , lo moltiplichiamo per l'indice di iterazione - il numero di volte in cui l'aggregazione è stato eseguito.

Per fare ciò, abbiamo bisogno di un modo per tenere traccia dell'indice di iterazione, che si ottiene attraverso il concetto di stato. Nel initialize_fn , invece di creare uno stato di vuoto, inizializziamo lo stato di essere uno zero scalare. Poi, stato può essere utilizzato nel next_fn in tre fasi: (1) incremento da 1.0 , (2) di uso per moltiplicare value , e (3) di ritorno come il nuovo stato aggiornate.

Una volta fatto questo, si può notare: Ma esattamente lo stesso codice di cui sopra può essere utilizzato per verificare tutte le opere come previsto. Come faccio a sapere che qualcosa è effettivamente cambiato?

Buona domanda! È qui che il concetto di misurazioni diventa utile. In generale, le misurazioni possono segnalare qualsiasi valore rilevante per una singola esecuzione della next funzione, che potrebbe essere utilizzato per il monitoraggio. In questo caso, può essere il summed_value dell'esempio precedente. Ovvero, il valore prima del passaggio di "ridimensionamento", che dovrebbe dipendere dall'indice di iterazione. Ancora una volta, questo non è necessariamente utile nella pratica, ma illustra il relativo meccanismo.

La risposta con stato all'attività appare quindi come segue:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Si noti che la state che entra in next_fn come ingresso è posto sul server. Per poter utilizzare a clienti, deve prima essere comunicati, che è ottenuto utilizzando la tff.federated_broadcast operatore.

Per verificare tutte le opere come previsto, possiamo ora guardare ai segnalati measurements , che dovrebbero essere diverso con ogni turno di esecuzione, anche se correre con la stessa client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Tipi strutturati

I pesi del modello di un modello addestrato nell'apprendimento federato sono generalmente rappresentati come una raccolta di tensori, piuttosto che un singolo tensore. In TFF, questo viene rappresentato come tff.StructType e generalmente utili fabbriche di aggregazione devono essere in grado di accettare i tipi strutturati.

Tuttavia, negli esempi precedenti, abbiamo lavorato solo con una tff.TensorType oggetto. Se cerchiamo di utilizzare la fabbrica precedente per creare il processo di aggregazione con un tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , otteniamo un errore di strano perché tensorflow cercherà di moltiplicare un tf.Tensor e una list .

Il problema è che invece di moltiplicare la struttura dei tensori per una costante, occorre moltiplicare ogni tensore nella struttura per una costante. La soluzione più comune a questo problema è utilizzare il tf.nest all'interno modulo delle creati tff.tf_computation s.

La versione del precedente ExampleTaskFactory compatibile con i tipi strutturati appare quindi come segue:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Questo esempio evidenzia uno schema che può essere utile seguire quando si struttura il codice TFF. Quando non si tratta operazioni molto semplici, il codice diventa più leggibile quando i tff.tf_computation s che verranno utilizzati come blocchi di costruzione all'interno di un tff.federated_computation vengono creati in un luogo separato. All'interno del tff.federated_computation , questi elementi sono collegati solo con operatori intrinseci.

Per verificare che funzioni come previsto:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Aggregazioni interne

Il passaggio finale consiste nell'abilitare facoltativamente la delega dell'aggregazione effettiva ad altri stabilimenti, al fine di consentire una facile composizione delle diverse tecniche di aggregazione.

Questo risultato è ottenuto attraverso la creazione di un optional inner_factory argomento nel costruttore della nostra ExampleTaskFactory . Se non specificato, tff.aggregators.SumFactory è usato, che applica il tff.federated_sum operatore usato direttamente nella sezione precedente.

Quando create viene chiamato, possiamo chiamare prima create del inner_factory per creare il processo di aggregazione interno con lo stesso value_type .

Lo stato del nostro processo restituito da initialize_fn è una composizione di due parti: lo stato creato da "questo" processo, e lo stato del processo interiore appena creato.

L'attuazione dei next_fn differisce dal fatto che l'aggregazione effettiva è delegato alla next funzione del processo interno, e nel modo in cui l'output finale è composto. Lo stato è ancora composto di "questo" stato e "interno", e le misurazioni sono composti in modo simile come OrderedDict .

Quella che segue è un'implementazione di tale modello.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Quando delegando al inner_process.next funzioni, la struttura di ritorno che otteniamo è un tff.templates.MeasuredProcessOutput , con gli stessi tre campi - state , result e measurements . Quando si crea la struttura complessiva ritorno del processo di aggregazione composta, le state e measurements campi devono essere generalmente composti e restituiti insieme. Al contrario, i result corrisponde campo al valore da aggregati invece "scorre attraverso" l'aggregazione composta.

Lo state oggetto deve essere visto come un dettaglio di implementazione della fabbrica, e quindi la composizione potrebbe essere di qualsiasi struttura. Tuttavia, measurements corrispondono a valori da segnalare all'utente ad un certo punto. Pertanto, si consiglia di utilizzare OrderedDict , con composta nominando tale che sarebbe chiaro dove in una composizione fa un presentate metrica viene da.

Si noti anche l'uso del tff.federated_zip dell'operatore. Lo state oggetto contolled dal processo creato sia un tff.FederatedType . Se avessimo invece tornati (this_state, inner_state) nel initialize_fn , la sua firma tipo di ritorno sarebbe un tff.StructType contenente un 2-tupla di tff.FederatedType s. L'uso di tff.federated_zip "ascensori" il tff.FederatedType al livello superiore. Questo è analogamente utilizzato nella next_fn nella preparazione dello stato e misurazioni da restituire.

Infine, possiamo vedere come questo può essere utilizzato con l'aggregazione interna predefinita:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... e con una diversa aggregazione interna. Ad esempio, un ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Riepilogo

In questo tutorial, abbiamo spiegato le migliori pratiche da seguire per creare un blocco di aggregazione generico, rappresentato come una fabbrica di aggregazione. La generalità passa attraverso l'intento progettuale in due modi:

  1. Calcoli parametrizzati. Aggregazione è un elemento indipendente che può essere collegato ad altri moduli TFF progettati per lavorare con tff.aggregators parametrizzare loro aggregazione necessario, come tff.learning.build_federated_averaging_process .
  2. Composizione di aggregazione. Un building block di aggregazione può essere composto con altri building block di aggregazione per creare aggregazioni composite più complesse.