Apprentissage fédéré pour la classification des images

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Dans ce tutoriel, nous utilisons l'exemple de formation MNIST classique pour introduire la couche API d' apprentissage fédéré (FL) de TFF, tff.learning - un ensemble d'interfaces de niveau supérieur qui peut être utilisé pour effectuer des types communs de tâches d'apprentissage fédérées, telles que formation fédérée, par rapport aux modèles fournis par l'utilisateur mis en œuvre dans TensorFlow.

Ce didacticiel et l'API d'apprentissage fédéré sont principalement destinés aux utilisateurs qui souhaitent connecter leurs propres modèles TensorFlow à TFF, en traitant ce dernier principalement comme une boîte noire. Pour une compréhension plus approfondie de la TFF et comment mettre en œuvre vos propres algorithmes d'apprentissage fédérés, consultez les tutoriels sur l'API FC de base - Federated sur mesure Algorithmes Partie 1 et Partie 2 .

Pour en savoir plus sur tff.learning , poursuivre l' apprentissage fédéré de texte génération , tutoriel qui , en plus de couvrir les modèles récurrents, montre également le chargement d' un modèle Keras sérialisé pré-formation pour le raffinement avec l' apprentissage fédérée combinée à l' évaluation à l' aide Keras.

Avant de commencer

Avant de commencer, veuillez exécuter ce qui suit pour vous assurer que votre environnement est correctement configuré. Si vous ne voyez pas un message d' accueil, s'il vous plaît se référer à l' installation guide pour les instructions.

# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

Préparation des données d'entrée

Commençons par les données. L'apprentissage fédéré nécessite un ensemble de données fédérées, c'est-à-dire une collection de données provenant de plusieurs utilisateurs. Données fédérée est généralement non IID , ce qui pose un ensemble unique de défis.

Afin de faciliter l' expérimentation, nous avons semé le dépôt de TFF avec quelques ensembles de données, y compris une version fédérée de MNIST qui contient une version du jeu de données NIST qui a été re-traitées à l' aide des feuilles de sorte que les données sont calée par l'auteur original de les chiffres. Étant donné que chaque rédacteur a un style unique, cet ensemble de données présente le type de comportement non-iid attendu des ensembles de données fédérés.

Voici comment nous pouvons le charger.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Les ensembles de données retournées par load_data() sont des instances de tff.simulation.ClientData , une interface qui vous permet d'énumérer l'ensemble des utilisateurs, de construire un tf.data.Dataset qui représente les données d'un utilisateur particulier, et d'interroger la structure des éléments individuels. Voici comment vous pouvez utiliser cette interface pour explorer le contenu de l'ensemble de données. Gardez à l'esprit que bien que cette interface vous permette d'itérer sur les identifiants des clients, il ne s'agit que d'une fonctionnalité des données de simulation. Comme vous le verrez bientôt, les identités des clients ne sont pas utilisées par le cadre d'apprentissage fédéré - leur seul objectif est de vous permettre de sélectionner des sous-ensembles de données pour les simulations.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

png

Explorer l'hétérogénéité des données fédérées

Données fédérée est généralement non IID , les utilisateurs ont généralement des distributions de données en fonction des habitudes d'utilisation. Certains clients peuvent avoir moins d'exemples de formation sur l'appareil, souffrant d'un manque de données localement, tandis que certains clients auront plus qu'assez d'exemples de formation. Explorons ce concept d'hétérogénéité des données typique d'un système fédéré avec les données EMNIST dont nous disposons. Il est important de noter que cette analyse approfondie des données d'un client n'est disponible que pour nous car il s'agit d'un environnement de simulation où toutes les données sont disponibles localement. Dans un environnement de production fédéré réel, vous ne seriez pas en mesure d'inspecter les données d'un seul client.

