Implementieren von benutzerdefinierten Aggregationen

Auf TensorFlow.org ansehen In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

In diesem Tutorial erklären wir Design - Prinzipien hinter dem tff.aggregators Modul und Best Practices für die Implementierung kundenspezifische Aggregation von Werten von Clients zum Server.

Voraussetzungen. Dieses Tutorial vorausgesetzt , dass Sie bereits vertraut sind mit den grundlegenden Konzepten von Federated Kern wie Platzierungen ( tff.SERVER , tff.CLIENTS ), wie TFF stellt Berechnungen ( tff.tf_computation , tff.federated_computation ) und ihre Art Signaturen.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Designzusammenfassung

In TFF „Aggregation“ bezieht sich auf die Bewegung eines Satzes von Werten auf tff.CLIENTS einen Gesamtwert von der gleichen Art auf herzustellen tff.SERVER . Das heißt, jeder einzelne Clientwert muss nicht verfügbar sein. Beim föderierten Lernen werden beispielsweise Clientmodellaktualisierungen gemittelt, um eine aggregierte Modellaktualisierung zu erhalten, die auf das globale Modell auf dem Server angewendet wird.

Neben Betreibern dieses Ziel zu erreichen , wie tff.federated_sum , TFF bietet tff.templates.AggregationProcess (ein Stateful - Prozess ) , die die Art Signatur für die Aggregation Berechnung formalisiert , so dass es um komplexere Formen als eine einfache Summe verallgemeinern kann.

Die Hauptkomponenten der tff.aggregators Modulfabriken für die Erstellung des ARE AggregationProcess , die allgemein nützlich und replacable Bausteine des TFF in zwei Aspekten sein ausgelegt sind:

  1. Parametrisierte Berechnungen. Aggregation ist ein unabhängiger Baustein, der in andere TFF - Module entwickelt , um die Arbeit mit eingesteckt werden kann tff.aggregators ihre notwendige Aggregation zu parametrieren.

Beispiel:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Aggregationszusammensetzung. Ein Aggregationsbaustein kann mit anderen Aggregationsbausteinen zusammengesetzt werden, um komplexere zusammengesetzte Aggregationen zu erstellen.

Beispiel:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Der Rest dieses Tutorials erklärt, wie diese beiden Ziele erreicht werden.

Aggregationsprozess

Wir fassen zunächst die tff.templates.AggregationProcess , und folgen Sie mit dem Fabrik - Muster für seine Schöpfung.

Die tff.templates.AggregationProcess ist ein tff.templates.MeasuredProcess mit Typ - Signaturen für die Aggregation angegeben. Insbesondere die initialize und next Funktionen haben den folgenden Typen Signaturen:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Der Zustand (vom Typ state_type ) muß auf dem Server aufgestellt werden. Die next Funktion nimmt als Eingabe - Argument der Zustand und einem Wert (des Typs aggregiert werden value_type ) bei Kunden platziert. Die * Mittel gegebenenfalls andere Eingabeargumente, zum Beispiel Gewichte in einem gewichteten Mittelwert. Es gibt ein aktualisiertes Zustandsobjekt, den aggregierten Wert desselben Typs, der auf dem Server platziert ist, und einige Messungen zurück.

Man beachte , daß sowohl der Zustand zwischen Ausführungen der übergeben werden next Funktion und die berichteten Messungen bestimmt , alle Informationen zu berichten über eine spezifische Ausführung der je next Funktion kann leer sein. Sie müssen jedoch ausdrücklich angegeben werden, damit andere Teile von TFF einem klaren Vertrag folgen können.

Andere TFF - Module, zum Beispiel der Modelle Updates in tff.learning , wird erwartet , dass die verwenden tff.templates.AggregationProcess zu parametrieren , wie Werte aggregiert werden. Was genau die aggregierten Werte sind und was ihre Typsignaturen sind, hängt jedoch von anderen Details des trainierten Modells und dem dafür verwendeten Lernalgorithmus ab.

Aggregation unabhängig von den anderen Aspekten der Berechnungen zu machen, verwenden wir die Fabrik Muster - wir die passende erstellen tff.templates.AggregationProcess , sobald die entsprechenden Typ Signaturen von Objekten aggregiert werden zur Verfügung stehen, unter Berufung auf das create Methode der Fabrik. Eine direkte Abwicklung des Aggregationsprozesses ist somit nur für Bibliotheksautoren erforderlich, die für diese Erstellung verantwortlich sind.

Aggregationsprozessfabriken

