Construire votre propre algorithme d'apprentissage fédéré

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Avant de commencer

Avant de commencer, veuillez exécuter ce qui suit pour vous assurer que votre environnement est correctement configuré. Si vous ne voyez pas un message d' accueil, s'il vous plaît se référer à l' installation guide pour les instructions.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
import tensorflow as tf
import tensorflow_federated as tff

Dans la classification d'image et génération texte didacticiels, nous avons appris comment mettre en place des pipelines modèles et données pour Federated Learning (FL), et la formation fédérée via effectué la tff.learning couche API de TFF.

Ce n'est que la pointe de l'iceberg en ce qui concerne la recherche sur le FL. Dans ce tutoriel, nous discutons de la façon de mettre en œuvre des algorithmes d'apprentissage fédérées sans en remettre à l' tff.learning API. Nous visons à accomplir ce qui suit :

Buts:

  • Comprendre la structure générale des algorithmes d'apprentissage fédéré.
  • Explorez les fédérés de base de TFF.
  • Utilisez Federated Core pour implémenter directement la moyenne fédérée.

Bien que ce tutoriel est autonome, il est recommandé en première lecture la classification d'image et génération de texte tutoriels.

Préparation des données d'entrée

Nous chargeons et prétraitons d'abord l'ensemble de données EMNIST inclus dans TFF. Pour plus de détails, voir le classement des images tutoriel.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Pour alimenter l'ensemble de données dans notre modèle, nous Aplatir les données et convertir chaque exemple en un tuple de la forme (flattened_image_vector, label) .

NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

Nous sélectionnons maintenant un petit nombre de clients et appliquons le prétraitement ci-dessus à leurs ensembles de données.

client_ids = sorted(emnist_train.client_ids)[:NUM_CLIENTS]
federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

Préparation du modèle

Nous utilisons le même modèle que dans la classification des images tutoriel. Ce modèle (via tf.keras ) comporte une seule couche cachée, suivie d'une couche softmax.

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])

Pour utiliser ce modèle dans TFF, nous terminerons le modèle Keras comme tff.learning.Model . Cela nous permet d'effectuer du modèle passe en avant au sein de TFF, et les résultats du modèle d'extrait . Pour plus de détails, voir aussi la classification des images tutoriel.

def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Alors que nous avons utilisé tf.keras pour créer un tff.learning.Model , TFF prend en charge les modèles beaucoup plus généraux. Ces modèles ont les attributs pertinents suivants qui capturent les poids du modèle :

  • trainable_variables : Un iterable des tenseurs correspondant aux couches entraînables.
  • non_trainable_variables : Un iterable des tenseurs correspondant aux couches non entraînables.

