O Dia da Comunidade de ML é dia 9 de novembro! Junte-nos para atualização de TensorFlow, JAX, e mais Saiba mais

Aprendizagem federada para classificação de imagens

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Neste tutorial, vamos usar o exemplo formação MNIST clássico para introduzir a Federated Aprendizagem (FL) camada de API da TFF, tff.learning - um conjunto de interfaces de nível mais alto que pode ser usado para executar tipos comuns de tarefas de aprendizagem federados, como treinamento federado, contra modelos fornecidos pelo usuário implementados no TensorFlow.

Este tutorial e a API Federated Learning destinam-se principalmente a usuários que desejam conectar seus próprios modelos do TensorFlow ao TFF, tratando o último principalmente como uma caixa preta. Para uma compreensão mais aprofundada da TFF e como implementar seus próprios algoritmos de aprendizagem federados, consulte os tutoriais na API FC Núcleo - personalizado Federados Algoritmos Parte 1 e Parte 2 .

Para saber mais sobre tff.learning , continuar com a aprendizagem Federated para Texto Generation , tutorial, que além de cobrir modelos recorrentes, também demonstra o carregamento de um modelo Keras serializados pré-treinado para o refinamento com a aprendizagem federado combinado com avaliação utilizando Keras.

Antes de começarmos

Antes de começar, execute o seguinte para se certificar de que o seu ambiente está configurado corretamente. Se você não vê uma saudação, consulte a instalação guia para obter instruções.

# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

Preparando os dados de entrada

Vamos começar com os dados. A aprendizagem federada requer um conjunto de dados federados, ou seja, uma coleção de dados de vários usuários. Dados federado é tipicamente não iid , o que representa um conjunto único de desafios.

A fim de facilitar a experimentação, que semearam o repositório TFF com alguns conjuntos de dados, incluindo uma versão federado de MNIST que contém uma versão do conjunto de dados NIST originais que tem sido re-processados usando Folha de modo que os dados são digitados pelo escritor original os dígitos. Como cada gravador tem um estilo único, esse conjunto de dados exibe o tipo de comportamento não-iid esperado dos conjuntos de dados federados.

Veja como podemos carregá-lo.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Os conjuntos de dados retornados pelo load_data() são exemplos de tff.simulation.ClientData , uma interface que permite que você enumerar o conjunto de usuários, para construir um tf.data.Dataset que representa os dados de um usuário em particular, e para consultar o estrutura de elementos individuais. Veja como você pode usar essa interface para explorar o conteúdo do conjunto de dados. Lembre-se de que, embora essa interface permita que você itere sobre os ids dos clientes, esse é apenas um recurso dos dados de simulação. Como você verá em breve, as identidades do cliente não são usadas pela estrutura de aprendizado federado - seu único propósito é permitir que você selecione subconjuntos de dados para simulações.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

png

Explorando a heterogeneidade em dados federados

Dados Federados é tipicamente não iid , os utilizadores normalmente têm diferentes distribuições de dados dependendo dos padrões de utilização. Alguns clientes podem ter menos exemplos de treinamento no dispositivo, sofrendo de escassez de dados localmente, enquanto alguns clientes terão exemplos de treinamento mais do que suficientes. Vamos explorar esse conceito de heterogeneidade de dados típico de um sistema federado com os dados EMNIST que temos disponíveis. É importante notar que esta análise profunda dos dados de um cliente só está disponível para nós porque este é um ambiente de simulação onde todos os dados estão disponíveis para nós localmente. Em um ambiente federado de produção real, você não seria capaz de inspecionar os dados de um único cliente.

Primeiro, vamos pegar uma amostra dos dados de um cliente para ter uma ideia dos exemplos em um dispositivo simulado. Como o conjunto de dados que estamos usando foi digitado por um redator exclusivo, os dados de um cliente representam a caligrafia de uma pessoa para uma amostra dos dígitos de 0 a 9, simulando o "padrão de uso" exclusivo de um usuário.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

png

Agora vamos visualizar o número de exemplos em cada cliente para cada rótulo de dígito MNIST. No ambiente federado, o número de exemplos em cada cliente pode variar um pouco, dependendo do comportamento do usuário.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

