Esta página foi traduzida pela API Cloud Translation.
Switch to English

Aprendizagem federada para classificação de imagens

Ver em TensorFlow.org Executar no Google Colab Ver fonte no GitHub

Neste tutorial, usamos o exemplo de treinamento MNIST clássico para apresentar a camada de API Federated Learning (FL) do TFF, tff.learning - um conjunto de interfaces de nível superior que pode ser usado para realizar tipos comuns de tarefas de aprendizado federado, como treinamento federado, contra modelos fornecidos pelo usuário implementados no TensorFlow.

Este tutorial e a API Federated Learning destinam-se principalmente a usuários que desejam conectar seus próprios modelos do TensorFlow ao TFF, tratando o último principalmente como uma caixa preta. Para uma compreensão mais aprofundada do TFF e como implementar seus próprios algoritmos de aprendizado federado, consulte os tutoriais na API FC Core - Algoritmos Federados Customizados Parte 1 e Parte 2 .

Para obter mais informações sobre tff.learning , continue com o Aprendizado federado para geração de texto , tutorial que, além de cobrir modelos recorrentes, também demonstra o carregamento de um modelo Keras serializado pré-treinado para refinamento com aprendizado federado combinado com avaliação usando Keras.

Antes de começarmos

Antes de começarmos, execute o seguinte para garantir que seu ambiente esteja configurado corretamente. Se você não receber uma saudação, consulte o guia de instalação para obter instruções.

 
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

%load_ext tensorboard
 
 import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
 
b'Hello, World!'

Preparando os dados de entrada

Vamos começar com os dados. A aprendizagem federada requer um conjunto de dados federados, ou seja, uma coleção de dados de vários usuários. Os dados federados geralmente são não- iid , o que representa um conjunto único de desafios.

Para facilitar a experimentação, semeamos o repositório TFF com alguns conjuntos de dados, incluindo uma versão federada do MNIST que contém uma versão do conjunto de dados NIST original que foi reprocessada usando Leaf, para que os dados sejam digitados pelo escritor original do os dígitos. Como cada gravador tem um estilo único, esse conjunto de dados exibe o tipo de comportamento não-iid esperado dos conjuntos de dados federados.

Veja como podemos carregá-lo.

 emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
 

Os conjuntos de dados retornados por load_data() são instâncias de tff.simulation.ClientData , uma interface que permite enumerar o conjunto de usuários, construir um tf.data.Dataset que representa os dados de um determinado usuário e consultar o estrutura de elementos individuais. Veja como você pode usar essa interface para explorar o conteúdo do conjunto de dados. Lembre-se de que, embora essa interface permita que você itere sobre os ids dos clientes, esse é apenas um recurso dos dados de simulação. Como você verá em breve, as identidades do cliente não são usadas pela estrutura de aprendizado federado - seu único propósito é permitir que você selecione subconjuntos de dados para simulações.

 len(emnist_train.client_ids)
 
3383
 emnist_train.element_type_structure
 