Tout d'abord, prenons un échantillon des données d'un client pour avoir une idée des exemples sur un appareil simulé. Étant donné que l'ensemble de données que nous utilisons a été saisi par un rédacteur unique, les données d'un client représentent l'écriture manuscrite d'une personne pour un échantillon des chiffres 0 à 9, simulant le « modèle d'utilisation » unique d'un utilisateur.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

png

Visualisons maintenant le nombre d'exemples sur chaque client pour chaque étiquette de chiffre MNIST. Dans l'environnement fédéré, le nombre d'exemples sur chaque client peut varier considérablement, en fonction du comportement de l'utilisateur.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

png

Visualisons maintenant l'image moyenne par client pour chaque étiquette MNIST. Ce code produira la moyenne de chaque valeur de pixel pour tous les exemples de l'utilisateur pour une étiquette. Nous verrons que l'image moyenne d'un client pour un chiffre sera différente de l'image moyenne d'un autre client pour le même chiffre, en raison du style d'écriture unique de chaque personne. Nous pouvons réfléchir à la façon dont chaque cycle de formation local poussera le modèle dans une direction différente sur chaque client, car nous apprenons des données uniques de cet utilisateur dans ce cycle local. Plus tard dans le didacticiel, nous verrons comment nous pouvons prendre chaque mise à jour du modèle de tous les clients et les agréger dans notre nouveau modèle global, qui a appris de chacune des données uniques de nos clients.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

png

png

png

png

png

Les données utilisateur peuvent être bruyantes et étiquetées de manière peu fiable. Par exemple, en regardant les données du client n°2 ci-dessus, nous pouvons voir que pour l'étiquette 2, il est possible qu'il y ait eu des exemples mal étiquetés créant une image moyenne plus bruyante.

Prétraitement des données d'entrée

Étant donné que les données sont déjà tf.data.Dataset , pré - traitement peut être accompli en utilisant des transformations Dataset. Ici, nous aplatit les 28x28 images dans 784 tableaux -Element, mélanger les exemples individuels, les organiser en lots, et renomme les caractéristiques de pixels et l' label à x et y pour une utilisation avec Keras. Nous lançons également dans une repeat sur l'ensemble de données pour exécuter plusieurs époques.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Vérifions que cela a fonctionné.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [5],
       [7],
       [1],
       [7],
       [7],
       [1],
       [4],
       [7],
       [4],
       [2],
       [2],
       [5],
       [4],
       [1],
       [1],
       [0],
       [0],
       [9]], dtype=int32))])

Nous avons presque tous les éléments constitutifs en place pour construire des ensembles de données fédérés.

L' une des façons de nourrir les données fédérées TFF dans une simulation est simplement une liste Python, chaque élément de la liste qui contient les données d'un utilisateur, que ce soit une liste ou tf.data.Dataset . Puisque nous avons déjà une interface qui fournit ce dernier, utilisons-la.

Voici une fonction d'assistance simple qui construira une liste d'ensembles de données à partir de l'ensemble donné d'utilisateurs en tant qu'entrée pour une série de formation ou d'évaluation.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

Maintenant, comment choisissons-nous les clients?

Dans un scénario de formation fédérée typique, nous traitons potentiellement une très grande population d'appareils utilisateur, dont seule une fraction peut être disponible pour la formation à un moment donné. C'est le cas, par exemple, lorsque les appareils clients sont des téléphones portables qui participent à la formation uniquement lorsqu'ils sont branchés sur une source d'alimentation, hors d'un réseau mesuré et autrement inactifs.

Bien sûr, nous sommes dans un environnement de simulation, et toutes les données sont disponibles localement. En règle générale, lors de l'exécution de simulations, nous échantillonnerions simplement un sous-ensemble aléatoire de clients à impliquer dans chaque cycle de formation, généralement différents à chaque cycle.

Cela dit, comme vous pouvez le découvrir en étudiant le papier sur le calcul de la moyenne fédérée algorithme, la convergence dans un système avec des sous - ensembles au hasard de l' échantillon de clients dans chaque tour peut prendre un certain temps, et il serait impossible d'avoir à exécuter des centaines de tours en ce tutoriel interactif.