png

Agora vamos visualizar a imagem média por cliente para cada rótulo MNIST. Este código produzirá a média de cada valor de pixel para todos os exemplos do usuário para um rótulo. Veremos que a imagem média de um cliente para um dígito será diferente da imagem média de outro cliente para o mesmo dígito, devido ao estilo de caligrafia exclusivo de cada pessoa. Podemos refletir sobre como cada rodada de treinamento local irá empurrar o modelo em uma direção diferente em cada cliente, pois estamos aprendendo com os próprios dados exclusivos do usuário naquela rodada local. Posteriormente no tutorial, veremos como podemos pegar cada atualização do modelo de todos os clientes e agregá-los em nosso novo modelo global, que aprendeu com cada um dos dados exclusivos do nosso cliente.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

png

png

png

png

png

Os dados do usuário podem ser ruidosos e rotulados de forma não confiável. Por exemplo, observando os dados do Cliente 2 acima, podemos ver que para o rótulo 2, é possível que haja alguns exemplos com rótulos incorretos criando uma imagem média mais barulhenta.

Pré-processamento dos dados de entrada

Como os dados já é uma tf.data.Dataset , pré-processamento pode ser realizada utilizando transformações DataSet. Aqui, nós achatar os 28x28 imagens em 784 matrizes -element, embaralhar os exemplos individuais, organizá-los em lotes, e renomear as características de pixels e label para x e y para uso com Keras. Nós também jogar em uma repeat ao longo do conjunto de dados para executar várias épocas.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Vamos verificar se isso funcionou.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [5],
       [7],
       [1],
       [7],
       [7],
       [1],
       [4],
       [7],
       [4],
       [2],
       [2],
       [5],
       [4],
       [1],
       [1],
       [0],
       [0],
       [9]], dtype=int32))])

Temos quase todos os blocos de construção no local para construir conjuntos de dados federados.

Uma das maneiras de feed de dados federados para TFF em uma simulação é simplesmente como uma lista Python, com cada elemento da lista que contém os dados de um usuário individual, seja como uma lista ou como um tf.data.Dataset . Como já temos uma interface que fornece o último, vamos usá-lo.

Aqui está uma função auxiliar simples que construirá uma lista de conjuntos de dados de um determinado conjunto de usuários como uma entrada para uma rodada de treinamento ou avaliação.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

Agora, como escolhemos os clientes?

Em um cenário de treinamento federado típico, estamos lidando com uma população potencialmente muito grande de dispositivos de usuários, apenas uma fração da qual pode estar disponível para treinamento em um determinado momento. Esse é o caso, por exemplo, quando os dispositivos do cliente são telefones celulares que participam do treinamento apenas quando conectados a uma fonte de alimentação, fora de uma rede medida e, de outra forma, ociosos.

Claro, estamos em um ambiente de simulação e todos os dados estão disponíveis localmente. Normalmente, então, ao executar simulações, simplesmente amostraríamos um subconjunto aleatório dos clientes a serem envolvidos em cada rodada de treinamento, geralmente diferente em cada rodada.

Dito isto, como você pode descobrir ao estudar o papel na Média Federated algoritmo, alcançar uma convergência em um sistema com subconjuntos amostra aleatória de clientes em cada rodada pode demorar um pouco, e seria impraticável ter para executar centenas de rodadas em este tutorial interativo.

Em vez disso, faremos uma amostra do conjunto de clientes uma vez e reutilizaremos o mesmo conjunto em rodadas para acelerar a convergência (sobreajuste intencionalmente para esses poucos dados do usuário). Deixamos como um exercício para o leitor modificar este tutorial para simular a amostragem aleatória - é bastante fácil de fazer (uma vez que você fizer isso, tenha em mente que fazer o modelo convergir pode demorar um pouco).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Criação de um modelo com Keras

Se você estiver usando Keras, provavelmente já possui um código que constrói um modelo Keras. Aqui está um exemplo de um modelo simples que será suficiente para nossas necessidades.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