OrderedDict([('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None)), ('label', TensorSpec(shape=(), dtype=tf.int32, name=None))])
 example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
 
1
 from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()
 

png

Explorando a heterogeneidade em dados federados

Os dados federados geralmente são não- iid , os usuários geralmente têm diferentes distribuições de dados, dependendo dos padrões de uso. Alguns clientes podem ter menos exemplos de treinamento no dispositivo, sofrendo de escassez de dados localmente, enquanto alguns clientes terão exemplos de treinamento mais do que suficientes. Vamos explorar esse conceito de heterogeneidade de dados típico de um sistema federado com os dados EMNIST que temos disponíveis. É importante observar que essa análise profunda dos dados de um cliente está disponível apenas para nós, porque este é um ambiente de simulação em que todos os dados estão disponíveis localmente. Em um ambiente federado de produção real, você não seria capaz de inspecionar os dados de um único cliente.

Primeiro, vamos pegar uma amostra dos dados de um cliente para ter uma ideia dos exemplos em um dispositivo simulado. Como o conjunto de dados que estamos usando foi codificado por um gravador exclusivo, os dados de um cliente representam a letra de uma pessoa para uma amostra dos dígitos de 0 a 9, simulando o "padrão de uso" exclusivo de um usuário.

 ## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1
 

png

Agora vamos visualizar o número de exemplos em cada cliente para cada rótulo de dígito MNIST. No ambiente federado, o número de exemplos em cada cliente pode variar bastante, dependendo do comportamento do usuário.

 # Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
 

png

Agora vamos visualizar a imagem média por cliente para cada rótulo MNIST. Esse código produzirá a média de cada valor de pixel para todos os exemplos do usuário para um rótulo. Veremos que a imagem média de um cliente para um dígito será diferente da imagem média de outro cliente para o mesmo dígito, devido ao estilo de caligrafia exclusivo de cada pessoa. Podemos refletir sobre como cada rodada de treinamento local moverá o modelo em uma direção diferente para cada cliente, pois estamos aprendendo com os dados exclusivos do usuário nessa rodada local. Posteriormente no tutorial, veremos como podemos levar cada atualização para o modelo de todos os clientes e agregá-las em nosso novo modelo global, que foi aprendido com os dados exclusivos de cada cliente.

 # Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')
 

png

png

png

png

png

Os dados do usuário podem ser barulhentos e rotulados de maneira não confiável. Por exemplo, observando os dados do cliente nº 2 acima, podemos ver que, para o rótulo 2, é possível que tenha havido alguns exemplos incorretos que criam uma imagem média mais ruidosa.

Pré-processamento dos Dados de Entrada

Como os dados já são um tf.data.Dataset , o pré-processamento pode ser realizado usando transformações de Dataset. Aqui, nós achatar os 28x28 imagens em 784 matrizes -element, embaralhar os exemplos individuais, organizá-los em lotes, e renomeia as características de pixels e label para x e y para uso com Keras. Também adicionamos uma repeat do conjunto de dados para executar várias épocas.

 NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER= 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)
 

Vamos verificar isso funcionou.

 preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
 
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [2],
       [3],
       [6],
       [0],
       [1],
       [4],
       [1],
       [0],
       [6],
       [9],
       [9],
       [3],
       [6],
       [1],
       [4],
       [8],
       [0],
       [2]], dtype=int32))])

Temos quase todos os blocos de construção no local para construir conjuntos de dados federados.

Uma das maneiras de alimentar dados federados para TFF em uma simulação é simplesmente como uma lista Python, com cada elemento da lista contendo os dados de um usuário individual, seja como uma lista ou como um tf.data.Dataset . Como já temos uma interface que fornece a última, vamos usá-la.

Aqui está uma função auxiliar simples que criará uma lista de conjuntos de dados do conjunto de usuários fornecido como uma entrada para uma rodada de treinamento ou avaliação.

 def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]
 

Agora, como escolhemos clientes?

Em um cenário típico de treinamento federado, estamos lidando com uma população potencialmente muito grande de dispositivos de usuário, dos quais apenas uma fração pode estar disponível para treinamento em um determinado momento. É o caso, por exemplo, quando os dispositivos clientes são telefones celulares que participam do treinamento somente quando conectados a uma fonte de energia, a uma rede medida e inativos.

Obviamente, estamos em um ambiente de simulação e todos os dados estão disponíveis localmente. Normalmente, ao executar simulações, simplesmente amostramos um subconjunto aleatório dos clientes a serem envolvidos em cada rodada de treinamento, geralmente diferentes em cada rodada.

Dito isso, como você pode descobrir estudando o artigo sobre o algoritmo da Média Federada , alcançar a convergência em um sistema com subconjuntos de clientes amostrados aleatoriamente em cada rodada pode demorar um pouco e seria impraticável ter que executar centenas de rodadas em este tutorial interativo.

Em vez disso, faremos uma amostra do conjunto de clientes uma vez e reutilizaremos o mesmo conjunto durante as rodadas para acelerar a convergência (intencionalmente se ajustando excessivamente a esses poucos dados do usuário). Deixamos como um exercício para o leitor modificar este tutorial para simular amostragem aleatória - é bastante fácil de fazer (depois disso, lembre-se de que a convergência do modelo pode demorar um pouco).

 sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
 
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Criando um modelo com Keras

Se você estiver usando o Keras, provavelmente já possui um código que constrói um modelo Keras. Aqui está um exemplo de um modelo simples que será suficiente para nossas necessidades.

 def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])
 