Ce que nous allons faire à la place, c'est échantillonner l'ensemble de clients une fois et réutiliser le même ensemble à travers les tours pour accélérer la convergence (sur-ajustement intentionnel aux données de ces quelques utilisateurs). Nous laissons comme exercice au lecteur de modifier ce didacticiel pour simuler un échantillonnage aléatoire - c'est assez facile à faire (une fois que vous l'avez fait, gardez à l'esprit que faire converger le modèle peut prendre un certain temps).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Créer un modèle avec Keras

Si vous utilisez Keras, vous avez probablement déjà du code qui construit un modèle Keras. Voici un exemple de modèle simple qui suffira à nos besoins.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

Pour utiliser un modèle avec TFF, il doit être enveloppé dans une instance de la tff.learning.Model interface, qui expose des méthodes pour tamponner passe en avant du modèle, les propriétés de métadonnées, etc., de façon similaire à Keras, mais aussi introduit plus éléments, tels que les moyens de contrôler le processus de calcul des métriques fédérées. Ne nous en inquiétons pas pour l'instant ; si vous avez un modèle Keras comme celui que nous venons définie plus haut, vous pouvez envelopper TFF pour vous en invoquant tff.learning.from_keras_model , en passant le modèle et un échantillon lot de données comme arguments, comme indiqué ci - dessous.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Entraîner le modèle sur des données fédérées

Maintenant que nous avons un modèle enveloppé comme tff.learning.Model pour une utilisation avec TFF, nous pouvons laisser TFF construire un algorithme de calcul de la moyenne fédéré en appelant la fonction d'aide tff.learning.build_federated_averaging_process , comme suit , .

Gardez à l' esprit que l'argument doit être un constructeur (comme model_fn ci - dessus), et non une instance déjà construite, de sorte que la construction de votre modèle peut se produire dans un contexte contrôlé par TFF (si vous êtes curieux de savoir les raisons de cela, nous vous invitons à lire le tutoriel de suivi des algorithmes personnalisés ).

Une note critique sur l'algorithme de calcul de la moyenne Federated ci - dessous, il y a 2 optimiseurs: un optimiseur de _client et un optimiseur de _SERVER. L'optimiseur de _client est uniquement utilisé pour calculer les mises à jour de modèle local sur chaque client. L'optimiseur _SERVER applique la mise à jour en moyenne au modèle global au niveau du serveur. En particulier, cela signifie que le choix de l'optimiseur et du taux d'apprentissage utilisé peut être différent de ceux que vous avez utilisés pour entraîner le modèle sur un jeu de données iid standard. Nous vous recommandons de commencer par un SGD régulier, éventuellement avec un taux d'apprentissage plus faible que d'habitude. Le taux d'apprentissage que nous utilisons n'a pas été soigneusement réglé, n'hésitez pas à expérimenter.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Qu'est-ce qui vient juste de se passer? TFF a construit une paire de calculs fédérés et de les emballer dans un tff.templates.IterativeProcess où ces calculs sont disponibles sous forme d' une paire de propriétés initialize et next .

En un mot, les calculs sont fédérés des programmes en langage interne de TFF qui peuvent exprimer différents algorithmes fédérées (vous pouvez trouver plus à ce sujet dans les algorithmes personnalisés tutoriel). Dans ce cas, les deux calculs générés et emballés dans iterative_process mettre en œuvre la moyenne fédérée .

L'un des objectifs de TFF est de définir les calculs de manière à ce qu'ils puissent être exécutés dans des paramètres d'apprentissage fédérés réels, mais actuellement, seul le runtime de simulation d'exécution locale est implémenté. Pour exécuter un calcul dans un simulateur, vous l'invoquez simplement comme une fonction Python. Cet environnement interprété par défaut n'est pas conçu pour des performances élevées, mais il suffira pour ce didacticiel ; nous prévoyons de fournir des temps d'exécution de simulation plus performants pour faciliter la recherche à plus grande échelle dans les prochaines versions.