A fim de utilizar qualquer modelo com TFF, ele tem de ser enrolado em uma instância do tff.learning.Model interface, que apresenta métodos para estampar passe para a frente do modelo, as propriedades de metadados, etc, à semelhança do Keras, mas também introduz adicional elementos, como formas de controlar o processo de cálculo de métricas federadas. Não vamos nos preocupar com isso por enquanto; Se você tem um modelo Keras como a que acabou de definir acima, você pode ter TFF envolvê-la para você, invocando tff.learning.from_keras_model , passando do modelo e um lote de dados de exemplo como argumentos, como mostrado abaixo.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Treinar o modelo em dados federados

Agora que temos um modelo embrulhado como tff.learning.Model para uso com TFF, podemos deixar TFF construir um algoritmo Média Federated invocando a função auxiliar tff.learning.build_federated_averaging_process , como segue.

Tenha em mente que o argumento deve ser um construtor (como model_fn acima), não uma instância já construído, de modo que a construção de seu modelo pode acontecer em um contexto controlado pelo TFF (Se você está curioso sobre as razões para isso, nós encorajamos você a ler o tutorial de acompanhamento sobre algoritmos personalizados ).

Uma nota crítica sobre o algoritmo Média Federated abaixo, existem 2 otimizadores: um otimizador _client e um otimizador _SERVER. O otimizador _client só é usado para calcular as atualizações modelo locais em cada cliente. O otimizador _SERVER aplica a atualização média para o modelo global no servidor. Em particular, isso significa que a escolha do otimizador e da taxa de aprendizado usada pode precisar ser diferente das usadas para treinar o modelo em um conjunto de dados iid padrão. Recomendamos começar com SGD regular, possivelmente com uma taxa de aprendizado menor do que o normal. A taxa de aprendizado que usamos não foi ajustada cuidadosamente, sinta-se à vontade para experimentar.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

O que acabou de acontecer? TFF foi construído um par de computações federados e embalados-los em uma tff.templates.IterativeProcess em que estes cálculos estão disponíveis como um par de propriedades initialize e next .

Em poucas palavras, cálculos federados são programas em linguagem interna da TFF que podem expressar vários algoritmos federados (você pode encontrar mais sobre isso no algoritmos personalizados tutorial). Neste caso, os dois cálculos gerados e embalado em iterative_process implementar Federated Média .

É um objetivo do TFF definir cálculos de uma forma que eles possam ser executados em configurações de aprendizagem federadas reais, mas atualmente apenas o tempo de execução de simulação de execução local é implementado. Para executar um cálculo em um simulador, basta invocá-lo como uma função Python. Este ambiente interpretado padrão não foi projetado para alto desempenho, mas será suficiente para este tutorial; esperamos fornecer tempos de execução de simulação de alto desempenho para facilitar a pesquisa em larga escala em versões futuras.

Vamos começar com o initialize computação. Como é o caso de todos os cálculos federados, você pode pensar nisso como uma função. O cálculo não leva argumentos e retorna um resultado - a representação do estado do processo de Média Federada no servidor. Embora não desejemos entrar em detalhes sobre a TFF, pode ser instrutivo ver como é esse estado. Você pode visualizá-lo da seguinte maneira.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

Enquanto a assinatura de tipo acima pode parecer à primeira vista uma enigmática pouco, você pode reconhecer que o estado do servidor consiste em um model (os parâmetros do modelo iniciais para MNIST que serão distribuídas a todos os dispositivos) e optimizer_state (informações adicionais mantido pelo servidor, como o número de rodadas a serem usadas para programações de hiperparâmetros etc.).

Vamos invocar o initialize computação para construir o estado do servidor.

state = iterative_process.initialize()

O segundo do par de cálculos federados, next , representa uma única rodada de Federated Média, que consiste em empurrar o estado do servidor (incluindo os parâmetros do modelo) para os clientes, on-dispositivo de treinamento em seus dados locais, recolha e atualizações modelo média e produzindo um novo modelo atualizado no servidor.

Conceitualmente, você pode pensar next como tendo uma assinatura de tipo funcional que tem a seguinte aparência.

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

