Esta página foi traduzida pela API Cloud Translation.
Switch to English

Aprendizagem federada para classificação de imagens

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub

Neste tutorial, usamos o exemplo de treinamento MNIST clássico para apresentar a camada de API Federated Learning (FL) do TFF, tff.learning - um conjunto de interfaces de nível superior que pode ser usado para realizar tipos comuns de tarefas de aprendizado federado, como treinamento federado, em relação a modelos fornecidos pelo usuário implementados no TensorFlow.

Este tutorial e a API Federated Learning destinam-se principalmente a usuários que desejam conectar seus próprios modelos TensorFlow ao TFF, tratando o último principalmente como uma caixa preta. Para uma compreensão mais aprofundada do TFF e como implementar seus próprios algoritmos de aprendizado federado, consulte os tutoriais na API FC Core - Algoritmos Federados Customizados Parte 1 e Parte 2 .

Para obter mais informações sobre tff.learning , continue com o Aprendizado Federado para Geração de Texto , tutorial que além de cobrir modelos recorrentes, também demonstra o carregamento de um modelo Keras serializado pré-treinado para refinamento com aprendizado federado combinado com avaliação usando Keras.

Antes de começarmos

Antes de começar, execute o seguinte para se certificar de que seu ambiente está configurado corretamente. Se você não vir uma saudação, consulte o guia de instalação para obter instruções.


!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

Preparando os dados de entrada

Vamos começar com os dados. A aprendizagem federada requer um conjunto de dados federados, ou seja, uma coleção de dados de vários usuários. Os dados federados normalmente não são iid , o que representa um conjunto único de desafios.

Para facilitar a experimentação, semeamos o repositório TFF com alguns conjuntos de dados, incluindo uma versão federada de MNIST que contém uma versão do conjunto de dados NIST original que foi reprocessado usando Leaf para que os dados sejam digitados pelo escritor original de os dígitos. Como cada gravador tem um estilo único, esse conjunto de dados exibe o tipo de comportamento não-iid esperado dos conjuntos de dados federados.

Veja como podemos carregá-lo.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Os conjuntos de dados retornados por load_data() são instâncias de tff.simulation.ClientData , uma interface que permite enumerar o conjunto de usuários, construir um tf.data.Dataset que representa os dados de um determinado usuário e consultar o estrutura de elementos individuais. Veja como você pode usar essa interface para explorar o conteúdo do conjunto de dados. Lembre-se de que, embora essa interface permita que você itere sobre os ids dos clientes, esse é apenas um recurso dos dados de simulação. Como você verá em breve, as identidades do cliente não são usadas pela estrutura de aprendizado federado - seu único propósito é permitir que você selecione subconjuntos de dados para simulações.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None)), ('label', TensorSpec(shape=(), dtype=tf.int32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

png

Explorando a heterogeneidade em dados federados

Os dados federados são normalmente não- iid , os usuários geralmente têm distribuições diferentes de dados dependendo dos padrões de uso. Alguns clientes podem ter menos exemplos de treinamento no dispositivo, sofrendo de escassez de dados localmente, enquanto alguns clientes terão exemplos de treinamento mais do que suficientes. Vamos explorar esse conceito de heterogeneidade de dados típico de um sistema federado com os dados EMNIST que temos disponíveis. É importante notar que esta análise profunda dos dados de um cliente só está disponível para nós porque este é um ambiente de simulação onde todos os dados estão disponíveis para nós localmente. Em um ambiente federado de produção real, você não seria capaz de inspecionar os dados de um único cliente.

Primeiro, vamos pegar uma amostra dos dados de um cliente para ter uma ideia dos exemplos em um dispositivo simulado. Como o conjunto de dados que estamos usando foi digitado por um escritor exclusivo, os dados de um cliente representam a escrita de uma pessoa para uma amostra dos dígitos de 0 a 9, simulando o "padrão de uso" exclusivo de um usuário.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

png

Agora vamos visualizar o número de exemplos em cada cliente para cada rótulo de dígito MNIST. No ambiente federado, o número de exemplos em cada cliente pode variar um pouco, dependendo do comportamento do usuário.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

png

Agora vamos visualizar a imagem média por cliente para cada rótulo MNIST. Este código produzirá a média de cada valor de pixel para todos os exemplos do usuário para um rótulo. Veremos que a imagem média de um cliente para um dígito será diferente da imagem média de outro cliente para o mesmo dígito, devido ao estilo de caligrafia exclusivo de cada pessoa. Podemos refletir sobre como cada rodada de treinamento local irá empurrar o modelo em uma direção diferente em cada cliente, pois estamos aprendendo com os próprios dados exclusivos do usuário naquela rodada local. Mais tarde no tutorial, veremos como podemos pegar cada atualização do modelo de todos os clientes e agregá-los em nosso novo modelo global, que aprendeu com cada um dos dados exclusivos de cada cliente.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

png

png

png

png

png

Os dados do usuário podem ser ruidosos e rotulados de maneira não confiável. Por exemplo, observando os dados do Cliente 2 acima, podemos ver que para o rótulo 2, é possível que alguns exemplos tenham sido erroneamente criados, criando uma imagem média mais barulhenta.

Pré-processamento dos dados de entrada

Como os dados já são um tf.data.Dataset , o pré-processamento pode ser realizado usando transformações de Dataset. Aqui, nós achatar os 28x28 imagens em 784 matrizes -element, embaralhar os exemplos individuais, organizá-los em lotes, e renomear as características de pixels e label para x e y para uso com Keras. Também adicionamos uma repeat do conjunto de dados para executar várias épocas.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Vamos verificar se isso funcionou.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [2],
       [3],
       [6],
       [0],
       [1],
       [4],
       [1],
       [0],
       [6],
       [9],
       [9],
       [3],
       [6],
       [1],
       [4],
       [8],
       [0],
       [2]], dtype=int32))])