Commençons par l' initialize de calcul. Comme c'est le cas pour tous les calculs fédérés, vous pouvez le considérer comme une fonction. Le calcul ne prend aucun argument et renvoie un résultat - la représentation de l'état du processus de moyenne fédérée sur le serveur. Bien que nous ne voulions pas plonger dans les détails de TFF, il peut être instructif de voir à quoi ressemble cet état. Vous pouvez le visualiser comme suit.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

Alors que la signature de type ci - dessus peut sembler à première vue un cryptique de bits, vous pouvez reconnaître que l'état du serveur se compose d'un model (les paramètres du modèle initial pour MNIST qui seront distribués à tous les appareils) et optimizer_state (informations supplémentaires maintenu par le serveur, comme le nombre de tours à utiliser pour les horaires d'hyperparamètres, etc.).

Nous allons invoquer le initialize calcul pour construire l'état du serveur.

state = iterative_process.initialize()

Le deuxième de la paire de calculs fédérés, next , représente un cycle unique de calcul de la moyenne fédérée, qui consiste à pousser l'état du serveur (y compris les paramètres du modèle) pour les clients, la formation sur l'appareil sur ses données locales, la collecte et la mise à jour du modèle de calcul de moyenne , et produire un nouveau modèle mis à jour sur le serveur.

Conceptuellement, vous pouvez penser à next comme ayant une signature de type fonctionnel qui suit ressemble comme.

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

En particulier, il faut penser à la next() non pas comme étant une fonction qui fonctionne sur un serveur, mais étant plutôt une représentation fonctionnelle déclarative de l'ensemble de calcul décentralisée - certaines des entrées sont fournies par le serveur ( SERVER_STATE ), mais chaque participant l'appareil contribue à son propre jeu de données local.

Exécutons un seul cycle de formation et visualisons les résultats. Nous pouvons utiliser les données fédérées que nous avons déjà générées ci-dessus pour un échantillon d'utilisateurs.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Faisons encore quelques tours. Comme indiqué précédemment, généralement à ce stade, vous choisiriez un sous-ensemble de vos données de simulation à partir d'un nouvel échantillon d'utilisateurs sélectionnés au hasard pour chaque tour afin de simuler un déploiement réaliste dans lequel les utilisateurs vont et viennent en continu, mais dans ce cahier interactif, pour dans un souci de démonstration nous allons juste réutiliser les mêmes utilisateurs, afin que le système converge rapidement.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834728)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.19917695), ('loss', 2.6146567)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21975309), ('loss', 2.529761)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2409465), ('loss', 2.4053504)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2611111), ('loss', 2.315389)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.30823046), ('loss', 2.1240263)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.33312756), ('loss', 2.1164262)])), ('stat', OrderedDict([('num_examples', 4860)]))])

La perte d'entraînement diminue après chaque cycle d'entraînement fédéré, ce qui indique que le modèle converge. Il y a quelques mises en garde importantes avec ces mesures de formation, cependant, voir plus loin la section sur l' évaluation dans ce tutoriel.

Afficher les métriques du modèle dans TensorBoard

Ensuite, visualisons les métriques de ces calculs fédérés à l'aide de Tensorboard.

Commençons par créer le répertoire et le rédacteur de résumé correspondant dans lequel écrire les métriques.

logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

Tracez les métriques scalaires pertinentes avec le même rédacteur de résumé.

with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

Démarrez TensorBoard avec le répertoire de journal racine spécifié ci-dessus. Le chargement des données peut prendre quelques secondes.

!ls {logdir}
%tensorboard --logdir {logdir} --port=0
events.out.tfevents.1629557449.ebe6e776479e64ea-4903924a278.borgtask.google.com.458912.1.v2
Launching TensorBoard...
Reusing TensorBoard on port 50681 (pid 292785), started 0:30:30 ago. (Use '!kill 292785' to kill it.)
<IPython.core.display.Javascript at 0x7fd6617e02d0>
# Uncomment and run this this cell to clean your directory of old output for
# future graphs from this directory. We don't run it by default so that if 
# you do a "Runtime > Run all" you don't lose your results.

# !rm -R /tmp/logs/scalars/*

Afin d'afficher les métriques d'évaluation de la même manière, vous pouvez créer un dossier eval distinct, comme "logs/scalars/eval", à écrire sur TensorBoard.

Personnalisation de la mise en œuvre du modèle

Keras est l' API du modèle de tff.learning.from_keras_model haut niveau recommandé pour tensorflow , et nous encourageons l' utilisation de modèles KERAS (via tff.learning.from_keras_model ) dans la mesure du possible TFF.

Cependant, tff.learning fournit une interface modèle de niveau inférieur, tff.learning.Model , qui expose la fonctionnalité minimale nécessaire à l' aide d' un modèle d'apprentissage fédéré. Directement mise en œuvre de cette interface (peut - être encore en utilisant des blocs de construction comme tf.keras.layers ) permet une personnalisation maximale sans modifier les composants internes des algorithmes d'apprentissage fédérées.

Reprenons donc tout à zéro.

Définition des variables de modèle, de la passe avant et des métriques

La première étape consiste à identifier les variables TensorFlow avec lesquelles nous allons travailler. Afin de rendre le code suivant plus lisible, définissons une structure de données pour représenter l'ensemble. Cela comprendra des variables telles que le weights et bias que nous allons former, ainsi que des variables qui contiendront diverses statistiques cumulatives et les compteurs que nous mettrons à jour au cours de la formation, comme loss_sum , accuracy_sum et num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

Voici une méthode qui crée les variables. Par souci de simplicité, nous représentons toutes les statistiques comme tf.float32 , car cela éliminera la nécessité pour les conversions de type à un stade ultérieur. Emballage initializers variables comme lambdas est une exigence imposée par les variables de ressources .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Avec les variables pour les paramètres du modèle et les statistiques cumulatives en place, nous pouvons maintenant définir la méthode de transmission directe qui calcule la perte, émet des prédictions et met à jour les statistiques cumulatives pour un seul lot de données d'entrée, comme suit.

def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

Ensuite, nous définissons une fonction qui renvoie un ensemble de métriques locales, toujours à l'aide de TensorFlow. Ce sont les valeurs (en plus des mises à jour de modèle, qui sont gérées automatiquement) qui peuvent être agrégées sur le serveur dans un processus d'apprentissage ou d'évaluation fédéré.

Ici, nous revenons tout simplement la moyenne la loss et la accuracy , ainsi que les num_examples , que nous aurons besoin de poids correctement les contributions des différents utilisateurs lors du calcul des agrégats fédérés.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

Enfin, nous devons déterminer comment regrouper les mesures locales émises par chaque appareil via get_local_mnist_metrics . Ceci est la seule partie du code qui n'est pas écrit dans tensorflow - c'est un calcul fédérée exprimé en TFF. Si vous souhaitez creuser plus profondément, effleurer les algorithmes personnalisés tutoriel, mais dans la plupart des applications, vous ne serez pas vraiment besoin; des variantes du modèle ci-dessous devraient suffire. Voici à quoi cela ressemble :

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

Les entrées metrics arguments correspond à la OrderedDict retourné par get_local_mnist_metrics ci - dessus, mais critique les valeurs ne sont plus tf.Tensors - ils sont « boxed » comme tff.Value s, pour le rendre clair , vous ne pouvez plus les manipuler à l' aide tensorflow, mais seulement à l' aide des opérateurs fédérés de TFF comme tff.federated_mean et tff.federated_sum . Le dictionnaire des agrégats globaux renvoyé définit l'ensemble des métriques qui seront disponibles sur le serveur.

La construction d' une instance de tff.learning.Model

Avec tout ce qui précède en place, nous sommes prêts à construire une représentation de modèle à utiliser avec TFF similaire à celle qui est générée pour vous lorsque vous laissez TFF ingérer un modèle Keras.

from typing import Callable, List, OrderedDict

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def predict_on_batch(self, x, training=True):
    del training
    return predict_on_batch(self._variables, x)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

  @tf.function
  def report_local_unfinalized_metrics(
      self) -> OrderedDict[str, List[tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to unfinalized values."""
    return collections.OrderedDict(
        num_examples=[self._variables.num_examples],
        loss=[self._variables.loss_sum, self._variables.num_examples],
        accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

  def metric_finalizers(
      self) -> OrderedDict[str, Callable[[List[tf.Tensor]], tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to finalizers."""
    return collections.OrderedDict(
        num_examples=tf.function(func=lambda x: x[0]),
        loss=tf.function(func=lambda x: x[0] / x[1]),
        accuracy=tf.function(func=lambda x: x[0] / x[1]))

Comme vous pouvez le voir, les méthodes abstraites et propriétés définies par tff.learning.Model correspond aux extraits de code dans la section précédente qui a introduit les variables et définit la perte et les statistiques.

Voici quelques points à souligner :

  • Tout état que votre modèle utilisera doit être capturé en tant que variables tensorflow, comme TFF ne pas utiliser Python à l' exécution (rappelez - vous votre code doit être écrit de telle sorte qu'il peut être déployé sur des appareils mobiles, voir les algorithmes personnalisés tutoriel pour une analyse plus approfondie commentaire sur les raisons).
  • Votre modèle doit décrire quelle forme de données qu'il accepte ( input_spec ), comme en général, TFF est un environnement fortement typé et veut déterminer les signatures de type pour tous les composants. Déclarer le format de l'entrée de votre modèle en est une partie essentielle.
  • Bien que techniquement pas nécessaire, nous vous recommandons de toute logique d' emballage de tensorflow (passe en avant, calculs métriques, etc.) comme tf.function s, car cela contribue à assurer la tensorflow peut être sérialisé, et supprime la nécessité pour les dépendances de contrôle explicites.

Ce qui précède est suffisant pour l'évaluation et les algorithmes comme Federated SGD. Cependant, pour la moyenne fédérée, nous devons spécifier comment le modèle doit s'entraîner localement sur chaque lot. Nous spécifierons un optimiseur local lors de la construction de l'algorithme de moyenne fédérée.

Simuler une formation fédérée avec le nouveau modèle

Avec tout ce qui précède en place, le reste du processus ressemble à ce que nous avons déjà vu - remplacez simplement le constructeur de modèle par le constructeur de notre nouvelle classe de modèle et utilisez les deux calculs fédérés dans le processus itératif que vous avez créé pour parcourir tours de formation.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.0708053), ('accuracy', 0.12777779)])), ('stat', OrderedDict([('num_examples', 4860)]))])
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.011699), ('accuracy', 0.13024691)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7408307), ('accuracy', 0.15576132)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.6761012), ('accuracy', 0.17921811)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.675567), ('accuracy', 0.1855967)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.5664043), ('accuracy', 0.20329218)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4179392), ('accuracy', 0.24382716)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.3237286), ('accuracy', 0.26687244)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.1861682), ('accuracy', 0.28209877)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.046388), ('accuracy', 0.32037038)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Pour afficher ces métriques dans TensorBoard, reportez-vous aux étapes répertoriées ci-dessus dans « Affichage des métriques de modèle dans TensorBoard ».