Em particular, deve-se pensar em next() não como sendo uma função que é executado em um servidor, mas sim ser uma representação funcional declarativa de toda a computação descentralizada - algumas das entradas são fornecidos pelo servidor ( SERVER_STATE ), mas cada participante dispositivo contribui com seu próprio conjunto de dados local.

Vamos fazer uma única rodada de treinamento e visualizar os resultados. Podemos usar os dados federados que já geramos acima para uma amostra de usuários.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Vamos fazer mais algumas rodadas. Conforme observado anteriormente, normalmente neste ponto você escolheria um subconjunto de seus dados de simulação de uma nova amostra de usuários selecionada aleatoriamente para cada rodada, a fim de simular uma implantação realista na qual os usuários vêm e vão continuamente, mas neste notebook interativo, para para fins de demonstração, vamos apenas reutilizar os mesmos usuários, para que o sistema converta rapidamente.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834728)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.19917695), ('loss', 2.6146567)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21975309), ('loss', 2.529761)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2409465), ('loss', 2.4053504)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2611111), ('loss', 2.315389)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.30823046), ('loss', 2.1240263)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.33312756), ('loss', 2.1164262)])), ('stat', OrderedDict([('num_examples', 4860)]))])

A perda de treinamento está diminuindo após cada rodada de treinamento federado, indicando que o modelo está convergindo. Há algumas ressalvas importantes com essas métricas de treinamento, no entanto, consulte a seção sobre Avaliação mais adiante neste tutorial.

Exibindo métricas de modelo no TensorBoard

A seguir, vamos visualizar as métricas desses cálculos federados usando o Tensorboard.

Vamos começar criando o diretório e o gravador de resumo correspondente para gravar as métricas.

logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

Trace as métricas escalares relevantes com o mesmo redator de resumo.

with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

Inicie o TensorBoard com o diretório de registro raiz especificado acima. O carregamento dos dados pode demorar alguns segundos.

!ls {logdir}
%tensorboard --logdir {logdir} --port=0
events.out.tfevents.1629557449.ebe6e776479e64ea-4903924a278.borgtask.google.com.458912.1.v2
Launching TensorBoard...
Reusing TensorBoard on port 50681 (pid 292785), started 0:30:30 ago. (Use '!kill 292785' to kill it.)
<IPython.core.display.Javascript at 0x7fd6617e02d0>
# Uncomment and run this this cell to clean your directory of old output for
# future graphs from this directory. We don't run it by default so that if 
# you do a "Runtime > Run all" you don't lose your results.

# !rm -R /tmp/logs/scalars/*

Para visualizar as métricas de avaliação da mesma maneira, você pode criar uma pasta eval separada, como "logs / scalars / eval", para gravar no TensorBoard.

Customizando a implementação do modelo

Keras é o recomendado de alto nível do modelo API para TensorFlow , e nós encorajamos usando modelos Keras (via tff.learning.from_keras_model ) em TFF sempre que possível.

No entanto, tff.learning fornece uma interface de modelo de nível inferior, tff.learning.Model , que expõe a funcionalidade mínima necessária para a utilização de um modelo para a aprendizagem federado. Diretamente implementar essa interface (possivelmente ainda usando blocos de construção como tf.keras.layers ) permite a personalização máxima, sem modificar os internos dos algoritmos de aprendizagem federados.

Então, vamos fazer tudo de novo do zero.

Definição de variáveis ​​de modelo, passagem para frente e métricas

A primeira etapa é identificar as variáveis ​​do TensorFlow com as quais trabalharemos. Para tornar o código a seguir mais legível, vamos definir uma estrutura de dados para representar todo o conjunto. Isto incluirá variáveis tais como weights e bias que vamos treinar, bem como as variáveis que irá realizar várias estatísticas acumuladas e contadores vamos atualizar durante o treinamento, como loss_sum , accuracy_sum e num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

Aqui está um método que cria as variáveis. Por uma questão de simplicidade, que representam todas as estatísticas como tf.float32 , como que vai eliminar a necessidade de conversões de tipo, numa fase posterior. Envolvendo initializers variáveis como lambdas é uma exigência imposta por variáveis de recursos .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Com as variáveis ​​para os parâmetros do modelo e estatísticas cumulativas no lugar, podemos agora definir o método forward pass que calcula a perda, emite previsões e atualiza as estatísticas cumulativas para um único lote de dados de entrada, como segue.

