Google I/O is a wrap! Catch up on TensorFlow sessions View sessions

Construindo Seu Próprio Algoritmo de Aprendizagem Federada

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Antes de começarmos

Antes de começar, execute o seguinte para se certificar de que seu ambiente está configurado corretamente. Se você não vê uma saudação, consulte a instalação guia para obter instruções.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
import tensorflow as tf
import tensorflow_federated as tff

Nos classificação de imagens e texto de geração de tutoriais, aprendemos como configurar modelo de dados e condutas para Federated Aprendizagem (FL), e realizou treinamento federado via tff.learning camada de API da TFF.

Esta é apenas a ponta do iceberg quando se trata de pesquisa na FL. Neste tutorial, vamos discutir como implementar algoritmos de aprendizagem federados sem adiando para o tff.learning API. Nosso objetivo é realizar o seguinte:

Metas:

  • Compreender a estrutura geral dos algoritmos de aprendizagem federada.
  • Explore a Federated Núcleo da TFF.
  • Use o Federated Core para implementar a Média Federada diretamente.

Embora este tutorial é auto-suficiente, recomendamos a primeira leitura de classificação de imagens e geração de texto tutoriais.

Preparando os dados de entrada

Primeiro carregamos e pré-processamos o conjunto de dados EMNIST incluído no TFF. Para mais detalhes, consulte a classificação de imagens tutorial.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Para alimentar o conjunto de dados em nosso modelo, achatar os dados, e converter cada exemplo em uma tupla da forma (flattened_image_vector, label) .

NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

Agora selecionamos um pequeno número de clientes e aplicamos o pré-processamento acima a seus conjuntos de dados.

client_ids = sorted(emnist_train.client_ids)[:NUM_CLIENTS]
federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

Preparando o modelo

Nós usamos o mesmo modelo como na classificação de imagens tutorial. Este modelo (implementado através tf.keras ) tem uma única camada oculta, seguindo-se uma camada Softmax.

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])

Para utilizar este modelo em TFF, nós envolvemos o modelo Keras como um tff.learning.Model . Isso nos permite executar o modelo de passe para frente dentro de TFF, e saídas extrato modelo . Para mais detalhes, veja também a classificação de imagens tutorial.

def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Enquanto usamos tf.keras para criar uma tff.learning.Model , TFF suporta modelos muito mais gerais. Esses modelos têm os seguintes atributos relevantes que capturam os pesos do modelo:

  • trainable_variables : Uma iteráveis dos tensores correspondentes a camadas treináveis.
  • non_trainable_variables : Uma iteráveis dos tensores correspondentes a camadas não-treináveis.

Para os nossos propósitos, só vamos usar os trainable_variables . (já que nosso modelo só tem isso!).

Construindo seu próprio algoritmo de Aprendizado Federado

Enquanto o tff.learning API permite criar muitas variantes de Federated Média, existem outros algoritmos federados que não se encaixam perfeitamente neste quadro. Por exemplo, você pode querer adicionar regularização, recorte, ou algoritmos mais complicados, tais como formação GAN federado . Você também pode ser em vez estar interessado em análise federados .

Para esses algoritmos mais avançados, teremos que escrever nosso próprio algoritmo personalizado usando TFF. Em muitos casos, os algoritmos federados têm 4 componentes principais:

  1. Uma etapa de transmissão de servidor para cliente.
  2. Uma etapa de atualização do cliente local.
  3. Uma etapa de upload de cliente para servidor.
  4. Uma etapa de atualização do servidor.

Em TFF, que geralmente representam algoritmos federados como tff.templates.IterativeProcess (aos quais nos referimos como apenas um IterativeProcess em todo). Esta é uma classe que contém initialize e next funções. Aqui, initialize é usado para inicializar o servidor, e next realizará uma rodada de comunicação do algoritmo federado. Vamos escrever um esqueleto de como deve ser o nosso processo iterativo para FedAvg.

Primeiro, temos uma função de inicialização que simplesmente cria uma tff.learning.Model , e retorna os seus pesos treináveis.

def initialize_fn():
  model = model_fn()
  return model.trainable_variables

Esta função parece boa, mas como veremos mais tarde, precisaremos fazer uma pequena modificação para torná-la um "cálculo TFF".

Nós também queremos esboçar o next_fn .

def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

Vamos nos concentrar na implementação desses quatro componentes separadamente. Primeiro, focamos nas partes que podem ser implementadas no TensorFlow puro, ou seja, as etapas de atualização do cliente e do servidor.

Blocos do TensorFlow

Atualização do cliente