Es gibt zwei abstrakte Basis-Factory-Klassen für die ungewichtete und gewichtete Aggregation. Ihre create Methode nimmt Art Signaturen Wert aggregiert werden und gibt einen tff.templates.AggregationProcess für die Aggregation solcher Werte.

Der Prozess erstellt von tff.aggregators.UnweightedAggregationFactory nimmt zwei Eingangsargumente: (1) Zustand am Server und (2) Wert des angegebenen Typs value_type .

Eine beispielhafte Implementierung ist tff.aggregators.SumFactory .

Der Prozess erstellt von tff.aggregators.WeightedAggregationFactory nimmt drei Eingabeargumente: (1) Zustand am Server (2) Wert des angegebenen Typs value_type und (3) Gewicht des Typs weight_type , wie es die Fabrik des Benutzers festgelegt , wenn seine Aufrufen create Methode.

Eine beispielhafte Implementierung ist tff.aggregators.MeanFactory , die einen gewichteten Mittelwert berechnet.

Das Fabrikmuster ist, wie wir das erste oben genannte Ziel erreichen; diese Aggregation ist ein unabhängiger Baustein. Wenn beispielsweise geändert wird, welche Modellvariablen trainierbar sind, muss sich eine komplexe Aggregation nicht unbedingt ändern; Die Fabrik darstellt es wird mit einer anderen Art Signatur aufgerufen werden , wenn sie durch ein Verfahren wie verwendet tff.learning.build_federated_averaging_process .

Kompositionen

Denken Sie daran, dass ein allgemeiner Aggregationsprozess (a) eine gewisse Vorverarbeitung der Werte bei Clients, (b) die Bewegung von Werten vom Client zum Server und (c) eine gewisse Nachverarbeitung der aggregierten Werte auf dem Server einkapseln kann. Das zweite Ziel oben, Aggregation Zusammensetzung angegeben, realisiert wird innerhalb der tff.aggregators durch Strukturierung Modul die Umsetzung der Aggregation Fabriken so dass ein Teil (b) kann in einer anderen Aggregations Fabrik delegiert werden.

Anstatt die gesamte erforderliche Logik innerhalb einer einzelnen Factory-Klasse zu implementieren, konzentrieren sich die Implementierungen standardmäßig auf einen einzigen Aspekt, der für die Aggregation relevant ist. Bei Bedarf ermöglicht uns dieses Muster dann, die Bausteine ​​einzeln zu ersetzen.

Ein Beispiel ist der gewichtete tff.aggregators.MeanFactory . Seine Implementierung multipliziert bereitgestellte Werte und Gewichtungen bei Clients, summiert dann unabhängig sowohl gewichtete Werte als auch Gewichtungen und dividiert dann die Summe der gewichteten Werte durch die Summe der Gewichtungen am Server. Statt die Summierungen der Umsetzung von direkt die Verwendung tff.federated_sum Operator wird die Summe auf zwei Instanzen delegiert tff.aggregators.SumFactory .

Eine solche Struktur ermöglicht es, die beiden Standardsummen durch unterschiedliche Fabriken zu ersetzen, die die Summe unterschiedlich realisieren. Zum Beispiel kann ein tff.aggregators.SecureSumFactory oder eine benutzerdefinierte Implementierung der tff.aggregators.UnweightedAggregationFactory . Umgekehrt Zeit, tff.aggregators.MeanFactory kann sich eine innere Zusammenlegen eines anderen Fabrik wie sein tff.aggregators.clipping_factory , wenn die Werte vor der Mittelung werden abgeschnitten sind.

Siehe vorherigen Tuning Aggregationen empfohlen für das tff.aggregators Lernen Tutorial für receommended Verwendungen der Zusammensetzung Mechanismus bestehenden Fabriken in der tff.aggregators Modul.

Best Practices durch Beispiel

Wir werden die illustrieren tff.aggregators Konzepte im Detail durch ein einfaches Beispiel Aufgabe Implementierung und machen es zunehmend allgemeiner. Eine andere Möglichkeit zum Lernen besteht darin, sich die Implementierung bestehender Fabriken anzusehen.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Statt Summieren value ist das Beispiel Aufgabe zu summieren value * 2.0 und dann teilen Sie die Summe von 2.0 . Das Aggregations Ergebnis ist somit mathematisch äquivalent zu dem Summierpunkt direkt value und könnte als aus drei Teilen bestehend betrachtet werden: (1) Skalierung an Clients (2) für Kunden (3) Summieren am Server unscaling.