Para usar qualquer modelo com TFF, ele precisa ser tff.learning.Model em uma instância da interface tff.learning.Model , que expõe métodos para carimbar a passagem de avanço do modelo, propriedades de metadados, etc., de forma semelhante a Keras, mas também apresenta outros elementos, como maneiras de controlar o processo de computação de métricas federadas. Não vamos nos preocupar com isso por enquanto; se você tiver um modelo Keras como o que acabamos de definir acima, poderá tff.learning.from_keras_model lo ao tff.learning.from_keras_model invocando tff.learning.from_keras_model , passando o modelo e um lote de dados de amostra como argumentos, conforme mostrado abaixo.

 def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
 

Treinando o modelo em dados federados

Agora que temos um modelo empacotado como tff.learning.Model para uso com TFF, podemos permitir que TFF construa um algoritmo de Média Federada invocando a função auxiliar tff.learning.build_federated_averaging_process , como segue.

Lembre-se de que o argumento precisa ser um construtor (como model_fn acima), não uma instância já construída, para que a construção do seu modelo possa ocorrer em um contexto controlado pelo TFF (se você tiver curiosidade sobre os motivos de isso, recomendamos que você leia o tutorial de acompanhamento sobre algoritmos personalizados ).

Uma observação crítica sobre o algoritmo de Média Federada abaixo, existem 2 otimizadores: um otimizador _client e um otimizador _server. O otimizador _client é usado apenas para calcular atualizações de modelos locais em cada cliente. O otimizador _server aplica a atualização média ao modelo global no servidor. Em particular, isso significa que a escolha do otimizador e da taxa de aprendizado usada pode precisar ser diferente das usadas para treinar o modelo em um conjunto de dados iid padrão. Recomendamos começar com o SGD regular, possivelmente com uma taxa de aprendizado menor que o normal. A taxa de aprendizado que usamos não foi ajustada cuidadosamente, sinta-se à vontade para experimentar.

 iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
 

O que acabou de acontecer? O TFF construiu um par de cálculos federados e os empacotou em um tff.templates.IterativeProcess no qual esses cálculos estão disponíveis como um par de propriedades initialize e next .

Em poucas palavras, cálculos federados são programas na linguagem interna do TFF que podem expressar vários algoritmos federados (você pode encontrar mais sobre isso no tutorial de algoritmos personalizados ). Nesse caso, os dois cálculos gerados e compactados em iterative_process implementam a média federada .

É um objetivo do TFF definir cálculos de forma que eles possam ser executados em configurações de aprendizagem federadas reais, mas atualmente apenas o tempo de execução de simulação de execução local é implementado. Para executar uma computação em um simulador, você simplesmente a invoca como uma função Python. Este ambiente interpretado padrão não foi projetado para alto desempenho, mas será suficiente para este tutorial; esperamos fornecer tempos de execução de simulação de alto desempenho para facilitar pesquisas em larga escala em versões futuras.

Vamos começar com a computação de initialize . Como é o caso de todos os cálculos federados, você pode pensar nisso como uma função. O cálculo não leva argumentos e retorna um resultado - a representação do estado do processo de Média Federada no servidor. Embora não desejemos entrar em detalhes sobre a TFF, pode ser instrutivo ver como é esse estado. Você pode visualizá-lo da seguinte maneira.

 str(iterative_process.initialize.type_signature)
 
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<>,model_broadcast_state=<>>@SERVER)'

Embora a assinatura do tipo acima possa parecer um pouco enigmática, é possível reconhecer que o estado do servidor consiste em um model (os parâmetros iniciais do modelo para o MNIST que serão distribuídos para todos os dispositivos) e optimizer_state (informações adicionais mantidas pelo servidor, como o número de rodadas a serem usadas para programações de hiperparâmetros etc.).

Vamos invocar a computação de initialize para construir o estado do servidor.

 state = iterative_process.initialize()
 

O segundo do par de cálculos federados, a next , representa uma única rodada de Média Federada, que consiste em enviar o estado do servidor (incluindo os parâmetros do modelo) para os clientes, treinamento no dispositivo em seus dados locais, coleta e média de atualizações do modelo e produzindo um novo modelo atualizado no servidor.

Conceitualmente, você pode pensar em next como tendo uma assinatura de tipo funcional com a seguinte aparência.

 SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS
 

