Merken Sie den Termin vor! Google I / O kehrt vom 18. bis 20. Mai zurück Registrieren Sie sich jetzt
Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Implementieren benutzerdefinierter Aggregationen

Ansicht auf TensorFlow.org In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

In diesem Lernprogramm erläutern wir die Entwurfsprinzipien hinter dem Modul tff.aggregators und bewährte Methoden für die Implementierung einer benutzerdefinierten Aggregation von Werten von Clients zu Server.

Voraussetzungen. In diesem Lernprogramm wird davon tff.SERVER , tff.CLIENTS Sie bereits mit grundlegenden Konzepten von Federated Core vertraut sind, z. B. Platzierungen ( tff.SERVER , tff.CLIENTS ), wie TFF Berechnungen ( tff.tf_computation , tff.federated_computation ) und deren tff.federated_computation .

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Entwurfszusammenfassung

In TFF bezieht sich "Aggregation" auf die Bewegung eines Satzes von Werten in tff.CLIENTS , um einen Aggregatwert desselben Typs in tff.SERVER zu tff.SERVER . Das heißt, jeder einzelne Kundenwert muss nicht verfügbar sein. Beispielsweise werden beim Verbundlernen die Aktualisierungen des Clientmodells gemittelt, um eine aggregierte Modellaktualisierung zu erhalten, die auf das globale Modell auf dem Server angewendet werden kann.

Zusätzlich zu Operatoren, die dieses Ziel erreichen, wie z. B. tff.federated_sum , bietet TFF tff.templates.AggregationProcess (einen statusbehafteten Prozess ), der die tff.templates.AggregationProcess für die Aggregationsberechnung formalisiert, damit sie auf komplexere Formen als eine einfache Summe verallgemeinert werden kann.

Die Hauptkomponenten des Moduls tff.aggregators sind Fabriken zur Erstellung des AggregationProcess , die in zweierlei Hinsicht als allgemein nützliche und austauschbare Bausteine ​​von TFF konzipiert sind:

  1. Parametrisierte Berechnungen. Die Aggregation ist ein unabhängiger Baustein, der in andere TFF-Module tff.aggregators kann, die für die Zusammenarbeit mit tff.aggregators um deren erforderliche Aggregation zu parametrisieren.

Beispiel:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Aggregationszusammensetzung. Ein Aggregationsbaustein kann mit anderen Aggregationsbausteinen zusammengesetzt werden, um komplexere zusammengesetzte Aggregationen zu erstellen.

Beispiel:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Der Rest dieses Tutorials erklärt, wie diese beiden Ziele erreicht werden.

Aggregationsprozess

Wir fassen zuerst den tff.templates.AggregationProcess und folgen dem Factory-Muster für seine Erstellung.

Der tff.templates.AggregationProcess ist ein tff.templates.MeasuredProcess mit für die Aggregation angegebenen tff.templates.MeasuredProcess . Insbesondere haben die Funktionen initialize und next die folgenden Typensignaturen:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Der Status (vom Typ state_type ) muss auf dem Server abgelegt werden. Die next Funktion verwendet als Eingabeargument den Status und einen Wert, der aggregiert werden soll (vom Typ value_type ) und auf Clients platziert wird. Das * bedeutet optionale andere Eingabeargumente, z. B. Gewichte in einem gewichteten Mittelwert. Es gibt ein aktualisiertes Statusobjekt, den auf dem Server platzierten aggregierten Wert desselben Typs und einige Messungen zurück.

Beachten Sie, dass sowohl der Status, der zwischen den Ausführungen der next Funktion übergeben werden soll, als auch die gemeldeten Messungen, die Informationen in Abhängigkeit von einer bestimmten Ausführung der next Funktion melden sollen, leer sein können. Sie müssen jedoch ausdrücklich spezifiziert werden, damit andere Teile von TFF einen klaren Vertrag haben, dem sie folgen können.

Von anderen TFF-Modulen, beispielsweise den tff.learning in tff.learning , wird erwartet, dass sie den tff.templates.AggregationProcess , um die tff.templates.AggregationProcess von Werten zu parametrisieren. Wie genau die Werte aggregiert werden und welche Typensignaturen sie haben, hängt jedoch von anderen Details des trainierten Modells und dem dafür verwendeten Lernalgorithmus ab.

Um die Aggregation von den anderen Aspekten der Berechnungen unabhängig zu machen, verwenden wir das Factory-Muster. Wir erstellen den entsprechenden tff.templates.AggregationProcess sobald die relevanten tff.templates.AggregationProcess der zu aggregierenden Objekte verfügbar sind, indem wir die create Methode der Factory aufrufen. Die direkte Abwicklung des Aggregationsprozesses ist daher nur für Bibliotheksautoren erforderlich, die für diese Erstellung verantwortlich sind.