Im Anschluss an das Design oben erläutert, wird die Logik als eine Unterklasse von implementiert werden tff.aggregators.UnweightedAggregationFactory , die entsprechende schafft tff.templates.AggregationProcess wenn eine gegebene value_type zu aggregieren:

Minimale Implementierung

Für die Beispielaufgabe sind die erforderlichen Berechnungen immer die gleichen, sodass die Verwendung von state nicht erforderlich ist. Es ist somit leer, und wie dargestellt tff.federated_value((), tff.SERVER) . Das gleiche gilt vorerst für Messungen.

Die minimale Umsetzung der Aufgabe sieht somit wie folgt aus:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ob alles wie erwartet funktioniert kann mit folgendem Code überprüft werden:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Aussagekraft und Maße

Statefulness wird in TFF allgemein verwendet, um Berechnungen darzustellen, von denen erwartet wird, dass sie iterativ ausgeführt werden und sich mit jeder Iteration ändern. Beispielsweise enthält der Zustand einer Lernberechnung die Gewichte des gelernten Modells.

Um zu veranschaulichen, wie der Zustand in einer Aggregationsberechnung verwendet wird, ändern wir die Beispielaufgabe. Anstelle der Multiplikation value von 2.0 multiplizieren wir es durch die Iteration Index - die Anzahl der Male die Aggregation ausgeführt wurde.

Dazu brauchen wir eine Möglichkeit, den Iterationsindex zu verfolgen, was durch das Konzept des Zustands erreicht wird. Im initialize_fn , anstatt einen leeren Zustand zu schaffen, initialisieren wir den Zustand ein Skalar Null. Dann kann das Zustand verwendet werden , in next_fn in drei Schritten: (1) Schrittweite von 1.0 , (2) Verwendung Multiplizier - value , und (3) zurück als neuen aktualisierten Zustand.

Sobald dies geschehen ist, können Sie Folgendes beachten: Aber genau der gleiche Code wie oben kann verwendet werden , um alle Arbeiten zu überprüfen , wie erwartet. Woher weiß ich, dass sich tatsächlich etwas geändert hat?

Gute Frage! Hier kommt das Konzept der Messungen zum Tragen. Im Allgemeinen können Messungen berichten jeden Wert relevant für eine einzige Ausführung der next Funktion, die für die Überwachung verwendet werden könnte. In diesem Fall kann es das sein summed_value aus dem vorherigen Beispiel. Das heißt, der Wert vor dem Schritt "Unskalierung", der vom Iterationsindex abhängen sollte. Auch dies ist in der Praxis nicht unbedingt sinnvoll, veranschaulicht aber den relevanten Mechanismus.

Die zustandsbehaftete Antwort auf die Aufgabe sieht somit wie folgt aus:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Man beachte , dass der state , der in kommt next_fn als Eingabe an Server gestellt. Um es bei Kunden zu verwenden, muss es zunächst mitgeteilt werden, welche das wird unter Verwendung tff.federated_broadcast Operator.

Um alle Arbeiten zu überprüfen , wie erwartet, können wir jetzt bei den berichteten aussehen measurements , die mit jeder Runde der Ausführung unterschiedlich sein sollte, auch wenn Lauf mit dem gleichen client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Strukturierte Typen

Die Modellgewichtungen eines im föderierten Lernen trainierten Modells werden normalerweise als eine Sammlung von Tensoren und nicht als ein einzelner Tensor dargestellt. In TFF, wird dies als dargestellt tff.StructType und allgemein nützliche Aggregation Fabriken müssen die strukturierten Typen akzeptieren zu können.

Doch in den obigen Beispielen haben wir nur mit einem gearbeitet tff.TensorType Objekt. Wenn wir versuchen , die bisherige Fabrik zu verwenden , um den Aggregationsprozess mit einem schaffen tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , so erhalten wir einen seltsamen Fehler , weil TensorFlow werden versuchen , einen zu multiplizieren tf.Tensor und eine list .

Das Problem besteht darin , dass statt die Struktur der Tensoren durch eine Konstante multipliziert, müssen wir jeden Tensor in der Struktur durch eine Konstante multiplizieren. Die übliche Lösung für dieses Problem ist es, das verwenden tf.nest Modul innerhalb der erstellten tff.tf_computation s.