def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

Em seguida, definimos uma função que retorna um conjunto de métricas locais, novamente usando o TensorFlow. Esses são os valores (além das atualizações do modelo, que são tratadas automaticamente) que são elegíveis para serem agregados ao servidor em um processo de aprendizagem ou avaliação federado.

Aqui, nós simplesmente retornar a média loss e accuracy , bem como os num_examples , que vamos precisar de peso corretamente as contribuições de diferentes usuários ao calcular agregados federados.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

Finalmente, precisamos determinar como agregar as métricas locais emitidos por cada dispositivo via get_local_mnist_metrics . Esta é a única parte do código que não está escrito em TensorFlow - é um cálculo federado expressa em TFF. Se você gostaria de cavar mais fundo, roçar sobre o costume algoritmos tutorial, mas na maioria das aplicações, você não vai realmente precisa; as variantes do padrão mostrado abaixo devem ser suficientes. Esta é a aparência:

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

A entrada metrics corresponde argumento para o OrderedDict retornado por get_local_mnist_metrics acima, mas criticamente os valores não são mais tf.Tensors - eles são "em caixa", como tff.Value s, para deixar claro que você não pode manipulá-los usando TensorFlow, mas apenas usando operadores federados da TFF como tff.federated_mean e tff.federated_sum . O dicionário retornado de agregados globais define o conjunto de métricas que estará disponível no servidor.

A construção de um exemplo de tff.learning.Model

Com todos os itens acima em vigor, estamos prontos para construir uma representação de modelo para uso com TFF semelhante àquela que é gerada para você quando você permite que a TFF ingira um modelo Keras.

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def predict_on_batch(self, x, training=True):
    del training
    return predict_on_batch(self._variables, x)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

Como você pode ver, os métodos abstratos e propriedades definidas por tff.learning.Model corresponde aos trechos de código na seção anterior, que introduziu as variáveis e definiu a perda e estatísticas.

Aqui estão alguns pontos que valem a pena destacar:

  • Todos afirmam que o modelo usará devem ser capturados como variáveis TensorFlow, como TFF não usa Python em tempo de execução (lembre-se o seu código deve ser escrito de tal forma que ele pode ser implantado para dispositivos móveis, ver o costume algoritmos tutorial para uma mais aprofundada comentário sobre as razões).
  • Seu modelo deve descrever que tipo de dados que aceita ( input_spec ), como em geral, TFF é um ambiente fortemente tipado e quer determinar assinaturas de tipo para todos os componentes. Declarar o formato da entrada do seu modelo é uma parte essencial dele.
  • Embora tecnicamente não seja obrigatório, é recomendado para embrulhar toda a lógica TensorFlow (passe para frente, os cálculos de métricas, etc.) como tf.function s, pois isso ajuda a garantir a TensorFlow pode ser serializado, e elimina a necessidade de dependências de controle explícitas.

O acima é suficiente para avaliação e algoritmos como Federated SGD. No entanto, para Federated Averaging, precisamos especificar como o modelo deve ser treinado localmente em cada lote. Especificaremos um otimizador local ao construir o algoritmo de Média Federada.

Simulando o treinamento federado com o novo modelo

Com tudo o que foi mencionado acima, o restante do processo se parece com o que já vimos - basta substituir o construtor do modelo pelo construtor da nossa nova classe de modelo e usar os dois cálculos federados no processo iterativo que você criou para percorrer rodadas de treinamento.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.0708053), ('accuracy', 0.12777779)])), ('stat', OrderedDict([('num_examples', 4860)]))])
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.011699), ('accuracy', 0.13024691)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7408307), ('accuracy', 0.15576132)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.6761012), ('accuracy', 0.17921811)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.675567), ('accuracy', 0.1855967)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.5664043), ('accuracy', 0.20329218)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4179392), ('accuracy', 0.24382716)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.3237286), ('accuracy', 0.26687244)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.1861682), ('accuracy', 0.28209877)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.046388), ('accuracy', 0.32037038)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Para ver essas métricas no TensorBoard, consulte as etapas listadas acima em "Exibindo métricas do modelo no TensorBoard".