Aggregationsprozessfabriken

Es gibt zwei abstrakte Basisfactory-Klassen für ungewichtete und gewichtete Aggregation. Ihre Methode create verwendet tff.templates.AggregationProcess von Wert, die aggregiert werden sollen, und gibt einen tff.templates.AggregationProcess zur Aggregation solcher Werte zurück.

Der von tff.aggregators.UnweightedAggregationFactory erstellte Prozess tff.aggregators.UnweightedAggregationFactory zwei Eingabeargumente: (1) Status auf dem Server und (2) Wert des angegebenen Typs value_type .

Eine Beispielimplementierung ist tff.aggregators.SumFactory .

Der von tff.aggregators.WeightedAggregationFactory erstellte Prozess tff.aggregators.WeightedAggregationFactory drei Eingabeargumente: (1) Status auf dem Server, (2) Wert des angegebenen Typs value_type und (3) Gewicht des Typs weight_type , wie vom Benutzer der Factory beim Aufrufen der Methode create .

Eine Beispielimplementierung ist tff.aggregators.MeanFactory , die einen gewichteten Mittelwert berechnet.

Das Fabrikmuster ist, wie wir das oben angegebene erste Ziel erreichen; Diese Aggregation ist ein unabhängiger Baustein. Wenn Sie beispielsweise ändern, welche Modellvariablen trainierbar sind, muss sich eine komplexe Aggregation nicht unbedingt ändern. Die Factory, die sie darstellt, wird mit einer anderen tff.learning.build_federated_averaging_process aufgerufen, wenn sie von einer Methode wie tff.learning.build_federated_averaging_process .

Kompositionen

Es sei daran erinnert, dass ein allgemeiner Aggregationsprozess (a) eine gewisse Vorverarbeitung der Werte auf Clients, (b) eine Verschiebung von Werten von Client zu Server und (c) eine gewisse Nachverarbeitung des aggregierten Werts auf dem Server umfassen kann. Das oben angegebene zweite Ziel, die Aggregationszusammensetzung, wird innerhalb des Moduls tff.aggregators realisiert, indem die Implementierung der Aggregationsfabriken so strukturiert wird, dass Teil (b) an eine andere Aggregationsfabrik delegiert werden kann.

Anstatt die gesamte erforderliche Logik in einer einzelnen Factory-Klasse zu implementieren, konzentrieren sich die Implementierungen standardmäßig auf einen einzelnen Aspekt, der für die Aggregation relevant ist. Dieses Muster ermöglicht es uns dann bei Bedarf, die Bausteine ​​einzeln auszutauschen.

Ein Beispiel ist die gewichtete tff.aggregators.MeanFactory . Die Implementierung multipliziert bereitgestellte Werte und Gewichte auf Clients, summiert dann sowohl gewichtete Werte als auch Gewichte unabhängig voneinander und dividiert dann die Summe der gewichteten Werte durch die Summe der Gewichte auf dem Server. Anstatt die Summierungen direkt mit dem Operator tff.federated_sum , wird die Summierung an zwei Instanzen von tff.aggregators.SumFactory .

Eine solche Struktur ermöglicht es, die beiden Standard-Summierungen durch unterschiedliche Fabriken zu ersetzen, die die Summe unterschiedlich realisieren. Zum Beispiel eine tff.aggregators.SecureSumFactory oder eine benutzerdefinierte Implementierung der tff.aggregators.UnweightedAggregationFactory . Umgekehrt kann time, tff.aggregators.MeanFactory selbst eine innere Aggregation einer anderen Factory wie tff.aggregators.clipping_factory , wenn die Werte vor der Mittelwertbildung abgeschnitten werden sollen.

Informationen zur empfohlenen Verwendung des Kompositionsmechanismus unter Verwendung vorhandener Fabriken im Modul tff.aggregators Sie im vorherigen Tutorial zu empfohlenen Aggregationen für das Lernen .

Best Practices am Beispiel

Wir werden die Konzepte von tff.aggregators detailliert veranschaulichen, tff.aggregators wir eine einfache Beispielaufgabe implementieren und sie schrittweise allgemeiner gestalten. Eine andere Möglichkeit zu lernen besteht darin, die Implementierung bestehender Fabriken zu betrachten.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Anstatt den value summieren, besteht die Beispielaufgabe darin, den value * 2.0 zu summieren und dann die Summe durch 2.0 teilen. Das Aggregationsergebnis ist somit mathematisch äquivalent zur direkten Summierung des value und kann als aus drei Teilen bestehend angesehen werden: (1) Skalierung bei Clients (2) Summierung über Clients hinweg (3) Unskalierung beim Server.