Évaluation

Toutes nos expériences jusqu'à présent n'ont présenté que des métriques d'entraînement fédérées - les métriques moyennes sur tous les lots de données entraînées sur tous les clients du cycle. Cela introduit les préoccupations normales concernant le surapprentissage, d'autant plus que nous avons utilisé le même ensemble de clients à chaque tour pour plus de simplicité, mais il existe une notion supplémentaire de surapprentissage dans les métriques d'entraînement spécifiques à l'algorithme de moyenne fédérée. C'est plus facile à voir si nous imaginons que chaque client a un seul lot de données, et que nous nous entraînons sur ce lot pour de nombreuses itérations (époques). Dans ce cas, le modèle local s'adaptera rapidement et exactement à ce lot, et donc la métrique de précision locale dont nous faisons la moyenne approchera de 1,0. Ainsi, ces mesures d'entraînement peuvent être considérées comme un signe que l'entraînement progresse, mais pas beaucoup plus.

Pour effectuer une évaluation des données fédérées, vous pouvez construire un autre calcul fédéré conçu uniquement dans ce but, en utilisant la tff.learning.build_federated_evaluation fonction, et en passant dans le constructeur de votre modèle comme argument. Notez que contrairement à la moyenne fédérée, où nous avons utilisé MnistTrainableModel , il suffit de passer le MnistModel . L'évaluation n'effectue pas de descente de gradient et il n'est pas nécessaire de construire des optimiseurs.

