Implémentation d'agrégations personnalisées

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Dans ce tutoriel, nous expliquons les principes de conception derrière le tff.aggregators module et les meilleures pratiques pour la mise en œuvre d' agrégation personnalisée des valeurs des clients au serveur.

Conditions préalables. Ce tutoriel suppose que vous êtes déjà familier avec les concepts de base de Federated de tff.SERVER tff.CLIENTS tff.tf_computation tff.federated_computation base tels que les placements ( tff.SERVER , tff.CLIENTS ), comment TFF (calculs REPRÉSENTE tff.tf_computation , tff.federated_computation ) et leurs signatures de type.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Résumé de conception

Dans la FFT, « agrégation » fait référence au mouvement d'un ensemble de valeurs sur tff.CLIENTS pour produire une valeur totale de même type sur tff.SERVER . Autrement dit, chaque valeur client individuelle n'a pas besoin d'être disponible. Par exemple, dans l'apprentissage fédéré, les mises à jour du modèle client sont moyennées pour obtenir une mise à jour du modèle agrégé à appliquer au modèle global sur le serveur.

En plus des opérateurs atteindre cet objectif , tels que tff.federated_sum , TFF fournit tff.templates.AggregationProcess (un de processus stateful ) qui formalise la signature de type pour le calcul de l' agrégation de sorte qu'il peut se généraliser à des formes plus complexes qu'une simple somme.

Les principales composantes des tff.aggregators module sont des usines pour la création du AggregationProcess , qui sont conçus pour être généralement utiles et blocs de construction remplaçables de TFF en deux aspects:

  1. Calculs paramétrés. L' agrégation est un bloc de construction indépendant qui peut être branché dans d' autres modules de TFF conçus pour fonctionner avec tff.aggregators à leur agrégation nécessaire paramétrez.

Exemple:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Composition de l'agrégation. Un bloc de construction d'agrégation peut être composé avec d'autres blocs de construction d'agrégation pour créer des agrégations composites plus complexes.

Exemple:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Le reste de ce tutoriel explique comment ces deux objectifs sont atteints.

Processus d'agrégation

Nous résumons d' abord la tff.templates.AggregationProcess et suivre avec le modèle d'usine pour sa création.

Le tff.templates.AggregationProcess est un tff.templates.MeasuredProcess avec les signatures de types spécifiés pour l' agrégation. En particulier, l' initialize et next fonctions ont les signatures de type suivant:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

L'état (de type state_type ) doit être placé au serveur. La next fonction prend comme argument d' entrée l'état et une valeur à être agrégées (de type value_type ) placé à ses clients. Les * des moyens éventuels autres arguments d'entrée, pour des poids d'instance dans une moyenne pondérée. Il renvoie un objet d'état mis à jour, la valeur agrégée du même type placée sur le serveur et quelques mesures.

Notez que les deux l'état à passer entre des exécutions de la next fonction, et les mesures indiquées pour but de signaler toute information en fonction d'une exécution spécifique de la next fonction, peut être vide. Néanmoins, ils doivent être explicitement spécifiés pour que d'autres parties de TFF aient un contrat clair à suivre.

D' autres modules de TFF, par exemple les mises à jour du modèle en tff.learning , devraient utiliser le tff.templates.AggregationProcess pour paramétrer la façon dont les valeurs sont agrégées. Cependant, quelles sont exactement les valeurs agrégées et quelles sont leurs signatures de type, dépend d'autres détails du modèle en cours d'apprentissage et de l'algorithme d'apprentissage utilisé pour le faire.

Pour l' agrégation indépendante des autres aspects des calculs, nous utilisons le modèle d'usine - nous créons la appropriée tff.templates.AggregationProcess une fois que les signatures de type d'objets pertinents à agréger sont disponibles, en appelant la create méthode de l'usine. La gestion directe du processus d'agrégation n'est donc nécessaire que pour les auteurs de bibliothèques, qui sont responsables de cette création.

Usines de processus d'agrégation

Il existe deux classes de fabrique de base abstraites pour l'agrégation non pondérée et pondérée. Leur create méthode prend des signatures de type de valeur à être agrégées et renvoie un tff.templates.AggregationProcess pour l' agrégation de ces valeurs.

Le processus créé par tff.aggregators.UnweightedAggregationFactory prend deux arguments d'entrée: (1) l' état au serveur et (2) la valeur du type spécifié value_type .

Un exemple d' implémentation est tff.aggregators.SumFactory .

Le processus créé par tff.aggregators.WeightedAggregationFactory prend trois arguments d'entrée: (1) l' état au serveur, (2) la valeur du type spécifié value_type et (3) de poids de type weight_type , tel que spécifié par l'utilisateur de l'usine lors de l' appel de sa create procédé.

Un exemple d' implémentation est tff.aggregators.MeanFactory qui calcule une moyenne pondérée.

Le modèle d'usine est la façon dont nous atteignons le premier objectif indiqué ci-dessus ; cette agrégation est un bloc de construction indépendant. Par exemple, lors de la modification des variables de modèle pouvant être entraînées, une agrégation complexe n'a pas nécessairement besoin de changer ; l'usine représentant elle sera invoquée avec une signature de type différent lorsqu'il est utilisé par une méthode telle que tff.learning.build_federated_averaging_process .