Die Version des vorherigen ExampleTaskFactory kompatibel mit strukturierten Typen sieht somit wie folgt aus :

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Dieses Beispiel hebt ein Muster hervor, das beim Strukturieren von TFF-Code nützlich sein kann. Wenn er nicht mit sehr einfachen Operationen handelt, wird der Code besser lesbar , wenn die tff.tf_computation s , die als Bausteine in einen verwendet werden tff.federated_computation in einem separaten Ort erstellt werden. Innerhalb des tff.federated_computation werden diese Bausteine nur mit den inneren Operatoren verbunden sind .

Um zu überprüfen, ob es wie erwartet funktioniert:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Innere Aggregate

Der letzte Schritt besteht darin, optional die Delegation der eigentlichen Aggregation an andere Fabriken zu ermöglichen, um eine einfache Zusammenstellung verschiedener Aggregationstechniken zu ermöglichen.

Dies wird erreicht, indem ein optionales erreicht inner_factory Argument im Konstruktor unserer ExampleTaskFactory . Wenn nicht anders angegeben, tff.aggregators.SumFactory verwendet, der das gilt tff.federated_sum Operator direkt im vorherigen Abschnitt verwendet.

Wenn create aufgerufen wird, können wir zuerst nennen create die inner_factory den inneren Aggregationsprozess mit derselben zu schaffen value_type .

Der Zustand unseres Prozesses durch zurück initialize_fn ist eine Zusammensetzung aus zwei Teilen , den Zustand , der durch „diesen“ Prozess erstellt wird , und der Zustand des gerade inneren Prozesses erstellt.

Die Umsetzung der next_fn unterscheidet, dass die tatsächliche Aggregation wird gegen die delegierten next Funktion des inneren Prozesses, und darin , wie die endgültige Ausgabe zusammengesetzt ist. Der Zustand wird wieder zusammengesetzt aus „diesem“ und „inneren“ Zustand, und die Messungen werden in ähnlicher Weise als zusammengesetzt OrderedDict .

Das Folgende ist eine Implementierung eines solchen Musters.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Wenn Sie mit dem Delegieren inner_process.next Funktion, die Rückkehr Struktur , die wir bekommen , ist eine tff.templates.MeasuredProcessOutput , mit dem gleichen drei Felder - state , result und measurements . Wenn die Gesamtrücklaufstruktur des zusammengesetzten Aggregationsprozesses zu schaffen, der state und die measurements sollten Felder im Allgemeinen zusammen zusammengesetzt und zurückgeführt werden. Im Gegensatz dazu wird das result wird Feld entspricht den Wert aggregiert und stattdessen „fließt durch“ die zusammengesetzte Aggregation.

Der state Objekt sollte als Implementierungsdetail der Fabrik zu sehen ist , und somit könnte die Zusammensetzung jeder Struktur sein. Jedoch measurements entsprechen Werten an einem bestimmten Punkt an den Benutzer gemeldet werden. Daher empfehlen wir die Verwendung OrderedDict , mit so zusammengesetzt , zu benennen , dass es klar sein würde , wo in einer Zusammensetzung ist eine Metrik berichtet herkommt.

Beachten Sie auch die Verwendung des tff.federated_zip Betreiber. Der state Objekt durch den Prozess erstellt contolled sollte sein tff.FederatedType . Wenn wir statt zurückgekehrt war (this_state, inner_state) im initialize_fn , würde seine Rückkehr Art Signatur sein , tff.StructType ein 2-Tupel zu enthalten tff.FederatedType s. Die Verwendung von tff.federated_zip „Aufzüge“ der tff.FederatedType auf der obersten Ebene. Dies wird in ähnlicher Weise in dem verwendeten next_fn wenn sich der Zustand und die Messungen der Vorbereitung zurückgeführt werden.

Schließlich können wir sehen, wie dies mit der standardmäßigen inneren Aggregation verwendet werden kann:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... und mit einer anderen inneren Aggregation. Zum Beispiel kann ein ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Zusammenfassung

In diesem Tutorial haben wir die Best Practices erläutert, die Sie befolgen sollten, um einen universellen Aggregationsbaustein zu erstellen, der als Aggregationsfactory dargestellt wird. Die Allgemeingültigkeit ergibt sich aus der Entwurfsabsicht auf zwei Arten:

  1. Parametrisierte Berechnungen. Aggregation ist ein unabhängiger Baustein, der in andere TFF - Module entwickelt , um die Arbeit mit eingesteckt werden kann tff.aggregators ihre notwendige Aggregation, wie zu parametrieren tff.learning.build_federated_averaging_process .
  2. Aggregationszusammensetzung. Ein Aggregationsbaustein kann mit anderen Aggregationsbausteinen zusammengesetzt werden, um komplexere zusammengesetzte Aggregationen zu erstellen.