Temos quase todos os blocos de construção no local para construir conjuntos de dados federados.

Uma das maneiras de alimentar dados federados para TFF em uma simulação é simplesmente como uma lista Python, com cada elemento da lista contendo os dados de um usuário individual, seja como uma lista ou como um tf.data.Dataset . Como já temos uma interface que oferece o último, vamos usá-lo.

Aqui está uma função auxiliar simples que construirá uma lista de conjuntos de dados de um determinado conjunto de usuários como uma entrada para uma rodada de treinamento ou avaliação.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

Agora, como escolhemos os clientes?

Em um cenário típico de treinamento federado, estamos lidando com uma população potencialmente muito grande de dispositivos de usuário, apenas uma fração da qual pode estar disponível para treinamento em um determinado momento. Esse é o caso, por exemplo, quando os dispositivos do cliente são telefones celulares que participam do treinamento apenas quando conectados a uma fonte de alimentação, fora de uma rede medida e, de outra forma, ociosos.

Claro, estamos em um ambiente de simulação e todos os dados estão disponíveis localmente. Normalmente, então, ao executar simulações, simplesmente amostraríamos um subconjunto aleatório dos clientes a serem envolvidos em cada rodada de treinamento, geralmente diferente em cada rodada.

Dito isso, como você pode descobrir estudando o artigo sobre o algoritmo da Média Federada , alcançar a convergência em um sistema com subconjuntos de clientes amostrados aleatoriamente em cada rodada pode demorar um pouco e seria impraticável ter que executar centenas de rodadas em este tutorial interativo.

Em vez disso, faremos uma amostra do conjunto de clientes uma vez e reutilizaremos o mesmo conjunto em rodadas para acelerar a convergência (sobreajuste intencionalmente para esses poucos dados do usuário). Deixamos isso como um exercício para o leitor modificar este tutorial para simular a amostragem aleatória - é bastante fácil de fazer (uma vez que você fizer isso, tenha em mente que fazer o modelo convergir pode demorar um pouco).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Criação de um modelo com Keras

Se você estiver usando o Keras, provavelmente já possui um código que constrói um modelo Keras. Aqui está um exemplo de um modelo simples que será suficiente para nossas necessidades.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