Pour l' expérimentation et la recherche, lorsqu'un jeu de données de test centralisé est disponible, l' apprentissage fédéré de texte génération démontre une autre option d'évaluation: prendre les poids formés de l' apprentissage fédéré, en les appliquant à un modèle standard Keras, puis appeler simplement tf.keras.models.Model.evaluate() sur un ensemble de données centralisée.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

Vous pouvez inspecter la signature de type abstrait de la fonction d'évaluation comme suit.

str(evaluation.type_signature)
'(<server_model_weights=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,federated_dataset={<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <eval=<num_examples=float32,loss=float32,accuracy=float32>,stat=<num_examples=int64>>@SERVER)'

Pas besoin d'être préoccupé par les détails à ce stade, il faut savoir que cela prend la forme générale suivante, similaire à tff.templates.IterativeProcess.next mais avec deux différences importantes. Premièrement, nous ne renvoyons pas l'état du serveur, car l'évaluation ne modifie pas le modèle ou tout autre aspect de l'état - vous pouvez le considérer comme sans état. Deuxièmement, l'évaluation n'a besoin que du modèle et ne nécessite aucune autre partie de l'état du serveur qui pourrait être associée à la formation, telle que les variables d'optimisation.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

Invoquons l'évaluation sur le dernier état auquel nous sommes arrivés pendant la formation. Afin d'extraire le dernier modèle formé de l'état du serveur, vous accédez simplement le .model membre, comme suit.