Nach dem oben erläuterten Entwurf wird die Logik als Unterklasse von tff.aggregators.UnweightedAggregationFactory , wodurch geeignete tff.templates.AggregationProcess erstellt werden, wenn ein value_type zum Aggregieren angegeben wird:

Minimale Implementierung

Für die Beispielaufgabe sind die erforderlichen Berechnungen immer gleich, sodass die Verwendung des Status nicht erforderlich ist. Es ist also leer und wird als tff.federated_value((), tff.SERVER) . Gleiches gilt vorerst für Messungen.

Die minimale Implementierung der Aufgabe ist somit wie folgt:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ob alles wie erwartet funktioniert, kann mit folgendem Code überprüft werden:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Statefulness und Messungen

Statefulness wird in TFF häufig verwendet, um Berechnungen darzustellen, von denen erwartet wird, dass sie iterativ ausgeführt werden und sich mit jeder Iteration ändern. Beispielsweise enthält der Status einer Lernberechnung die Gewichte des zu lernenden Modells.

Um zu veranschaulichen, wie der Status in einer Aggregationsberechnung verwendet wird, ändern wir die Beispielaufgabe. Anstatt den value mit 2.0 multiplizieren, multiplizieren wir ihn mit dem Iterationsindex - der Häufigkeit, mit der die Aggregation ausgeführt wurde.

Dazu benötigen wir eine Möglichkeit, den Iterationsindex zu verfolgen, der durch das Konzept des Zustands erreicht wird. In initialize_fn initialisieren wir den Zustand nicht als leeren Status, sondern als skalaren Nullpunkt. Dann kann state in next_fn in drei Schritten verwendet werden: (1) Inkrementieren um 1.0 , (2) Multiplizieren des value und (3) Zurückgeben als neuer aktualisierter Status.

Sobald dies erledigt ist, können Sie Folgendes bemerken: Es kann jedoch genau derselbe Code wie oben verwendet werden, um alle Arbeiten wie erwartet zu überprüfen. Woher weiß ich, dass sich tatsächlich etwas geändert hat?

Gute Frage! Hier wird das Konzept der Messungen nützlich. Im Allgemeinen können Messungen jeden Wert melden, der für eine einzelne Ausführung der next Funktion relevant ist und zur Überwachung verwendet werden kann. In diesem Fall kann es sich um den summed_value aus dem vorherigen Beispiel handeln. Das heißt, der Wert vor dem Schritt "Entkalken", der vom Iterationsindex abhängen sollte. Auch dies ist in der Praxis nicht unbedingt nützlich, veranschaulicht jedoch den relevanten Mechanismus.

Die zustandsbehaftete Antwort auf die Aufgabe sieht also wie folgt aus:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Beachten Sie, dass der state , der als Eingabe in next_fn , auf dem Server abgelegt wird. Um es auf Clients verwenden zu können, muss es zuerst kommuniziert werden, was mit dem Operator tff.federated_broadcast wird.

Um alle Arbeiten wie erwartet zu überprüfen, können wir uns jetzt die gemeldeten measurements ansehen, die bei jeder Ausführungsrunde unterschiedlich sein sollten, selbst wenn sie mit denselben client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Strukturierte Typen

Die Modellgewichte eines Modells, das im föderierten Lernen trainiert wurde, werden normalerweise als Sammlung von Tensoren und nicht als einzelner Tensor dargestellt. In TFF wird dies als tff.StructType und allgemein nützliche Aggregationsfabriken müssen in der Lage sein, die strukturierten Typen zu akzeptieren.

In den obigen Beispielen haben wir jedoch nur mit einem tff.TensorType Objekt gearbeitet. Wenn wir versuchen, den Aggregationsprozess mit der vorherigen Factory mit einem tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) zu erstellen, erhalten wir einen seltsamen Fehler, weil TensorFlow versucht, einen tf.Tensor und eine list zu multiplizieren.

Das Problem ist, dass wir anstatt die Struktur der Tensoren mit einer Konstanten zu multiplizieren, jeden Tensor in der Struktur mit einer Konstanten multiplizieren müssen . Die übliche Lösung für dieses Problem besteht darin, das Modul tf.nest innerhalb der erstellten tff.tf_computation s zu verwenden.

Die Version der vorherigen ExampleTaskFactory die mit strukturierten Typen kompatibel ist, sieht daher wie folgt aus:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