Para usar qualquer modelo com TFF, ele precisa ser tff.learning.Model em uma instância da interface tff.learning.Model , que expõe métodos para estampar a passagem de avanço do modelo, propriedades de metadados, etc., de forma semelhante a Keras, mas também apresenta outros elementos, como formas de controlar o processo de cálculo de métricas federadas. Não vamos nos preocupar com isso por enquanto; se você tiver um modelo Keras como o que acabamos de definir acima, você pode fazer com que o TFF o embrulhe para você invocando tff.learning.from_keras_model , passando o modelo e um lote de dados de amostra como argumentos, conforme mostrado abaixo.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Treinar o modelo em dados federados

Agora que temos um modelo empacotado como tff.learning.Model para uso com TFF, podemos permitir que TFF construa um algoritmo de Média Federada invocando a função auxiliar tff.learning.build_federated_averaging_process , como segue.

Tenha em mente que o argumento precisa ser um construtor (como model_fn acima), não uma instância já construída, para que a construção de seu modelo possa acontecer em um contexto controlado por TFF (se você estiver curioso sobre os motivos para isso, recomendamos que você leia o tutorial de acompanhamento sobre algoritmos personalizados ).

Uma observação crítica sobre o algoritmo de Média Federada abaixo, existem 2 otimizadores: um otimizador _client e um otimizador _server. O otimizador _client é usado apenas para calcular atualizações de modelos locais em cada cliente. O otimizador _server aplica a atualização média ao modelo global no servidor. Em particular, isso significa que a escolha do otimizador e da taxa de aprendizado usada pode precisar ser diferente daquelas usadas para treinar o modelo em um conjunto de dados iid padrão. Recomendamos começar com SGD regular, possivelmente com uma taxa de aprendizado menor do que o normal. A taxa de aprendizado que usamos não foi ajustada cuidadosamente, sinta-se à vontade para experimentar.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

O que acabou de acontecer? A TFF construiu um par de cálculos federados e os empacotou em um tff.templates.IterativeProcess no qual esses cálculos estão disponíveis como um par de propriedades initialize e next .

Em suma, os cálculos federados são programas na linguagem interna da TFF que podem expressar vários algoritmos federados (você pode encontrar mais sobre isso no tutorial de algoritmos personalizados ). Nesse caso, os dois cálculos gerados e empacotados em iterative_process implementam a Média Federada .

É um objetivo da TFF definir cálculos de forma que possam ser executados em configurações de aprendizagem federadas reais, mas atualmente apenas o tempo de execução de simulação de execução local é implementado. Para executar um cálculo em um simulador, basta invocá-lo como uma função Python. Este ambiente interpretado padrão não foi projetado para alto desempenho, mas será suficiente para este tutorial; Esperamos fornecer tempos de execução de simulação de alto desempenho para facilitar pesquisas em larga escala em versões futuras.

Vamos começar com a computação de initialize . Como é o caso de todos os cálculos federados, você pode pensar nisso como uma função. O cálculo não leva argumentos e retorna um resultado - a representação do estado do processo de Média Federada no servidor. Embora não desejemos entrar em detalhes sobre a TFF, pode ser instrutivo ver como é esse estado. Você pode visualizá-lo da seguinte maneira.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<>,model_broadcast_state=<>>@SERVER)'

Embora a assinatura de tipo acima possa parecer um pouco enigmática à primeira vista, você pode reconhecer que o estado do servidor consiste em um model (os parâmetros do modelo inicial para MNIST que serão distribuídos a todos os dispositivos) e optimizer_state (informações adicionais mantidas pelo servidor, como o número de rodadas a serem usadas para programações de hiperparâmetros etc.).

Vamos invocar a computação de initialize para construir o estado do servidor.

state = iterative_process.initialize()

O segundo do par de cálculos federados, a next , representa uma única rodada de Média Federada, que consiste em enviar o estado do servidor (incluindo os parâmetros do modelo) para os clientes, treinamento no dispositivo em seus dados locais, coleta e média de atualizações do modelo e produzindo um novo modelo atualizado no servidor.