Em particular, deve-se pensar em next() não como uma função que roda em um servidor, mas sim como uma representação funcional declarativa de toda a computação descentralizada - algumas das entradas são fornecidas pelo servidor ( SERVER_STATE ), mas cada participante O dispositivo contribui com seu próprio conjunto de dados local.

Vamos fazer uma única rodada de treinamento e visualizar os resultados. Podemos usar os dados federados que já geramos acima para uma amostra de usuários.

 state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
 
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12037037312984467,loss=3.0108425617218018>>

Vamos fazer mais algumas rodadas. Como observado anteriormente, normalmente nesse ponto você seleciona um subconjunto de seus dados de simulação de uma nova amostra aleatória de usuários para cada rodada, a fim de simular uma implantação realista na qual os usuários vão e vêm continuamente, mas neste bloco de anotações interativo, para para demonstração, apenas reutilizaremos os mesmos usuários, para que o sistema converja rapidamente.

 NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
 
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14814814925193787,loss=2.8865506649017334>>
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.148765429854393,loss=2.9079062938690186>>
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.17633745074272156,loss=2.724686622619629>>
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.20226337015628815,loss=2.6334855556488037>>
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.22427983582019806,loss=2.5482592582702637>>
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.24094650149345398,loss=2.4472343921661377>>
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.259876549243927,loss=2.3809611797332764>>
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.29814815521240234,loss=2.156442403793335>>
round 10, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.31687241792678833,loss=2.122845411300659>>

A perda de treinamento está diminuindo após cada rodada de treinamento federado, indicando que o modelo está convergindo. Existem algumas advertências importantes com essas métricas de treinamento; no entanto, consulte a seção Avaliação posteriormente neste tutorial.

Exibindo métricas de modelo no TensorBoard

A seguir, vamos visualizar as métricas dessas computações federadas usando o Tensorboard.

Vamos começar criando o diretório e o gravador de resumo correspondente para gravar as métricas.

 
logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()
 

Plote as métricas escalares relevantes com o mesmo gravador de resumo.

 
with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics.train._asdict().items():
      tf.summary.scalar(name, value, step=round_num)
 