Vamos usar a nossa tff.learning.Model para fazer o treinamento cliente essencialmente da mesma maneira que você iria treinar um modelo TensorFlow. Em particular, usaremos tf.GradientTape para calcular o gradiente em lotes de dados, em seguida, aplicar estes gradiente usando um client_optimizer . Nós nos concentramos apenas nos pesos treináveis.

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

Atualização do servidor

A atualização do servidor para FedAvg é mais simples do que a atualização do cliente. Implementaremos a média federada "vanilla", na qual simplesmente substituímos os pesos do modelo do servidor pela média dos pesos do modelo do cliente. Novamente, nos concentramos apenas nos pesos treináveis.

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

O trecho poderia ser simplificado, basta retornar os mean_client_weights . No entanto, as implementações mais avançadas de federados utilizam Média mean_client_weights com técnicas mais sofisticadas, tais como impulso ou adaptabilidade.

Desafio: Implementar uma versão do server_update que atualiza os pesos dos servidores para ser o ponto médio de model_weights e mean_client_weights. (Nota: Este tipo de abordagem "ponto médio" é análogo ao trabalho recente sobre o otimizador Lookahead !).

Até agora, escrevemos apenas código TensorFlow puro. Isso ocorre intencionalmente, pois o TFF permite que você use muito do código do TensorFlow com o qual você já está familiarizado. No entanto, agora temos de especificar a lógica de orquestração, ou seja, a lógica que dita o que as transmissões do servidor para o cliente, eo que os envios de cliente para o servidor.

Isso exigirá a Federated Núcleo da TFF.

Introdução ao Federated Core

O Federados núcleo (FC) é um conjunto de interfaces de nível inferior que servem como a base para o tff.learning API. No entanto, essas interfaces não se limitam ao aprendizado. Na verdade, eles podem ser usados ​​para análises e muitos outros cálculos sobre dados distribuídos.

Em um nível superior, o núcleo federado é um ambiente de desenvolvimento que permite a lógica de programa expressa de forma compacta para combinar o código do TensorFlow com operadores de comunicação distribuída (como somas distribuídas e transmissões). O objetivo é dar aos pesquisadores e profissionais o controle explícito sobre a comunicação distribuída em seus sistemas, sem exigir detalhes de implementação do sistema (como especificar trocas de mensagens de rede ponto a ponto).

Um ponto importante é que a TFF foi projetada para preservar a privacidade. Portanto, ele permite o controle explícito sobre onde os dados residem, para evitar o acúmulo indesejado de dados no local do servidor centralizado.

Dados federados

Um conceito-chave na TFF é "dados federados", que se refere a uma coleção de itens de dados hospedados em um grupo de dispositivos em um sistema distribuído (por exemplo, conjuntos de dados do cliente ou pesos do modelo do servidor). Nós modelar toda a coleção de itens de dados em todos os dispositivos como um único valor federado.

Por exemplo, suponha que temos dispositivos clientes em que cada um tem um flutuador que representa a temperatura de um sensor. Nós poderíamos representá-lo como uma bóia federado

federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

Tipos federados são especificadas por um tipo T dos seus constituintes membro (por ex. tf.float32 ) e um grupo G de dispositivos. Vamos nos concentrar nos casos em que G é ou tff.CLIENTS ou tff.SERVER . Tal tipo federado é representado como {T}@G , como mostrado abaixo.

str(federated_float_on_clients)
'{float32}@CLIENTS'

Por que nos preocupamos tanto com os canais? Um dos principais objetivos do TFF é permitir a escrita de código que possa ser implantado em um sistema distribuído real. Isso significa que é vital raciocinar sobre quais subconjuntos de dispositivos executam quais códigos e onde residem as diferentes partes dos dados.

TFF concentra-se em três coisas: dados, onde os dados são colocados e como os dados estão sendo transformadas. Os dois primeiros são encapsulados em tipos federados, enquanto o último é encapsulado em cálculos federados.

Cálculos federados

TFF é um ambiente de programação funcional fortemente tipado cujas unidades básicas são cálculos federados. Essas são partes da lógica que aceitam valores federados como entrada e retornam valores federados como saída.

Por exemplo, suponha que quiséssemos calcular a média das temperaturas dos sensores de nossos clientes. Poderíamos definir o seguinte (usando nosso flutuador federado):

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

Você pode perguntar: como isso é diferente do tf.function decorador em TensorFlow? A resposta fundamental é que o código gerado pelo tff.federated_computation é nem código TensorFlow nem Python; Ele é uma especificação de um sistema distribuído em uma linguagem de cola independente da plataforma interna.

Embora isso possa parecer complicado, você pode pensar em cálculos TFF como funções com assinaturas de tipo bem definidas. Essas assinaturas de tipo podem ser consultadas diretamente.

str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'