Conceitualmente, você pode pensar next como tendo uma assinatura de tipo funcional que tem a seguinte aparência.

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

Em particular, deve-se pensar em next() não como uma função que é executada em um servidor, mas sim como uma representação funcional declarativa de toda a computação descentralizada - algumas das entradas são fornecidas pelo servidor ( SERVER_STATE ), mas cada participante dispositivo contribui com seu próprio conjunto de dados local.

Vamos fazer uma única rodada de treinamento e visualizar os resultados. Podemos usar os dados federados que já geramos acima para uma amostra de usuários.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12037037312984467,loss=3.0108425617218018>>

Vamos fazer mais algumas rodadas. Conforme observado anteriormente, normalmente, neste ponto, você escolheria um subconjunto de seus dados de simulação de uma nova amostra de usuários selecionada aleatoriamente para cada rodada, a fim de simular uma implantação realista na qual os usuários vêm e vão continuamente, mas neste notebook interativo, para para fins de demonstração, vamos apenas reutilizar os mesmos usuários, para que o sistema converta rapidamente.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14814814925193787,loss=2.8865506649017334>>
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.148765429854393,loss=2.9079062938690186>>
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.17633745074272156,loss=2.724686622619629>>
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.20226337015628815,loss=2.6334855556488037>>
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.22427983582019806,loss=2.5482592582702637>>
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.24094650149345398,loss=2.4472343921661377>>
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.259876549243927,loss=2.3809611797332764>>
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.29814815521240234,loss=2.156442403793335>>
round 10, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.31687241792678833,loss=2.122845411300659>>

A perda de treinamento está diminuindo após cada rodada de treinamento federado, indicando que o modelo está convergindo. Existem algumas advertências importantes com essas métricas de treinamento, no entanto, consulte a seção sobre Avaliação posteriormente neste tutorial.

Exibindo métricas de modelo no TensorBoard

A seguir, vamos visualizar as métricas desses cálculos federados usando o Tensorboard.

Vamos começar criando o diretório e o gravador de resumo correspondente para gravar as métricas.


logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

Plote as métricas escalares relevantes com o mesmo redator de resumo.


with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

Inicie o TensorBoard com o diretório de registro raiz especificado acima. O carregamento dos dados pode demorar alguns segundos.


%tensorboard --logdir /tmp/logs/scalars/ --port=0