Pour nos besoins, nous utiliserons les trainable_variables . (car notre modèle n'en a que !).

Construire votre propre algorithme d'apprentissage fédéré

Alors que la tff.learning API permet de créer un grand nombre de variantes de calcul de la moyenne fédérée, il existe d' autres algorithmes fédérés qui ne correspondent pas parfaitement dans ce cadre. Par exemple, vous pouvez ajouter la régularisation, écrêtage, ou des algorithmes plus complexes tels que la formation GAN fédérée . Vous pouvez également être au lieu être intéressé par l' analyse fédérée .

Pour ces algorithmes plus avancés, nous devrons écrire notre propre algorithme personnalisé en utilisant TFF. Dans de nombreux cas, les algorithmes fédérés ont 4 composants principaux :

  1. Une étape de diffusion de serveur à client.
  2. Une étape de mise à jour du client local.
  3. Une étape de téléchargement client-serveur.
  4. Une étape de mise à jour du serveur.

Dans TFF, nous représentons généralement des algorithmes fédérés comme tff.templates.IterativeProcess (que nous appelons juste un IterativeProcess tout au long). Ceci est une classe qui contient initialize et next fonctions. Ici, initialize est utilisé pour initialiser le serveur, et next effectuera un tour de communication de l'algorithme fédéré. Écrivons un squelette de ce à quoi devrait ressembler notre processus itératif pour FedAvg.

Tout d' abord, nous avons une fonction initialize qui crée simplement un tff.learning.Model et renvoie ses poids trainable.

def initialize_fn():
  model = model_fn()
  return model.trainable_variables

Cette fonction a l'air bien, mais comme nous le verrons plus tard, nous devrons faire une petite modification pour en faire un "calcul TFF".

Nous voulons aussi esquisser le next_fn .

def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

Nous allons nous concentrer sur la mise en œuvre de ces quatre composants séparément. Nous nous concentrons d'abord sur les parties qui peuvent être implémentées dans TensorFlow pur, à savoir les étapes de mise à jour du client et du serveur.

Blocs TensorFlow

Mise à jour des clients

Nous utiliserons notre tff.learning.Model pour faire la formation des clients essentiellement de la même façon que vous former un modèle tensorflow. Nous utiliserons en particulier tf.GradientTape pour calculer le gradient sur des lots de données, puis appliquer ces gradient en utilisant un client_optimizer . Nous nous concentrons uniquement sur les poids pouvant être entraînés.

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

Mise à jour du serveur

La mise à jour du serveur pour FedAvg est plus simple que la mise à jour du client. Nous allons implémenter une moyenne fédérée "vanille", dans laquelle nous remplaçons simplement les poids du modèle serveur par la moyenne des poids du modèle client. Encore une fois, nous nous concentrons uniquement sur les poids pouvant être entraînés.

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

L'extrait pourrait être simplifié en retournant simplement les mean_client_weights . Cependant, plus avancés implémentations d'utilisation de calcul de la moyenne fédérées mean_client_weights avec des techniques plus sophistiquées, telles que l' élan ou adaptativité.

Défi: Mettre en œuvre une version de server_update qui met à jour les poids du serveur pour être le point central de model_weights et mean_client_weights. (Note: Ce type d'approche « milieu » est analogue à des travaux récents sur l' optimiseur Lookahead !).

Jusqu'à présent, nous n'avons écrit que du code TensorFlow pur. C'est par conception, car TFF vous permet d'utiliser une grande partie du code TensorFlow que vous connaissez déjà. Cependant, nous devons préciser la logique d'orchestration, qui est, la logique qui dicte ce que les émissions du serveur au client, et ce que les téléchargements du client au serveur.

Cela nécessitera la base fédérée de TFF.

Introduction au noyau fédéré

Le Core fédéré (FC) est un ensemble d'interfaces bas niveau qui servent de base pour la tff.learning API. Cependant, ces interfaces ne se limitent pas à l'apprentissage. En fait, ils peuvent être utilisés pour l'analyse et de nombreux autres calculs sur des données distribuées.

À un niveau élevé, le noyau fédéré est un environnement de développement qui permet à une logique de programme exprimée de manière compacte de combiner le code TensorFlow avec des opérateurs de communication distribués (tels que des sommes distribuées et des diffusions). L'objectif est de donner aux chercheurs et aux praticiens un contrôle explicite sur la communication distribuée dans leurs systèmes, sans avoir besoin de détails sur la mise en œuvre du système (comme la spécification d'échanges de messages réseau point à point).

Un point clé est que TFF est conçu pour la préservation de la vie privée. Par conséquent, il permet un contrôle explicite sur l'emplacement des données, pour empêcher l'accumulation indésirable de données à l'emplacement du serveur centralisé.

Données fédérées

Un concept clé de TFF est celui des « données fédérées », qui font référence à une collection d'éléments de données hébergés sur un groupe d'appareils dans un système distribué (par exemple, des ensembles de données client ou les poids du modèle de serveur). Nous modélisons la collection d'éléments de données à travers tous les périphériques comme une seule valeur fédérée.

Par exemple, supposons que nous ayons des appareils clients qui ont chacun un flotteur représentant la température d'un capteur. Nous pourrions le représenter comme un flotteur fédéré par

federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