train_metrics = evaluation(state.model, federated_train_data)

Voici ce que nous obtenons. Notez que les chiffres semblent légèrement meilleurs que ce qui a été rapporté par le dernier cycle de formation ci-dessus. Par convention, les métriques d'entraînement rapportées par le processus d'entraînement itératif reflètent généralement les performances du modèle au début du cycle d'entraînement, de sorte que les métriques d'évaluation auront toujours une longueur d'avance.

str(train_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 4860.0), ('loss', 1.7510437), ('accuracy', 0.2788066)])), ('stat', OrderedDict([('num_examples', 4860)]))])"

Compilons maintenant un échantillon de test de données fédérées et réexécutons l'évaluation sur les données de test. Les données proviendront du même échantillon d'utilisateurs réels, mais d'un ensemble de données distinctes conservées.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 580.0), ('loss', 1.8361608), ('accuracy', 0.2413793)])), ('stat', OrderedDict([('num_examples', 580)]))])"

Ceci conclut le tutoriel. Nous vous encourageons à jouer avec les paramètres (par exemple, la taille des lots, le nombre d'utilisateurs, les époques, les taux d'apprentissage, etc.), à modifier le code ci-dessus pour simuler l'entraînement sur des échantillons aléatoires d'utilisateurs à chaque tour et à explorer les autres tutoriels nous avons développé.