# Run this this cell to clean your directory of old output for future graphs from this directory.
rm -R /tmp/logs/scalars/*

Para visualizar as métricas de avaliação da mesma maneira, você pode criar uma pasta eval separada, como "logs / scalars / eval", para gravar no TensorBoard.

Customizando a implementação do modelo

Keras é a API de modelo de alto nível recomendada para TensorFlow , e recomendamos o uso de modelos Keras (via tff.learning.from_keras_model ) no TFF sempre que possível.

No entanto, tff.learning fornece uma interface de modelo de nível inferior, tff.learning.Model , que expõe a funcionalidade mínima necessária para usar um modelo de aprendizado federado. A implementação direta dessa interface (possivelmente ainda usando blocos de construção como tf.keras.layers ) permite a personalização máxima sem modificar os componentes internos dos algoritmos de aprendizagem federados.

Então, vamos fazer tudo de novo do zero.

Definindo variáveis ​​de modelo, passagem para frente e métricas

A primeira etapa é identificar as variáveis ​​do TensorFlow com as quais trabalharemos. Para tornar o código a seguir mais legível, vamos definir uma estrutura de dados para representar todo o conjunto. Isto incluirá variáveis tais como weights e bias que vamos treinar, bem como as variáveis que irá realizar várias estatísticas acumuladas e contadores vamos atualizar durante o treinamento, como loss_sum , accuracy_sum e num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

Aqui está um método que cria as variáveis. Para simplificar, representamos todas as estatísticas como tf.float32 , pois isso eliminará a necessidade de conversões de tipo em um estágio posterior. Encapsular inicializadores de variáveis ​​como lambdas é um requisito imposto pelas variáveis ​​de recursos .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Com as variáveis ​​para os parâmetros do modelo e estatísticas cumulativas no lugar, podemos agora definir o método de passagem direta que calcula a perda, emite previsões e atualiza as estatísticas cumulativas para um único lote de dados de entrada, como segue.

def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

Em seguida, definimos uma função que retorna um conjunto de métricas locais, novamente usando o TensorFlow. Esses são os valores (além das atualizações do modelo, que são tratadas automaticamente) que são elegíveis para serem agregados ao servidor em um processo de aprendizagem ou avaliação federado.

Aqui, simplesmente retornamos a loss média e a accuracy , bem como num_examples , que precisaremos para num_examples corretamente as contribuições de diferentes usuários ao calcular agregados federados.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

Finalmente, precisamos determinar como agregar as métricas locais emitidas por cada dispositivo por meio de get_local_mnist_metrics . Esta é a única parte do código que não foi escrita no TensorFlow - é uma computação federada expressa em TFF. Se quiser se aprofundar, dê uma olhada no tutorial de algoritmos personalizados , mas na maioria dos aplicativos, você realmente não precisa; as variantes do padrão mostrado abaixo devem ser suficientes. Esta é a aparência:

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
  

O argumento input metrics corresponde ao OrderedDict retornado por get_local_mnist_metrics acima, mas criticamente os valores não são mais tf.Tensors - eles são "encaixotados" como tff.Value s, para deixar claro que você não pode mais manipulá-los usando TensorFlow, mas apenas usando os operadores federados da TFF como tff.federated_mean e tff.federated_sum . O dicionário retornado de agregados globais define o conjunto de métricas que estará disponível no servidor.

Construindo uma instância de tff.learning.Model

Com todos os itens acima em vigor, estamos prontos para construir uma representação de modelo para uso com TFF semelhante àquela que é gerada para você quando você permite que a TFF ingira um modelo Keras.

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

Como você pode ver, os métodos e propriedades abstratos definidos por tff.learning.Model correspondem aos fragmentos de código na seção anterior que apresentou as variáveis ​​e definiu a perda e as estatísticas.

Aqui estão alguns pontos que vale a pena destacar:

  • Todos os estados que seu modelo usará devem ser capturados como variáveis ​​do TensorFlow, já que o TFF não usa Python no tempo de execução (lembre-se de que seu código deve ser escrito de forma que possa ser implantado em dispositivos móveis; consulte o tutorial de algoritmos personalizados para uma análise mais aprofundada comentário sobre as razões).
  • Seu modelo deve descrever que forma de dados ele aceita ( input_spec ), como em geral, TFF é um ambiente fortemente tipado e deseja determinar assinaturas de tipo para todos os componentes. Declarar o formato da entrada do seu modelo é uma parte essencial dele.
  • Embora tecnicamente não seja necessário, recomendamos agrupar toda a lógica do TensorFlow (passagem de avanço, cálculos métricos etc.) como tf.function s, pois isso ajuda a garantir que o TensorFlow possa ser serializado e elimina a necessidade de dependências de controle explícitas.

O acima é suficiente para avaliação e algoritmos como Federated SGD. No entanto, para Federated Averaging, precisamos especificar como o modelo deve ser treinado localmente em cada lote. Especificaremos um otimizador local ao construir o algoritmo de Média Federada.

Simulando o treinamento federado com o novo modelo

Com tudo o que foi dito acima, o restante do processo se parece com o que já vimos - basta substituir o construtor do modelo pelo construtor da nossa nova classe de modelo e usar os dois cálculos federados no processo iterativo que você criou para percorrer rodadas de treinamento.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.9713594913482666,accuracy=0.13518518209457397>>

for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.975412607192993,accuracy=0.14032921195030212>>
round  3, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.9395227432250977,accuracy=0.1594650149345398>>
round  4, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.710164785385132,accuracy=0.17139917612075806>>
round  5, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.5891618728637695,accuracy=0.20267489552497864>>
round  6, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.5148487091064453,accuracy=0.21666666865348816>>
round  7, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.2816808223724365,accuracy=0.2580246925354004>>
round  8, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.3656885623931885,accuracy=0.25884774327278137>>
round  9, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.23549222946167,accuracy=0.28477364778518677>>
round 10, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=1.974222183227539,accuracy=0.35329216718673706>>

Para ver essas métricas no TensorBoard, consulte as etapas listadas acima em "Exibindo métricas do modelo no TensorBoard".

Avaliação

Todos os nossos experimentos até agora apresentaram apenas métricas de treinamento federado - as métricas médias de todos os lotes de dados treinados em todos os clientes na rodada. Isso introduz as preocupações normais sobre overfitting, especialmente porque usamos o mesmo conjunto de clientes em cada rodada para simplificar, mas há uma noção adicional de overfitting em métricas de treinamento específicas para o algoritmo de Média Federada. Isso é mais fácil de ver se imaginarmos que cada cliente tinha um único lote de dados e treinarmos nesse lote por várias iterações (épocas). Nesse caso, o modelo local se ajustará com rapidez e exatidão àquele lote e, portanto, a métrica de precisão local que calculamos será próxima de 1,0. Assim, essas métricas de treinamento podem ser tomadas como um sinal de que o treinamento está progredindo, mas não muito mais.

Para realizar avaliação em dados federados, você pode construir outra computação federada projetada apenas para esse propósito, usando a função tff.learning.build_federated_evaluation e passando seu construtor de modelo como um argumento. Observe que, ao contrário do Federated Averaging, onde usamos MnistTrainableModel , é suficiente passar o MnistModel . A avaliação não executa a descida do gradiente e não há necessidade de construir otimizadores.

Para experimentação e pesquisa, quando um conjunto de dados de teste centralizado está disponível, o Federated Learning for Text Generation demonstra outra opção de avaliação: pegar os pesos treinados do aprendizado federado, aplicá-los a um modelo Keras padrão e simplesmente chamar tf.keras.models.Model.evaluate() em um conjunto de dados centralizado.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

Você pode inspecionar a assinatura do tipo abstrato da função de avaliação da seguinte maneira.

str(evaluation.type_signature)
'(<<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,{<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <num_examples=float32@SERVER,loss=float32@SERVER,accuracy=float32@SERVER>)'

Não há necessidade de se preocupar com os detalhes neste ponto, apenas esteja ciente de que ele assume a seguinte forma geral, semelhante a tff.templates.IterativeProcess.next mas com duas diferenças importantes. Em primeiro lugar, não estamos retornando o estado do servidor, já que a avaliação não modifica o modelo ou qualquer outro aspecto do estado - você pode pensar nisso como sem estado. Em segundo lugar, a avaliação só precisa do modelo e não exige nenhuma outra parte do estado do servidor que possa estar associada ao treinamento, como variáveis ​​do otimizador.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

Vamos invocar a avaliação do estado mais recente a que chegamos durante o treinamento. Para extrair o último modelo treinado do estado do servidor, você simplesmente acessa o membro .model , como segue.

train_metrics = evaluation(state.model, federated_train_data)

Aqui está o que temos. Observe que os números parecem ligeiramente melhores do que o relatado pela última rodada de treinamento acima. Por convenção, as métricas de treinamento relatadas pelo processo de treinamento iterativo geralmente refletem o desempenho do modelo no início da rodada de treinamento, portanto, as métricas de avaliação estarão sempre um passo à frente.

str(train_metrics)
'<num_examples=4860.0,loss=1.7142657041549683,accuracy=0.38683128356933594>'

Agora, vamos compilar uma amostra de teste de dados federados e executar novamente a avaliação nos dados de teste. Os dados virão da mesma amostra de usuários reais, mas de um conjunto de dados distinto.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
'<num_examples=580.0,loss=1.861915111541748,accuracy=0.3362068831920624>'

Isso conclui o tutorial. Nós encorajamos você a brincar com os parâmetros (por exemplo, tamanhos de lote, número de usuários, épocas, taxas de aprendizagem, etc.), para modificar o código acima para simular o treinamento em amostras aleatórias de usuários em cada rodada e explorar os outros tutoriais nós desenvolvemos.