Types fédérés sont spécifiés par un type T de ses constituants membres (par exemple. tf.float32 ) et un groupe G de dispositifs. Nous allons nous concentrer sur les cas où G est soit tff.CLIENTS ou tff.SERVER . Un tel type fédéré est représenté comme {T}@G , comme indiqué ci - dessous.

str(federated_float_on_clients)
'{float32}@CLIENTS'

Pourquoi accordons-nous tant d'importance aux placements ? Un objectif clé de TFF est de permettre l'écriture de code qui pourrait être déployé sur un système distribué réel. Cela signifie qu'il est essentiel de déterminer quels sous-ensembles d'appareils exécutent quel code et où résident les différentes données.

TFF met l' accent sur trois choses: les données, où est placé les données et la façon dont les données sont en cours de transformation. Les deux premiers sont encapsulées dans les types fédérées, alors que le dernier est encapsulé dans les calculs fédérées.

Calculs fédérés

TFF est un environnement de programmation fonctionnel fortement typé dont les unités de base sont des calculs fédérés. Ce sont des éléments de logique qui acceptent des valeurs fédérées en entrée et renvoient des valeurs fédérées en sortie.

Par exemple, supposons que nous voulions faire la moyenne des températures sur nos capteurs clients. Nous pourrions définir ce qui suit (en utilisant notre flotteur fédéré) :

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

Vous pourriez vous demander, comment est - ce différent du tf.function décorateur tensorflow? La réponse clé est que le code généré par tff.federated_computation est ni le code ni tensorflow Python; Il est une spécification d'un système distribué dans un langage de colle indépendant de la plateforme interne.

Bien que cela puisse sembler compliqué, vous pouvez considérer les calculs TFF comme des fonctions avec des signatures de type bien définies. Ces signatures de type peuvent être directement interrogées.

str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'

Ce tff.federated_computation accepte les arguments de type fédéré {float32}@CLIENTS , et les valeurs des rendements de type fédéré {float32}@SERVER . Les calculs fédérés peuvent également aller de serveur à client, de client à client ou de serveur à serveur. Les calculs fédérés peuvent également être composés comme des fonctions normales, tant que leurs signatures de type correspondent.

Pour soutenir le développement, TFF vous permet d'invoquer une tff.federated_computation en fonction Python. Par exemple, nous pouvons appeler

get_average_temperature([68.5, 70.3, 69.8])
69.53334

Calculs non désireux et TensorFlow

Il y a deux restrictions clés à connaître. Tout d' abord, lorsque l'interpréteur Python rencontre un tff.federated_computation décorateur, la fonction est tracée une fois sérialisé pour une utilisation future. En raison de la nature décentralisée de l'apprentissage fédéré, cette utilisation future peut se produire ailleurs, comme un environnement d'exécution à distance. Par conséquent, les calculs sont TFF fondamentalement non désireux. Ce comportement est un peu analogue à celle du tf.function décorateur tensorflow.

En second lieu , un calcul fédéré ne peut être constitué d'opérateurs fédérés (comme tff.federated_mean ), ils ne peuvent pas contenir des opérations tensorflow. Le code tensorflow doit se limiter à des blocs décorés de tff.tf_computation . La plupart du code tensorflow ordinaire peut être décorée directement, comme la fonction suivante qui prend un nombre et ajoute 0.5 à elle.

@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

Celles - ci ont également des signatures de type, mais sans placement. Par exemple, nous pouvons appeler

str(add_half.type_signature)
'(float32 -> float32)'

Ici , nous voyons une différence importante entre tff.federated_computation et tff.tf_computation . Le premier a des emplacements explicites, tandis que le second n'en a pas.

Nous pouvons utiliser tff.tf_computation blocs dans les calculs fédérées en spécifiant les placements. Créons une fonction qui ajoute la moitié, mais uniquement aux flottants fédérés chez les clients. Nous pouvons le faire en utilisant tff.federated_map , qui applique une donnée tff.tf_computation , tout en préservant le placement.

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

