Salva la data! Google I / O ritorna dal 18 al 20 maggio Registrati ora
Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Implementazione di aggregazioni personalizzate

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza sorgente su GitHub Scarica taccuino

In questo tutorial, spieghiamo i principi di progettazione alla base del modulo tff.aggregators e le best practice per l'implementazione dell'aggregazione personalizzata di valori dai client al server.

Prerequisiti. Questo tutorial presuppone che tu abbia già familiarità con i concetti di base di Federated Core come i posizionamenti ( tff.SERVER , tff.CLIENTS ), il modo in cui TFF rappresenta i calcoli ( tff.tf_computation , tff.federated_computation ) e le loro firme di tipo.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Riepilogo del progetto

In TFF, "aggregazione" si riferisce allo spostamento di un insieme di valori su tff.CLIENTS per produrre un valore aggregato dello stesso tipo su tff.SERVER . In altre parole, non è necessario che il valore di ogni singolo cliente sia disponibile. Ad esempio, nell'apprendimento federato, viene calcolata la media degli aggiornamenti del modello client per ottenere un aggiornamento del modello aggregato da applicare al modello globale sul server.

Oltre agli operatori che raggiungono questo obiettivo come tff.federated_sum , TFF fornisce tff.templates.AggregationProcess (un processo con stato ) che formalizza la firma del tipo per il calcolo dell'aggregazione in modo che possa generalizzare a forme più complesse rispetto a una semplice somma.

I principali componenti dei tff.aggregators modulo sono fabbriche per la creazione del AggregationProcess , che sono progettati per essere blocchi generalmente utili e sostituibili costruzione di TFF in due aspetti:

  1. Calcoli parametrizzati. L'aggregazione è un blocco tff.aggregators indipendente che può essere collegato ad altri moduli TFF progettati per funzionare con tff.aggregators per parametrizzare la loro aggregazione necessaria.

Esempio:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Composizione dell'aggregazione. Un building block di aggregazione può essere composto con altri building block di aggregazione per creare aggregazioni composite più complesse.

Esempio:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Il resto di questo tutorial spiega come vengono raggiunti questi due obiettivi.

Processo di aggregazione

Per prima cosa riassumiamo il tff.templates.AggregationProcess e lo seguiamo con il pattern factory per la sua creazione.

tff.templates.AggregationProcess è un tff.templates.MeasuredProcess con firme di tipo specificate per l'aggregazione. In particolare, le funzioni initialize e next hanno le seguenti firme di tipo:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Lo stato (di tipo state_type ) deve essere posizionato sul server. La funzione next prende come argomento di input lo stato e un valore da aggregare (di tipo value_type ) posto ai client. * Indica altri argomenti di input opzionali, ad esempio i pesi in una media ponderata. Restituisce un oggetto di stato aggiornato, il valore aggregato dello stesso tipo posizionato sul server e alcune misurazioni.

Notare che sia lo stato da passare tra le esecuzioni della funzione next , sia le misurazioni riportate intese a riportare qualsiasi informazione dipendente da una specifica esecuzione della funzione next , possono essere vuote. Tuttavia, devono essere specificati esplicitamente affinché altre parti del TFF abbiano un contratto chiaro da seguire.

Altri moduli TFF, ad esempio gli aggiornamenti del modello in tff.learning , dovrebbero utilizzare tff.templates.AggregationProcess per parametrizzare il modo in cui i valori vengono aggregati. Tuttavia, quali sono esattamente i valori aggregati e quali sono le loro firme di tipo, dipende da altri dettagli del modello da addestrare e dall'algoritmo di apprendimento utilizzato per farlo.

Per rendere l'aggregazione indipendente dagli altri aspetti dei calcoli, utilizziamo il pattern factory: creiamo il tff.templates.AggregationProcess appropriato una volta disponibili le firme di tipo rilevanti degli oggetti da aggregare, invocando il metodo create della factory. La gestione diretta del processo di aggregazione è quindi necessaria solo per gli autori di biblioteche, che sono responsabili di questa creazione.

Fabbriche di processi di aggregazione

Esistono due classi factory di base astratte per l'aggregazione non ponderata e ponderata. Il loro metodo create accetta firme di tipo di valore da aggregare e restituisce un tff.templates.AggregationProcess per l'aggregazione di tali valori.

Il processo creato da tff.aggregators.UnweightedAggregationFactory accetta due argomenti di input: (1) stato sul server e (2) valore del tipo specificato value_type .

Un'implementazione di esempio è tff.aggregators.SumFactory .

Il processo creato da tff.aggregators.WeightedAggregationFactory accetta tre argomenti di input: (1) state sul server, (2) valore del tipo specificato value_type e (3) weight del tipo weight_type , come specificato dall'utente della factory quando richiama il suo metodo di create .

Un'implementazione di esempio è tff.aggregators.MeanFactory che calcola una media ponderata.

Il modello di fabbrica è il modo in cui raggiungiamo il primo obiettivo sopra indicato; che l'aggregazione è un elemento costitutivo indipendente. Ad esempio, quando si modificano le variabili del modello addestrabili, non è necessario che un'aggregazione complessa cambi; la factory che lo rappresenta verrà invocata con una firma di tipo diverso se utilizzata da un metodo come tff.learning.build_federated_averaging_process .