Inicie o TensorBoard com o diretório de log raiz especificado acima. O carregamento dos dados pode demorar alguns segundos.

 
%tensorboard --logdir /tmp/logs/scalars/ --port=0
 
 
# Run this this cell to clean your directory of old output for future graphs from this directory.
!rm -R /tmp/logs/scalars/*
 

Para visualizar as métricas de avaliação da mesma maneira, você pode criar uma pasta de avaliação separada, como "logs / scalars / eval", para gravar no TensorBoard.

Customizando a Implementação do Modelo

Keras é a API de modelo de alto nível recomendada para TensorFlow , e recomendamos o uso de modelos Keras (via tff.learning.from_keras_model ) no TFF sempre que possível.

No entanto, o tff.learning fornece uma interface de modelo de nível inferior, tff.learning.Model , que expõe a funcionalidade mínima necessária para o uso de um modelo para aprendizado federado. A implementação direta dessa interface (possivelmente ainda usando blocos de construção como tf.keras.layers ) permite a personalização máxima sem modificar os elementos internos dos algoritmos de aprendizado federado.

Então, vamos fazer tudo de novo do zero.

Definindo variáveis ​​de modelo, encaminhamento e métricas

A primeira etapa é identificar as variáveis ​​do TensorFlow com as quais trabalharemos. Para tornar o código a seguir mais legível, vamos definir uma estrutura de dados para representar o conjunto inteiro. Isto incluirá variáveis tais como weights e bias que vamos treinar, bem como as variáveis que irá realizar várias estatísticas acumuladas e contadores vamos atualizar durante o treinamento, como loss_sum , accuracy_sum e num_examples .

 MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
 

Aqui está um método que cria as variáveis. Para fins de simplicidade, representamos todas as estatísticas como tf.float32 , pois isso eliminará a necessidade de conversões de tipo em um estágio posterior. Encapsular inicializadores de variáveis ​​como lambdas é um requisito imposto pelas variáveis ​​de recursos .

 def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
 

Com as variáveis ​​para parâmetros de modelo e estatísticas cumulativas em vigor, agora podemos definir o método de passagem direta que calcula a perda, emite previsões e atualiza as estatísticas cumulativas para um único lote de dados de entrada, como segue.

 def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions
 

Em seguida, definimos uma função que retorna um conjunto de métricas locais, novamente usando o TensorFlow. Esses são os valores (além das atualizações do modelo, manipulados automaticamente) que são elegíveis para serem agregados ao servidor em um processo federado de aprendizado ou avaliação.

Aqui, simplesmente retornamos a loss e a accuracy médias, bem como num_examples , que precisaremos para num_examples corretamente as contribuições de diferentes usuários ao calcular agregados federados.

 def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)
 

Finalmente, precisamos determinar como agregar as métricas locais emitidas por cada dispositivo via get_local_mnist_metrics . Esta é a única parte do código que não foi escrita no TensorFlow - é uma computação federada expressa em TFF. Se você quiser se aprofundar, dê uma olhada no tutorial de algoritmos personalizados , mas na maioria dos aplicativos, você realmente não precisa; variantes do padrão mostrado abaixo devem ser suficientes. Esta é a aparência:

 @tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
  
 

O argumento input metrics corresponde ao OrderedDict retornado por get_local_mnist_metrics acima, mas criticamente os valores não são mais tf.Tensors - eles são "encaixotados" como tff.Value s, para deixar claro que você não pode mais manipulá-los usando TensorFlow, mas apenas usando os operadores federados da TFF como tff.federated_mean e tff.federated_sum . O dicionário retornado de agregados globais define o conjunto de métricas que estará disponível no servidor.

Construindo uma instância de tff.learning.Model

Com todas as opções acima, estamos prontos para construir uma representação de modelo para uso com o TFF semelhante ao que é gerado para você quando você permite que o TFF ingira um modelo Keras.

 class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients
 

Como você pode ver, os métodos e propriedades abstratos definidos por tff.learning.Model correspondem aos fragmentos de código na seção anterior que apresentou as variáveis ​​e definiu a perda e as estatísticas.

Aqui estão alguns pontos que valem a pena destacar:

  • Todos os estados que seu modelo usará devem ser capturados como variáveis ​​TensorFlow, pois o TFF não usa Python em tempo de execução (lembre-se de que seu código deve ser escrito para que possa ser implantado em dispositivos móveis; consulte o tutorial de algoritmos personalizados para obter mais detalhes comentário sobre as razões).
  • Seu modelo deve descrever que forma de dados ele aceita ( input_spec ), como em geral, TFF é um ambiente fortemente tipado e deseja determinar assinaturas de tipo para todos os componentes. Declarar o formato da entrada do seu modelo é uma parte essencial.
  • Embora tecnicamente não seja necessário, recomendamos agrupar toda a lógica do TensorFlow (passagem de avanço, cálculos de métrica etc.) como tf.function s, pois isso ajuda a garantir que o TensorFlow possa ser serializado e elimina a necessidade de dependências de controle explícitas.

O acima é suficiente para avaliação e algoritmos como Federated SGD. No entanto, para a média federada, precisamos especificar como o modelo deve ser treinado localmente em cada lote. Especificaremos um otimizador local ao construir o algoritmo de Média Federada.

Simulando o treinamento federado com o novo modelo

Com tudo isso, o restante do processo se parece com o que já vimos - basta substituir o construtor de modelo pelo construtor de nossa nova classe de modelo e usar os dois cálculos federados no processo iterativo que você criou para percorrer rodadas de treinamento.

 iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
 
 state = iterative_process.initialize()
 
 state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
 
round  1, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.9713594913482666,accuracy=0.13518518209457397>>

 for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
 
round  2, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.975412607192993,accuracy=0.14032921195030212>>
round  3, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.9395227432250977,accuracy=0.1594650149345398>>
round  4, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.710164785385132,accuracy=0.17139917612075806>>
round  5, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.5891618728637695,accuracy=0.20267489552497864>>
round  6, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.5148487091064453,accuracy=0.21666666865348816>>
round  7, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.2816808223724365,accuracy=0.2580246925354004>>
round  8, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.3656885623931885,accuracy=0.25884774327278137>>
round  9, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=2.23549222946167,accuracy=0.28477364778518677>>
round 10, metrics=<broadcast=<>,aggregation=<>,train=<num_examples=4860.0,loss=1.974222183227539,accuracy=0.35329216718673706>>

Para ver essas métricas no TensorBoard, consulte as etapas listadas acima em "Exibindo métricas do modelo no TensorBoard".

Avaliação

Até o momento, todas as nossas experiências apresentaram apenas métricas federadas de treinamento - as métricas médias de todos os lotes de dados treinados em todos os clientes da rodada. Isso introduz as preocupações normais sobre o ajuste excessivo, especialmente porque usamos o mesmo conjunto de clientes em cada rodada para simplificar, mas existe uma noção adicional de ajuste excessivo nas métricas de treinamento específicas do algoritmo de Média Federada. Isso é mais fácil de ver se imaginarmos que cada cliente tinha um único lote de dados e treinarmos nesse lote por muitas iterações (épocas). Nesse caso, o modelo local se ajustará com rapidez e exatidão àquele lote e, portanto, a métrica de precisão local que calculamos se aproximará de 1,0. Portanto, essas métricas de treinamento podem ser tomadas como um sinal de que o treinamento está progredindo, mas não muito mais.

Para executar a avaliação em dados federados, é possível construir outra computação federada projetada exatamente para esse propósito, usando a função tff.learning.build_federated_evaluation e passando seu construtor de modelo como argumento. Observe que, ao contrário do Federated Averaging, onde usamos MnistTrainableModel , é suficiente passar o MnistModel . A avaliação não executa a descida do gradiente e não há necessidade de construir otimizadores.

Para experimentação e pesquisa, quando um conjunto de dados de teste centralizado está disponível, o Federated Learning for Text Generation demonstra outra opção de avaliação: pegar os pesos treinados do federated learning, aplicá-los a um modelo Keras padrão e, em seguida, simplesmente chamar tf.keras.models.Model.evaluate() em um conjunto de dados centralizado.

 evaluation = tff.learning.build_federated_evaluation(MnistModel)
 

Você pode inspecionar a assinatura do tipo abstrato da função de avaliação da seguinte forma.

 str(evaluation.type_signature)
 
'(<<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,{<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <num_examples=float32@SERVER,loss=float32@SERVER,accuracy=float32@SERVER>)'

Não há necessidade de se preocupar com os detalhes neste momento; lembre-se de que ele assume a seguinte forma geral, semelhante a tff.templates.IterativeProcess.next mas com duas diferenças importantes. Primeiro, não estamos retornando o estado do servidor, uma vez que a avaliação não modifica o modelo ou qualquer outro aspecto do estado - você pode pensar nisso como sem estado. Em segundo lugar, a avaliação só precisa do modelo e não exige nenhuma outra parte do estado do servidor que possa estar associada ao treinamento, como variáveis ​​do otimizador.

 SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS
 

Vamos invocar a avaliação do estado mais recente ao qual chegamos durante o treinamento. Para extrair o modelo treinado mais recente do estado do servidor, basta acessar o membro .model , da seguinte maneira.

 train_metrics = evaluation(state.model, federated_train_data)
 

Aqui está o que temos. Observe que os números parecem marginalmente melhores do que o relatado na última rodada de treinamento acima. Por convenção, as métricas de treinamento relatadas pelo processo de treinamento iterativo geralmente refletem o desempenho do modelo no início da rodada de treinamento, portanto, as métricas de avaliação sempre estarão um passo à frente.

 str(train_metrics)
 
'<num_examples=4860.0,loss=1.7142657041549683,accuracy=0.38683128356933594>'

Agora, vamos compilar uma amostra de teste de dados federados e executar novamente a avaliação nos dados de teste. Os dados virão da mesma amostra de usuários reais, mas de um conjunto distinto de dados retidos.

 federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
 
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
 test_metrics = evaluation(state.model, federated_test_data)
 
 str(test_metrics)
 
'<num_examples=580.0,loss=1.861915111541748,accuracy=0.3362068831920624>'

Isso conclui o tutorial. Nós encorajamos você a brincar com os parâmetros (por exemplo, tamanhos de lote, número de usuários, épocas, taxas de aprendizagem, etc.), para modificar o código acima para simular o treinamento em amostras aleatórias de usuários em cada rodada e explorar os outros tutoriais nós desenvolvemos.