Compositions

Rappelez-vous qu'un processus d'agrégation général peut encapsuler (a) un certain prétraitement des valeurs chez les clients, (b) un mouvement des valeurs du client au serveur, et (c) un certain post-traitement de la valeur agrégée au niveau du serveur. Le deuxième objectif indiqué ci - dessus, la composition d'agrégation est réalisée à l' intérieur des tff.aggregators module en structurant la mise en œuvre des usines de concentration de telle sorte que la partie (b) peut être déléguée à une autre usine d'agrégation.

Plutôt que d'implémenter toute la logique nécessaire dans une seule classe d'usine, les implémentations sont par défaut concentrées sur un seul aspect pertinent pour l'agrégation. En cas de besoin, ce modèle nous permet ensuite de remplacer les blocs de construction un à la fois.

Un exemple est la moyenne pondérée tff.aggregators.MeanFactory . Son implémentation multiplie les valeurs et les poids fournis chez les clients, puis additionne les valeurs pondérées et les poids indépendamment, puis divise la somme des valeurs pondérées par la somme des poids au niveau du serveur. Au lieu de mettre en œuvre les sommations en utilisant directement le tff.federated_sum opérateur, la somme est déléguée à deux instances de tff.aggregators.SumFactory .

Une telle structure permet aux deux sommations par défaut d'être remplacées par des usines différentes, qui réalisent la somme différemment. Par exemple, un tff.aggregators.SecureSumFactory ou une implémentation personnalisée du tff.aggregators.UnweightedAggregationFactory . A l' inverse, le temps, tff.aggregators.MeanFactory peut être elle - même une agrégation interne d' une autre usine, comme tff.aggregators.clipping_factory , si les valeurs doivent être clipsé avant la moyenne.

Voir la précédente Tuning recommandé agrégations pour l' tff.aggregators apprentissage tutoriel pour des utilisations receommended du mécanisme de composition à l' aide des usines existantes dans le tff.aggregators module.

Les bonnes pratiques par l'exemple

Nous allons illustrer les tff.aggregators concepts en détail en mettant en œuvre une tâche simple exemple, et de le rendre de plus en plus générale. Une autre façon d'apprendre est de regarder la mise en œuvre des usines existantes.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Au lieu de sommer la value , la tâche est par exemple pour résumer la value * 2.0 puis diviser la somme par 2.0 . Le résultat de l' agrégation est donc mathématiquement équivalente à additionner directement la value , et pourrait être considéré comme constitué de trois parties: (1) mise à l' échelle à des clients (2) de sommation pour clients (3) unscaling au serveur.

Suite à la conception expliqué ci - dessus, la logique sera mise en œuvre en tant que sous - classe de tff.aggregators.UnweightedAggregationFactory , qui crée appropriée tff.templates.AggregationProcess lorsqu'il est administré un value_type à total:

Mise en œuvre minimale

Pour l'exemple de tâche, les calculs nécessaires sont toujours les mêmes, il n'est donc pas nécessaire d'utiliser l'état. Il est vide ainsi, et représentée comme tff.federated_value((), tff.SERVER) . Il en va de même pour les mesures, pour l'instant.

La mise en œuvre minimale de la tâche est donc la suivante :

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Si tout fonctionne comme prévu, vous pouvez vérifier avec le code suivant :

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

État et mesures

L'état est largement utilisé dans TFF pour représenter les calculs qui devraient être exécutés de manière itérative et changer à chaque itération. Par exemple, l'état d'un calcul d'apprentissage contient les poids du modèle en cours d'apprentissage.

Pour illustrer comment utiliser l'état dans un calcul d'agrégation, nous modifions l'exemple de tâche. Au lieu de multiplier la value par 2.0 , on multiplie par l'indice d'itération - le nombre de fois l'agrégation a été exécuté.

Pour ce faire, nous avons besoin d'un moyen de garder une trace de l'indice d'itération, qui est obtenu grâce au concept d'état. Dans le initialize_fn , au lieu de créer un état vide, initialisant l'état d'être un zéro scalaire. Ensuite, l' état peut être utilisé dans le next_fn en trois étapes: (1) incrément de 1.0 , (2) l' utilisation pour multiplier la value , et (3) de retour en tant que nouvel état mis à jour.

Une fois cela fait, vous pouvez noter: Mais exactement le même code que ci - dessus peut être utilisé pour vérifier tous les travaux comme prévu. Comment puis-je savoir que quelque chose a réellement changé ?

Bonne question! C'est là que le concept de mesures devient utile. En général, les mesures peuvent signaler toute valeur pertinente à une seule exécution de la next fonction, qui pourrait être utilisé pour la surveillance. Dans ce cas, il peut être le summed_value de l'exemple précédent. C'est-à-dire la valeur avant l'étape de "dégradation", qui devrait dépendre de l'indice d'itération. Encore une fois, cela n'est pas nécessairement utile dans la pratique, mais illustre le mécanisme pertinent.