Este tff.federated_computation aceita argumentos do tipo federado {float32}@CLIENTS e retorna valores do tipo federado {float32}@SERVER . Os cálculos federados também podem ir de servidor para cliente, de cliente para cliente ou de servidor para servidor. Os cálculos federados também podem ser compostos como funções normais, desde que suas assinaturas de tipo correspondam.

Para apoiar o desenvolvimento, TFF permite invocar um tff.federated_computation como uma função Python. Por exemplo, podemos chamar

get_average_temperature([68.5, 70.3, 69.8])
69.53334

Cálculos não ansiosos e TensorFlow

Existem duas restrições principais a serem observadas. Primeiro, quando o interpretador encontra um tff.federated_computation decorador, a função é atribuída uma vez e serializada para uso futuro. Devido à natureza descentralizada do Federated Learning, esse uso futuro pode ocorrer em outro lugar, como um ambiente de execução remota. Portanto, cálculos TFF são fundamentalmente não-ansiosos. Este comportamento é um tanto análoga à do tf.function decorador em TensorFlow.

Em segundo lugar, uma computação federado só pode consistir de operadores federados (tais como tff.federated_mean ), que não pode conter operações TensorFlow. Código TensorFlow deve limitar-se aos blocos decorados com tff.tf_computation . A maioria dos códigos TensorFlow comum pode ser diretamente decorados, como a seguinte função que recebe um número e acrescenta 0.5 a ele.

@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

Estes também têm assinaturas de tipo, mas sem colocações. Por exemplo, podemos chamar

str(add_half.type_signature)
'(float32 -> float32)'

Aqui vemos uma diferença importante entre tff.federated_computation e tff.tf_computation . O primeiro tem colocações explícitas, enquanto o último não.

Podemos usar tff.tf_computation blocos em cálculos federados especificando colocações. Vamos criar uma função que adiciona metade, mas apenas aos flutuadores federados nos clientes. Podemos fazer isso usando tff.federated_map , que se aplica uma determinada tff.tf_computation , preservando a colocação.

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

Esta função é quase idêntico ao add_half , exceto que ele só aceita valores com a colocação em tff.CLIENTS e valores retorna com a mesma colocação. Podemos ver isso em sua assinatura de tipo:

str(add_half_on_clients.type_signature)
'({float32}@CLIENTS -> {float32}@CLIENTS)'

Resumindo:

  • TFF opera em valores federados.
  • Cada valor federado tem um tipo associado, com um tipo (por ex. tf.float32 ) e um canal (por ex. tff.CLIENTS ).
  • Valores federados podem ser transformados usando cálculos federados, que devem ser decoradas com tff.federated_computation e um tipo de assinatura federado.
  • Código TensorFlow deve estar contido em blocos com tff.tf_computation decoradores.
  • Esses blocos podem então ser incorporados em cálculos federados.

Construindo seu próprio algoritmo de Aprendizado Federado, revisitado

Agora que vimos o Federated Core, podemos construir nosso próprio algoritmo de aprendizado federado. Lembre-se que acima, definimos um initialize_fn e next_fn para o nosso algoritmo. O next_fn fará uso da client_update e server_update definimos usando o código TensorFlow puro.

No entanto, a fim de fazer o nosso algoritmo uma computação federada, precisaremos tanto o next_fn e initialize_fn a cada ser um tff.federated_computation .

Blocos federados do TensorFlow

Criando o cálculo de inicialização

A função initialize será bastante simples: Vamos criar um modelo usando model_fn . No entanto, lembre-se que é preciso separar o nosso código TensorFlow usando tff.tf_computation .

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

Podemos, então, passar esta diretamente em uma computação federada usando tff.federated_value .

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

Criando o next_fn

Agora usamos nosso código de atualização de cliente e servidor para escrever o algoritmo real. Vamos primeiro transformar a nossa client_update em um tff.tf_computation que aceita um conjuntos de dados de clientes e pesos dos servidores, e gera um pesos cliente tensor atualizado.

Precisaremos dos tipos correspondentes para decorar adequadamente nossa função. Felizmente, o tipo de pesos do servidor pode ser extraído diretamente de nosso modelo.

whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

Vejamos a assinatura do tipo de conjunto de dados. Lembre-se de que pegamos 28 por 28 imagens (com rótulos inteiros) e as achatamos.

str(tf_dataset_type)
'<float32[?,784],int32[?,1]>*'

Também pode extrair o tipo pesos modelo usando nossa server_init função acima.

model_weights_type = server_init.type_signature.result

Examinando a assinatura de tipo, poderemos ver a arquitetura do nosso modelo!

str(model_weights_type)
'<float32[784,10],float32[10]>'

Agora podemos criar a nossa tff.tf_computation para a atualização do cliente.

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