Composizioni

Ricordate che un processo di aggregazione generale può incapsulare (a) una pre-elaborazione dei valori sui client, (b) lo spostamento dei valori dal client al server e (c) una certa post-elaborazione del valore aggregato sul server. Il secondo obiettivo sopra indicato, la composizione dell'aggregazione, si realizza all'interno del modulo tff.aggregators strutturando l'implementazione delle aggregation factories in modo tale che la parte (b) possa essere delegata ad un'altra aggregation factory.

Piuttosto che implementare tutta la logica necessaria all'interno di una singola classe factory, le implementazioni sono per impostazione predefinita focalizzate su un singolo aspetto rilevante per l'aggregazione. Quando necessario, questo modello ci consente quindi di sostituire gli elementi costitutivi uno alla volta.

Un esempio è il weighted tff.aggregators.MeanFactory . La sua implementazione moltiplica i valori e i pesi forniti sui client, quindi somma sia i valori ponderati che i pesi in modo indipendente, quindi divide la somma dei valori ponderati per la somma dei pesi sul server. Invece di implementare le sommatorie utilizzando direttamente l'operatore tff.federated_sum , la somma viene delegata a due istanze di tff.aggregators.SumFactory .

Tale struttura consente di sostituire le due somme predefinite con fabbriche diverse, che realizzano la somma in modo diverso. Ad esempio, un tff.aggregators.SecureSumFactory o un'implementazione personalizzata di tff.aggregators.UnweightedAggregationFactory . Al contrario, time, tff.aggregators.MeanFactory può essere esso stesso un'aggregazione interna di un'altra factory come tff.aggregators.clipping_factory , se i valori devono essere ritagliati prima della media.

Vedere il precedente tutorial sulle aggregazioni consigliate da Tuning per l'apprendimento per gli usi consigliati del meccanismo di composizione utilizzando le fabbriche esistenti nel modulo tff.aggregators .

Buone pratiche con l'esempio

tff.aggregators in dettaglio i concetti di tff.aggregators implementando un semplice task di esempio e renderlo progressivamente più generale. Un altro modo per imparare è guardare all'implementazione delle fabbriche esistenti.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Invece di sommare il value , l'attività di esempio consiste nel sommare il value * 2.0 e quindi dividere la somma per 2.0 . Il risultato dell'aggregazione è quindi matematicamente equivalente alla somma diretta del value e potrebbe essere pensato come composto da tre parti: (1) ridimensionamento in corrispondenza dei client (2) somma tra client (3) annullamento in scala sul server.

Seguendo il progetto spiegato sopra, la logica verrà implementata come una sottoclasse di tff.aggregators.UnweightedAggregationFactory , che crea un appropriato tff.templates.AggregationProcess quando viene fornito un value_type da aggregare:

Implementazione minima

Per l'attività di esempio, i calcoli necessari sono sempre gli stessi, quindi non è necessario utilizzare lo stato. È quindi vuoto e rappresentato come tff.federated_value((), tff.SERVER) . Lo stesso vale per le misurazioni, per ora.

L'implementazione minima dell'attività è quindi la seguente:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Se tutto funziona come previsto può essere verificato con il codice seguente:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Statefulness e misurazioni

Lo statefulness è ampiamente utilizzato in TFF per rappresentare i calcoli che dovrebbero essere eseguiti in modo iterativo e cambiare con ogni iterazione. Ad esempio, lo stato di un calcolo di apprendimento contiene i pesi del modello da apprendere.

Per illustrare come utilizzare lo stato in un calcolo di aggregazione, modifichiamo l'attività di esempio. Invece di moltiplicare il value per 2.0 , lo moltiplichiamo per l'indice di iterazione, il numero di volte in cui l'aggregazione è stata eseguita.

Per fare ciò, abbiamo bisogno di un modo per tenere traccia dell'indice di iterazione, che si ottiene attraverso il concetto di stato. In initialize_fn , invece di creare uno stato vuoto, inizializziamo lo stato in modo che sia uno zero scalare. Quindi, lo stato può essere utilizzato in next_fn in tre passaggi: (1) incrementa di 1.0 , (2) usa per moltiplicare il value e (3) ritorna come nuovo stato aggiornato.

Una volta fatto ciò, potresti notare: ma è possibile utilizzare esattamente lo stesso codice di cui sopra per verificare che tutto funzioni come previsto. Come faccio a sapere che qualcosa è effettivamente cambiato?

Buona domanda! È qui che diventa utile il concetto di misurazione. In generale, le misurazioni possono riportare qualsiasi valore rilevante per una singola esecuzione della funzione next , che potrebbe essere utilizzato per il monitoraggio. In questo caso, può essere il summed_value dell'esempio precedente. Ovvero, il valore prima del passaggio "unscaling", che dovrebbe dipendere dall'indice di iterazione. Ancora una volta, questo non è necessariamente utile nella pratica, ma illustra il meccanismo pertinente.