La réponse avec état à la tâche se présente donc comme suit :

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Notez que l' state qui entre en next_fn comme entrée est placé au serveur. Pour l' utiliser à des clients, il faut d'abord communiquer, ce qui est réalisé à l' aide du tff.federated_broadcast opérateur.

Pour vérifier tous les travaux comme prévu, nous pouvons maintenant les déclarées measurements , qui devraient être différentes à chaque tour d'exécution, même si l' exécution avec le même client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Types structurés

Les poids de modèle d'un modèle formé en apprentissage fédéré sont généralement représentés comme un ensemble de tenseurs, plutôt que comme un seul tenseur. Dans TFF, ceci est représenté comme tff.StructType et les usines d'agrégation généralement utiles doivent être en mesure d'accepter les types structurés.

Cependant, dans les exemples ci - dessus, nous ne travaillions avec un tff.TensorType objet. Si nous essayons d'utiliser l'ancienne usine pour créer le processus d'agrégation avec un tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , nous obtenons une erreur étrange parce que tensorflow va essayer de multiplier un tf.Tensor et une list .

Le problème est qu'au lieu de multiplier la structure de tenseurs par une constante, il faut multiplier chaque tenseur dans la structure par une constante. La solution habituelle à ce problème est d'utiliser le tf.nest à l' intérieur du module créé des tff.tf_computation s.

La version de la précédente ExampleTaskFactory ainsi compatible avec les types structurés se présente comme suit:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Cet exemple met en évidence un modèle qui peut être utile à suivre lors de la structuration du code TFF. Lorsqu'ils ne sont pas face à des opérations très simples, le code devient plus lisible lorsque les tff.tf_computation s qui seront utilisés comme blocs de construction à l' intérieur d' un tff.federated_computation sont créés dans un endroit séparé. A l' intérieur du tff.federated_computation , ces blocs de construction ne sont connectés à l' aide des opérateurs intrinsèques.

Pour vérifier que cela fonctionne comme prévu :

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Agrégations internes

La dernière étape consiste à permettre éventuellement la délégation de l'agrégation réelle à d'autres usines, afin de permettre une composition facile des différentes techniques d'agrégation.

Ceci est réalisé en créant un option inner_factory argument dans le constructeur de notre ExampleTaskFactory . Si non spécifié, tff.aggregators.SumFactory est utilisé, qui applique le tff.federated_sum opérateur utilisé directement dans la section précédente.

Lorsque create est appelé, nous pouvons d' abord appeler create des inner_factory pour créer le processus d'agrégation interne avec le même value_type .

L'état de notre processus retourné par initialize_fn est une composition de deux parties: l'état créé par « ce » processus, et l'état du juste créé processus intérieur.

La mise en œuvre des next_fn diffère en ce que l'agrégation réelle est déléguée à la next fonction du processus interne, et dans la façon dont le résultat final est composé. L'état est composé nouveau de l' état « ce » et « intérieur », et les mesures sont composés d'une manière similaire en tant que OrderedDict .

Ce qui suit est une implémentation d'un tel modèle.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Au moment de déléguer à la inner_process.next fonction, la structure de retour que nous obtenons est un tff.templates.MeasuredProcessOutput , avec les mêmes trois domaines - state , result et measurements . Lors de la création de la structure globale de retour du processus d'agrégation composé, les state et measurements des champs doivent être généralement composés et sont revenus ensemble. En revanche, les result champ correspond à la valeur étant agrégées et place « flux à travers » l'agrégation composé.

L' state objet doit être considéré comme un détail de mise en œuvre de l'usine, et donc la composition pourrait être de toute structure. Cependant, les measurements correspondent à des valeurs à signaler à l'utilisateur à un moment donné. Nous vous recommandons donc d'utiliser OrderedDict , de nommer ce composé qu'il serait clair où dans une composition ne a rapporté métrique vient.

A noter également l'utilisation du tff.federated_zip opérateur. L' state objet contolled par le processus créé devrait être un tff.FederatedType . Si nous avions la place retourné (this_state, inner_state) dans le initialize_fn , sa signature de type de retour serait un tff.StructType contenant un 2-tuple de tff.FederatedType s. L'utilisation de tff.federated_zip « ascenseurs » le tff.FederatedType au niveau supérieur. Ceci est également utilisé dans le next_fn lors de la préparation de l'état et les mesures à retourner.

Enfin, nous pouvons voir comment cela peut être utilisé avec l'agrégation interne par défaut :

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... et avec une agrégation interne différente. Par exemple, un ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Sommaire

Dans ce didacticiel, nous avons expliqué les bonnes pratiques à suivre pour créer un bloc de construction d'agrégation à usage général, représenté comme une fabrique d'agrégation. La généralité passe par l'intention de conception de deux manières :

  1. Calculs paramétrés. L' agrégation est un bloc de construction indépendant qui peut être branché sur d' autres modules de la FFT conçu pour fonctionner avec tff.aggregators pour paramétrer leur agrégation nécessaire, tels que tff.learning.build_federated_averaging_process .
  2. Composition de l'agrégation. Un bloc de construction d'agrégation peut être composé avec d'autres blocs de construction d'agrégation pour créer des agrégations composites plus complexes.