In diesem Beispiel wird ein Muster hervorgehoben, das bei der Strukturierung von TFF-Code hilfreich sein kann. Wenn es sich nicht um sehr einfache Operationen handelt, wird der Code besser lesbar, wenn die tff.tf_computation s, die als Bausteine ​​in einer tff.federated_computation werden, an einer separaten Stelle erstellt werden. Innerhalb der tff.federated_computation werden diese Bausteine ​​nur mit den intrinsischen Operatoren verbunden.

So überprüfen Sie, ob es wie erwartet funktioniert:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Innere Aggregationen

Der letzte Schritt besteht darin, optional die Delegierung der tatsächlichen Aggregation an andere Fabriken zu ermöglichen, um eine einfache Zusammensetzung verschiedener Aggregationstechniken zu ermöglichen.

Dies wird erreicht, indem im Konstruktor unserer ExampleTaskFactory ein optionales inner_factory Argument ExampleTaskFactory . Wenn nicht anders angegeben, tff.aggregators.SumFactory verwendet, der das gilt tff.federated_sum Operator direkt im vorherigen Abschnitt verwendet.

Wenn create aufgerufen wird, können wir zuerst create der inner_factory , um den inneren Aggregationsprozess mit demselben value_type zu erstellen.

Der Status unseres Prozesses, der von initialize_fn wird, besteht aus zwei Teilen: dem Status, der durch "diesen" Prozess erstellt wurde, und dem Status des gerade erstellten inneren Prozesses.

Die Implementierung von next_fn unterscheidet sich darin, dass die eigentliche Aggregation an die next Funktion des inneren Prozesses delegiert wird und wie die endgültige Ausgabe zusammengesetzt ist. Der Zustand setzt sich wieder aus "diesem" und "inneren" Zustand zusammen, und Messungen werden auf ähnliche Weise wie ein OrderedDict .

Das Folgende ist eine Implementierung eines solchen Musters.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Bei der Delegierung an die Funktion inner_process.next erhalten wir als tff.templates.MeasuredProcessOutput eine tff.templates.MeasuredProcessOutput mit denselben drei Feldern - state , result und measurements . Bei der Erstellung der Gesamtrückgabestruktur des zusammengesetzten Aggregationsprozesses sollten die Felder state und measurements im Allgemeinen zusammengesetzt und zusammen zurückgegeben werden. Im Gegensatz dazu entspricht das result dem zu aggregierenden Wert und "fließt" stattdessen durch die zusammengesetzte Aggregation.

Das state sollte als Implementierungsdetail der Fabrik angesehen werden, und daher kann die Zusammensetzung eine beliebige Struktur haben. Die measurements entsprechen jedoch Werten, die dem Benutzer irgendwann gemeldet werden müssen. Daher empfehlen wir die Verwendung von OrderedDict mit zusammengesetzter Benennung, sodass klar ist, woher in einer Komposition eine gemeldete Metrik stammt.

Beachten Sie auch die Verwendung des Operators tff.federated_zip . Der state Objekt durch den Prozess erstellt contolled sollte sein tff.FederatedType . Wenn wir stattdessen (this_state, inner_state) in initialize_fn , wäre die Signatur des Rückgabetyps ein tff.StructType , der ein 2-Tupel von tff.FederatedType s enthält. Die Verwendung von tff.federated_zip "hebt" den tff.FederatedType auf die oberste Ebene. Dies wird in ähnlicher Weise im next_fn wenn der Status und die zurückzugebenden Messungen vorbereitet werden.

Schließlich können wir sehen, wie dies mit der standardmäßigen inneren Aggregation verwendet werden kann:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... und mit einer anderen inneren Aggregation. Zum Beispiel eine ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Zusammenfassung

In diesem Lernprogramm haben wir die Best Practices erläutert, die befolgt werden müssen, um einen universellen Aggregationsbaustein zu erstellen, der als Aggregationsfactory dargestellt wird. Die Allgemeinheit kommt durch die Entwurfsabsicht auf zwei Arten zustande:

  1. Parametrisierte Berechnungen. Die Aggregation ist ein unabhängiger Baustein, der in andere TFF-Module tff.aggregators kann, die für die Zusammenarbeit mit tff.aggregators um deren erforderliche Aggregation zu parametrisieren, z. B. tff.learning.build_federated_averaging_process .
  2. Aggregationszusammensetzung. Ein Aggregationsbaustein kann mit anderen Aggregationsbausteinen zusammengesetzt werden, um komplexere zusammengesetzte Aggregationen zu erstellen.