Cette fonction est presque identique à add_half , sauf qu'il accepte que des valeurs avec le placement à tff.CLIENTS , et les valeurs de retours avec le même placement. Nous pouvons le voir dans sa signature de type :

str(add_half_on_clients.type_signature)
'({float32}@CLIENTS -> {float32}@CLIENTS)'

En résumé:

  • TFF fonctionne sur des valeurs fédérées.
  • Chaque valeur fédérée possède un type fédéré, avec un type (par exemple. tf.float32 ) et un placement (par exemple. tff.CLIENTS ).
  • Les valeurs fédérées peuvent être transformées en utilisant des calculs fédérés, qui doivent être décorées avec tff.federated_computation et une signature de type fédéré.
  • Code tensorflow doit être contenu dans des blocs avec tff.tf_computation décorateurs.
  • Ces blocs peuvent ensuite être intégrés dans des calculs fédérés.

Construire votre propre algorithme d'apprentissage fédéré, revisité

Maintenant que nous avons un aperçu du noyau fédéré, nous pouvons créer notre propre algorithme d'apprentissage fédéré. Rappelez - vous que ci - dessus, nous avons défini un initialize_fn et next_fn pour notre algorithme. Le next_fn utilisera la client_update et server_update nous avons défini en utilisant le code pur tensorflow.

Toutefois, afin de rendre notre algorithme de calcul d' un fédéré, nous aurons besoin à la fois le next_fn et initialize_fn à chacun un tff.federated_computation .

Blocs fédérés TensorFlow

Création du calcul d'initialisation

La fonction d'initialisation sera assez simple: Nous allons créer un modèle en utilisant model_fn . Cependant, rappelez - vous que nous devons séparer notre code tensorflow en utilisant tff.tf_computation .

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

On peut alors passer ce directement dans un calcul fédérée à l' aide tff.federated_value .

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

Création du next_fn

Nous utilisons maintenant notre code de mise à jour client et serveur pour écrire l'algorithme réel. Nous allons d' abord transformer notre client_update en tff.tf_computation qui accepte un ensembles de données client et poids du serveur et émet un poids client mis à jour tenseur.

Nous aurons besoin des types correspondants pour décorer correctement notre fonction. Heureusement, le type de poids du serveur peut être extrait directement de notre modèle.

whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

Regardons la signature de type de jeu de données. N'oubliez pas que nous avons pris 28 par 28 images (avec des étiquettes entières) et les avons aplaties.

str(tf_dataset_type)
'<float32[?,784],int32[?,1]>*'

Nous pouvons également extraire le poids du modèle de type en utilisant notre server_init fonction ci - dessus.

model_weights_type = server_init.type_signature.result

En examinant la signature de type, nous pourrons voir l'architecture de notre modèle !

str(model_weights_type)
'<float32[784,10],float32[10]>'

Nous pouvons maintenant créer notre tff.tf_computation pour la mise à jour du client.

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

La tff.tf_computation version de la mise à jour du serveur peut être défini d'une manière similaire, en utilisant des types que nous avons déjà extrait.

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

Dernier point , mais non le moindre, nous avons besoin de créer le tff.federated_computation qui apporte tout cela ensemble. Cette fonction accepte deux valeurs fédérées, l' une correspondant aux poids du serveur (avec le placement tff.SERVER ), et l'autre correspondant aux ensembles de données client (avec placement tff.CLIENTS ).

Notez que ces deux types ont été définis ci-dessus ! Nous avons simplement besoin de leur donner le bon placement à l' aide tff.FederatedType .

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

Vous vous souvenez des 4 éléments d'un algorithme FL ?

  1. Une étape de diffusion de serveur à client.
  2. Une étape de mise à jour du client local.
  3. Une étape de téléchargement client-serveur.
  4. Une étape de mise à jour du serveur.

Maintenant que nous avons construit ce qui précède, chaque partie peut être représentée de manière compacte sous la forme d'une seule ligne de code TFF. Cette simplicité est la raison pour laquelle nous avons dû faire très attention à spécifier des éléments tels que les types fédérés !

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))

  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