O tff.tf_computation versão da atualização do servidor pode ser definido de uma forma similar, usando tipos que já extraídos.

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

Por último, mas não menos importante, precisamos criar o tff.federated_computation que traz tudo isso junto. Esta função irá aceitar dois valores federados, um correspondentes aos pesos dos servidores (com colocação tff.SERVER ), e o outro correspondente às bases de dados de cliente (com colocação tff.CLIENTS ).

Observe que ambos os tipos foram definidos acima! Nós simplesmente precisamos dar-lhes o posicionamento adequado usando tff.FederatedType .

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

Lembra dos 4 elementos de um algoritmo FL?

  1. Uma etapa de transmissão de servidor para cliente.
  2. Uma etapa de atualização do cliente local.
  3. Uma etapa de upload de cliente para servidor.
  4. Uma etapa de atualização do servidor.

Agora que construímos o acima, cada parte pode ser representada de forma compacta como uma única linha de código TFF. É por essa simplicidade que tivemos que tomar cuidado extra para especificar coisas como tipos federados!

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))

  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

Temos agora uma tff.federated_computation tanto para a inicialização do algoritmo, e para a execução de um passo do algoritmo. Para terminar o nosso algoritmo, passamos estes em tff.templates.IterativeProcess .

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

Vamos olhar a assinatura tipo de initialize e next funções do nosso processo iterativo.

str(federated_algorithm.initialize.type_signature)
'( -> <float32[784,10],float32[10]>@SERVER)'

Isto reflecte o facto de que federated_algorithm.initialize é uma função não-arg que retorna um modelo de camada única (com um peso de matriz 784 por 10, e 10 unidades de polarização).

str(federated_algorithm.next.type_signature)
'(<server_weights=<float32[784,10],float32[10]>@SERVER,federated_dataset={<float32[?,784],int32[?,1]>*}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

Aqui, vemos que federated_algorithm.next aceita um modelo de servidor e os dados do cliente e retorna um modelo de servidor atualizado.

Avaliando o algoritmo

Vamos fazer algumas rodadas e ver como a perda muda. Em primeiro lugar, que vai definir uma função de avaliação utilizando a abordagem centralizada discutido na segunda tutorial.

Primeiro criamos um conjunto de dados de avaliação centralizado e, em seguida, aplicamos o mesmo pré-processamento que usamos para os dados de treinamento.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients()
central_emnist_test = preprocess(central_emnist_test)

Em seguida, escrevemos uma função que aceita um estado de servidor e usa Keras para avaliar o conjunto de dados de teste. Se você estiver familiarizado com tf.Keras , isto toda a aparência familiar, embora note o uso de set_weights !

def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

Agora, vamos inicializar nosso algoritmo e avaliar no conjunto de teste.

server_state = federated_algorithm.initialize()
evaluate(server_state)
2042/2042 [==============================] - 2s 767us/step - loss: 2.8479 - sparse_categorical_accuracy: 0.1027

Vamos treinar algumas rodadas e ver se algo muda.

for round in range(15):
  server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)
2042/2042 [==============================] - 2s 738us/step - loss: 2.5867 - sparse_categorical_accuracy: 0.0980

Vemos uma ligeira diminuição na função de perda. Embora o salto seja pequeno, realizamos apenas 15 rodadas de treinamento e em um pequeno subconjunto de clientes. Para ver melhores resultados, podemos ter que fazer centenas, senão milhares de rodadas.

Modificando nosso algoritmo

Neste ponto, vamos parar e pensar sobre o que conquistamos. Implementamos o Federated Averaging diretamente combinando o código puro do TensorFlow (para as atualizações do cliente e do servidor) com cálculos federados do Federated Core do TFF.

Para realizar um aprendizado mais sofisticado, podemos simplesmente alterar o que temos acima. Em particular, ao editar o código TF puro acima, podemos mudar a forma como o cliente realiza o treinamento ou como o servidor atualiza seu modelo.

Desafio: Adicionar recorte gradiente ao client_update função.

Se quiséssemos fazer mudanças maiores, também poderíamos fazer com que o servidor armazenasse e transmitisse mais dados. Por exemplo, o servidor também pode armazenar a taxa de aprendizagem do cliente e fazê-la diminuir com o tempo! Note-se que isso vai exigir mudanças nas assinaturas de tipo utilizadas no tff.tf_computation chama acima.

Mais difícil desafio: Implementar Federated Média com a aprendizagem de decadência taxa sobre os clientes.

Neste ponto, você pode começar a perceber quanta flexibilidade existe no que você pode implementar nesta estrutura. Para idéias (incluindo a resposta para o desafio mais difícil acima) você pode ver o código-fonte para tff.learning.build_federated_averaging_process , ou check-out vários projetos de pesquisa utilizando TFF.