La risposta stateful al compito appare quindi come segue:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Notare che lo state che entra in next_fn come input è posto sul server. Per poterlo utilizzare presso i client, deve prima essere comunicato, che si ottiene utilizzando l'operatore tff.federated_broadcast .

Per verificare che tutto funzioni come previsto, ora possiamo guardare le measurements riportate, che dovrebbero essere diverse a ogni round di esecuzione, anche se eseguite con lo stesso client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Tipi strutturati

I pesi del modello di un modello addestrato nell'apprendimento federato sono solitamente rappresentati come una raccolta di tensori, piuttosto che un singolo tensore. In TFF, questo è rappresentato come tff.StructType e le factory di aggregazione generalmente utili devono essere in grado di accettare i tipi strutturati.

Tuttavia, negli esempi precedenti, abbiamo lavorato solo con un oggetto tff.TensorType . Se proviamo a utilizzare la factory precedente per creare il processo di aggregazione con un tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , otteniamo uno strano errore perché TensorFlow proverà a moltiplicare un tf.Tensor e un list .

Il problema è che invece di moltiplicare la struttura dei tensori per una costante, dobbiamo moltiplicare ogni tensore nella struttura per una costante. La solita soluzione a questo problema è usare il modulo tf.nest all'interno dei tff.tf_computation creati.

La versione del precedente ExampleTaskFactory compatibile con i tipi strutturati appare quindi come segue:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Questo esempio evidenzia uno schema che può essere utile seguire durante la strutturazione del codice TFF. Quando non si ha a che fare con operazioni molto semplici, il codice diventa più leggibile quando i tff.tf_computation che verranno usati come blocchi di costruzione all'interno di un tff.federated_computation vengono creati in un luogo separato. All'interno di tff.federated_computation , questi blocchi di costruzione sono collegati solo utilizzando gli operatori intrinseci.

Per verificare che funzioni come previsto:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Aggregazioni interne

Il passaggio finale consiste nell'abilitare facoltativamente la delega dell'effettiva aggregazione ad altre fabbriche, al fine di consentire una facile composizione di diverse tecniche di aggregazione.

Ciò si ottiene creando un argomento inner_factory opzionale nel costruttore del nostro ExampleTaskFactory . Se non specificato, viene utilizzato tff.aggregators.SumFactory , che applica l'operatore tff.federated_sum utilizzato direttamente nella sezione precedente.

Quando viene chiamato create , possiamo prima chiamare create di inner_factory per creare il processo di aggregazione interno con lo stesso value_type .

Lo stato del nostro processo restituito da initialize_fn è una composizione di due parti: lo stato creato da "questo" processo e lo stato del processo interno appena creato.

L'implementazione di next_fn differisce in quanto l'aggregazione effettiva è delegata alla funzione next del processo interno e in come è composto l'output finale. Lo stato è di nuovo composto da "questo" e da uno stato "interno" e le misurazioni sono composte in modo simile a OrderedDict .

Quanto segue è un'implementazione di tale modello.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Quando si delega alla funzione inner_process.next , la struttura di ritorno che otteniamo è un tff.templates.MeasuredProcessOutput , con gli stessi tre campi: state , result e measurements . Quando si crea la struttura complessiva dei rendimenti del processo di aggregazione composta, i campi dello state e delle measurements dovrebbero essere generalmente composti e restituiti insieme. Al contrario, il campo del result corrisponde al valore che viene aggregato e invece "scorre" attraverso l'aggregazione composta.

L'oggetto state dovrebbe essere visto come un dettaglio di implementazione della fabbrica, e quindi la composizione potrebbe essere di qualsiasi struttura. Tuttavia, le measurements corrispondono a valori da segnalare all'utente ad un certo punto. Pertanto, si consiglia di utilizzare OrderedDict , con nomi composti in modo tale che sia chiaro da dove proviene una metrica riportata in una composizione.

Notare anche l'uso dell'operatore tff.federated_zip . L'oggetto di state controllato dal processo creato dovrebbe essere un tff.FederatedType . Se invece avessimo restituito (this_state, inner_state) in initialize_fn , la firma del tipo restituito sarebbe un tff.StructType contenente una tupla 2 di tff.FederatedType s. L'uso di tff.federated_zip "solleva" il tff.FederatedType al livello più alto. Viene utilizzato in modo simile in next_fn quando si prepara lo stato e le misurazioni da restituire.

Infine, possiamo vedere come questo può essere utilizzato con l'aggregazione interna predefinita:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... e con una diversa aggregazione interna. Ad esempio, un ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Sommario

In questo tutorial, abbiamo spiegato le migliori pratiche da seguire per creare un blocco predefinito di aggregazione per scopi generici, rappresentato come una fabbrica di aggregazione. La generalità deriva dall'intento progettuale in due modi:

  1. Calcoli parametrizzati. L'aggregazione è un blocco tff.aggregators indipendente che può essere inserito in altri moduli TFF progettati per funzionare con tff.aggregators per parametrizzare la loro aggregazione necessaria, come tff.learning.build_federated_averaging_process .
  2. Composizione dell'aggregazione. Un building block di aggregazione può essere composto con altri building block di aggregazione per creare aggregazioni composite più complesse.