Avaliação

Todos os nossos experimentos até agora apresentaram apenas métricas de treinamento federado - as métricas médias de todos os lotes de dados treinados em todos os clientes na rodada. Isso apresenta as preocupações normais sobre overfitting, especialmente porque usamos o mesmo conjunto de clientes em cada rodada para simplificar, mas há uma noção adicional de overfitting em métricas de treinamento específicas para o algoritmo de Média Federada. Isso é mais fácil de ver se imaginarmos que cada cliente tinha um único lote de dados e treinarmos nesse lote por várias iterações (épocas). Nesse caso, o modelo local se ajustará com rapidez e exatidão àquele lote e, portanto, a métrica de precisão local que calculamos se aproximará de 1,0. Assim, essas métricas de treinamento podem ser tomadas como um sinal de que o treinamento está progredindo, mas não muito mais.

Para realizar a avaliação em dados federados, você pode construir uma outra computação federada projetada apenas para este fim, usando o tff.learning.build_federated_evaluation função, e passando em seu construtor modelo como um argumento. Note-se que ao contrário de Federated Média, onde usamos MnistTrainableModel , basta passar o MnistModel . A avaliação não executa a descida do gradiente e não há necessidade de construir otimizadores.

Para a experimentação e pesquisa, quando um conjunto de dados de teste centralizado está disponível, Federated Aprendizagem para Texto Generation demonstra uma outra opção de avaliação: levando os pesos treinados de aprendizagem federado, aplicando-os a um modelo Keras padrão, e em seguida, simplesmente chamando tf.keras.models.Model.evaluate() em um conjunto de dados centralizado.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

Você pode inspecionar a assinatura do tipo abstrato da função de avaliação da seguinte maneira.

str(evaluation.type_signature)
'(<server_model_weights=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,federated_dataset={<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <eval=<num_examples=float32,loss=float32,accuracy=float32>,stat=<num_examples=int64>>@SERVER)'

Não há necessidade de se preocupar com os detalhes neste momento, basta estar ciente de que ele toma a seguinte forma geral, semelhante ao tff.templates.IterativeProcess.next mas com duas diferenças importantes. Primeiro, não estamos retornando o estado do servidor, já que a avaliação não modifica o modelo ou qualquer outro aspecto do estado - você pode considerá-lo sem estado. Em segundo lugar, a avaliação só precisa do modelo e não requer nenhuma outra parte do estado do servidor que possa estar associada ao treinamento, como variáveis ​​do otimizador.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

Vamos invocar a avaliação do estado mais recente a que chegamos durante o treinamento. Para extrair o modelo mais recente treinados desde o estado do servidor, basta acessar o .model membro, como se segue.

train_metrics = evaluation(state.model, federated_train_data)

Aqui está o que temos. Observe que os números parecem ligeiramente melhores do que o relatado pela última rodada de treinamento acima. Por convenção, as métricas de treinamento relatadas pelo processo de treinamento iterativo geralmente refletem o desempenho do modelo no início da rodada de treinamento, portanto, as métricas de avaliação estarão sempre um passo à frente.

str(train_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 4860.0), ('loss', 1.7510437), ('accuracy', 0.2788066)])), ('stat', OrderedDict([('num_examples', 4860)]))])"

Agora, vamos compilar uma amostra de teste de dados federados e executar novamente a avaliação nos dados de teste. Os dados virão da mesma amostra de usuários reais, mas de um conjunto de dados distinto.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 580.0), ('loss', 1.8361608), ('accuracy', 0.2413793)])), ('stat', OrderedDict([('num_examples', 580)]))])"

Isso conclui o tutorial. Nós encorajamos você a brincar com os parâmetros (por exemplo, tamanhos de lote, número de usuários, épocas, taxas de aprendizagem, etc.), para modificar o código acima para simular o treinamento em amostras aleatórias de usuários em cada rodada e explorar os outros tutoriais nós desenvolvemos.