Nous avons maintenant un tff.federated_computation tant pour l'initialisation de l' algorithme, et pour exécuter une étape de l'algorithme. Pour terminer notre algorithme, nous passons en ces tff.templates.IterativeProcess .

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

Look Let à la signature du type de initialize et next fonctions de notre processus itératif.

str(federated_algorithm.initialize.type_signature)
'( -> <float32[784,10],float32[10]>@SERVER)'

Cela reflète le fait que federated_algorithm.initialize est une fonction non-arg qui renvoie un modèle mono-couche (avec un 784 par 10 matrice de poids, et 10 unités de polarisation).

str(federated_algorithm.next.type_signature)
'(<server_weights=<float32[784,10],float32[10]>@SERVER,federated_dataset={<float32[?,784],int32[?,1]>*}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

Ici, nous voyons que federated_algorithm.next accepte un modèle de serveur et les données du client, et retourne un modèle de serveur mis à jour.

Évaluation de l'algorithme

Faisons quelques tours et voyons comment la perte change. Tout d' abord, nous allons définir une fonction d'évaluation en utilisant l'approche centralisée discuté dans le second tutoriel.

Nous créons d'abord un ensemble de données d'évaluation centralisé, puis appliquons le même prétraitement que nous avons utilisé pour les données d'apprentissage.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients()
central_emnist_test = preprocess(central_emnist_test)

Ensuite, nous écrivons une fonction qui accepte un état de serveur et utilise Keras pour évaluer sur l'ensemble de données de test. Si vous êtes familier avec tf.Keras , tout cela va apparence familière, mais notez l'utilisation de set_weights !

def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

Maintenant, initialisons notre algorithme et évaluons sur l'ensemble de test.

server_state = federated_algorithm.initialize()
evaluate(server_state)
2042/2042 [==============================] - 2s 767us/step - loss: 2.8479 - sparse_categorical_accuracy: 0.1027

Entraînons-nous pendant quelques tours et voyons si quelque chose change.

for round in range(15):
  server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)
2042/2042 [==============================] - 2s 738us/step - loss: 2.5867 - sparse_categorical_accuracy: 0.0980

On constate une légère diminution de la fonction de perte. Bien que le saut soit faible, nous n'avons effectué que 15 cycles d'entraînement et sur un petit sous-ensemble de clients. Pour voir de meilleurs résultats, nous devrons peut-être faire des centaines, voire des milliers de tours.

Modifier notre algorithme

À ce stade, arrêtons-nous et réfléchissons à ce que nous avons accompli. Nous avons implémenté la moyenne fédérée directement en combinant du code TensorFlow pur (pour les mises à jour client et serveur) avec des calculs fédérés à partir du noyau fédéré de TFF.

Pour effectuer un apprentissage plus sophistiqué, nous pouvons simplement modifier ce que nous avons ci-dessus. En particulier, en modifiant le code TF pur ci-dessus, nous pouvons changer la façon dont le client effectue la formation ou la façon dont le serveur met à jour son modèle.

Défi: Ajouter écrêtage gradient à la client_update fonction.

Si nous voulions apporter des modifications plus importantes, nous pourrions également faire en sorte que le serveur stocke et diffuse plus de données. Par exemple, le serveur pourrait également stocker le taux d'apprentissage du client et le faire décroître au fil du temps ! Notez que cela exigera des changements aux signatures de type utilisé dans le tff.tf_computation appelle ci - dessus.

Plus dur Défi: Mettre en œuvre la moyenne fédérée avec l' apprentissage carie taux sur les clients.

À ce stade, vous pouvez commencer à réaliser à quel point il y a de la flexibilité dans ce que vous pouvez implémenter dans ce cadre. Pour des idées (y compris la réponse au défi plus difficile) ci - dessus , vous pouvez voir le code source pour tff.learning.build_federated_averaging_process ou consulter divers projets de recherche utilisant TFF.