Halaman ini diterjemahkan oleh Cloud Translation API.
Switch to English

TFF untuk Penelitian Pembelajaran Federasi: Model dan Kompresi Pembaruan

Dalam tutorial ini, kami menggunakan set data EMNIST untuk mendemonstrasikan cara mengaktifkan algoritme kompresi lossy untuk mengurangi biaya komunikasi dalam algoritme Federated Averaging menggunakan API tff.learning.build_federated_averaging_process dan API tensor_encoding . Untuk detail lebih lanjut tentang algoritme Federated Averaging, lihat makalah Communication-Efficient Learning of Deep Networks from Decentralized Data .

Sebelum kita mulai

Sebelum kita mulai, jalankan perintah berikut untuk memastikan bahwa lingkungan Anda sudah diatur dengan benar. Jika Anda tidak melihat salam, lihat panduan Instalasi untuk instruksi.


!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

%load_ext tensorboard
import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

Verifikasi apakah TFF berfungsi.

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

Mempersiapkan data masukan

Di bagian ini kami memuat dan memproses sebelumnya set data EMNIST yang termasuk dalam TFF. Silakan lihat tutorial Pembelajaran Federasi untuk Klasifikasi Gambar untuk detail lebih lanjut tentang set data EMNIST.

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)

Mendefinisikan model

Di sini kita mendefinisikan model keras berdasarkan orginial FedAvg CNN, dan kemudian membungkus model keras tersebut dalam sebuah instance tff.learning.Model sehingga bisa dikonsumsi oleh TFF.

Perhatikan bahwa kita memerlukan fungsi yang menghasilkan model, bukan hanya model secara langsung. Selain itu, fungsi tersebut tidak dapat hanya menangkap model yang telah dibuat sebelumnya, tetapi harus membuat model dalam konteks yang disebutnya. Alasannya adalah bahwa TFF dirancang untuk digunakan pada perangkat, dan membutuhkan kendali atas kapan sumber daya dibangun sehingga mereka dapat ditangkap dan dikemas.

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Melatih model dan mengeluarkan metrik pelatihan

Sekarang kita siap untuk membuat algoritme Federated Averaging dan melatih model yang ditentukan pada set data EMNIST.

Pertama kita perlu membangun algoritma Federated Averaging menggunakan API tff.learning.build_federated_averaging_process .

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Sekarang mari kita jalankan algoritma Federated Averaging. Eksekusi algoritme Federated Learning dari perspektif TFF terlihat seperti ini:

  1. Inisialisasi algoritme dan dapatkan status server awal. Status server berisi informasi yang diperlukan untuk menjalankan algoritme. Ingat, karena TFF berfungsi, status ini mencakup status pengoptimal apa pun yang digunakan algoritme (misalnya istilah momentum) serta parameter model itu sendiri - ini akan diteruskan sebagai argumen dan dikembalikan sebagai hasil dari perhitungan TFF.
  2. Jalankan algoritma putaran demi putaran. Di setiap putaran, status server baru akan dikembalikan sebagai hasil dari setiap klien yang melatih model pada datanya. Biasanya dalam satu putaran:
    1. Server menyiarkan model ke semua klien yang berpartisipasi.
    2. Setiap klien melakukan pekerjaan berdasarkan model dan datanya sendiri.
    3. Server menggabungkan semua model untuk menghasilkan status server yang berisi model baru.

Untuk detail selengkapnya, silakan lihat Algoritme Federasi Kustom, Bagian 2: Menerapkan tutorial Perataan Federasi .

Metrik pelatihan ditulis ke direktori Tensorboard untuk ditampilkan setelah pelatihan.



def format_size(size):
  """A helper function for creating a human-readable size."""
  size = float(size)
  for unit in ['bit','Kibit','Mibit','Gibit']:
    if size < 1024.0:
      return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size /= 1024.0
  return "{size:.2f}{unit}".format(size=size, unit='TiB')

def set_sizing_environment():
  """Creates an environment that contains sizing information."""
  # Creates a sizing executor factory to output communication cost
  # after the training finishes. Note that sizing executor only provides an
  # estimate (not exact) of communication cost, and doesn't capture cases like
  # compression of over-the-wire representations. However, it's perfect for
  # demonstrating the effect of compression in this tutorial.
  sizing_factory = tff.framework.sizing_executor_factory()

  # TFF has a modular runtime you can configure yourself for various
  # environments and purposes, and this example just shows how to configure one
  # part of it to report the size of things.
  context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff.framework.set_default_context(context)

  return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients parcitipated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # Add metrics to Tensorboard.
      for name, value in metrics['train'].items():
          tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
!rm -R /tmp/logs/scalars/*

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09433962404727936,loss=2.3181073665618896>>, broadcasted_bits=507.62MiB, aggregated_bits=507.62MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.0765027329325676,loss=2.3148586750030518>>, broadcasted_bits=1015.24MiB, aggregated_bits=1015.24MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08872458338737488,loss=2.3089394569396973>>, broadcasted_bits=1.49GiB, aggregated_bits=1.49GiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10852713137865067,loss=2.304060220718384>>, broadcasted_bits=1.98GiB, aggregated_bits=1.98GiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10818713158369064,loss=2.3026843070983887>>, broadcasted_bits=2.48GiB, aggregated_bits=2.48GiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10454985499382019,loss=2.300365447998047>>, broadcasted_bits=2.97GiB, aggregated_bits=2.97GiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12841254472732544,loss=2.29765248298645>>, broadcasted_bits=3.47GiB, aggregated_bits=3.47GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14023210108280182,loss=2.2977216243743896>>, broadcasted_bits=3.97GiB, aggregated_bits=3.97GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.15060241520404816,loss=2.29490327835083>>, broadcasted_bits=4.46GiB, aggregated_bits=4.46GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13088512420654297,loss=2.2942349910736084>>, broadcasted_bits=4.96GiB, aggregated_bits=4.96GiB

Mulai TensorBoard dengan direktori log root yang ditentukan di atas untuk menampilkan metrik pelatihan. Perlu waktu beberapa detik untuk memuat data. Kecuali Kehilangan dan Akurasi, kami juga mengeluarkan jumlah data yang disiarkan dan dikumpulkan. Data yang disiarkan mengacu pada tensor yang didorong server ke setiap klien sementara data gabungan mengacu pada tensor yang dikembalikan setiap klien ke server.


%tensorboard --logdir /tmp/logs/scalars/ --port=0

Buat siaran khusus dan fungsi agregat

Sekarang mari kita implementasikan fungsi untuk menggunakan algoritma kompresi lossy pada data yang disiarkan dan data agregat menggunakan tensor_encoding API.

Pertama, kami mendefinisikan dua fungsi:

  • broadcast_encoder_fn yang membuat turunan te.core.SimpleEncoder untuk menyandikan tensor atau variabel di komunikasi server ke klien (Data siaran).
  • mean_encoder_fn yang membuat turunan te.core.GatherEncoder untuk menyandikan tensor atau variabel dalam komunikasi klien ke server (data Agregasi).

Penting untuk diperhatikan bahwa kami tidak menerapkan metode kompresi ke seluruh model sekaligus. Sebaliknya, kami memutuskan bagaimana (dan apakah) mengompresi setiap variabel model secara independen. Alasannya adalah secara umum, variabel kecil seperti bias lebih sensitif terhadap ketidakakuratan, dan karena relatif kecil, potensi penghematan komunikasi juga relatif kecil. Karenanya kami tidak mengompresi variabel kecil secara default. Dalam contoh ini, kami menerapkan kuantisasi seragam ke 8 bit (256 bucket) ke setiap variabel dengan lebih dari 10.000 elemen, dan hanya menerapkan identitas ke variabel lain.

def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(value):
  """Function for building encoded mean."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

TFF menyediakan API untuk mengubah fungsi encoder menjadi format yang dapat digunakan oleh API tff.learning.build_federated_averaging_process . Dengan menggunakan tff.learning.framework.build_encoded_broadcast_from_model dan tff.learning.framework.build_encoded_mean_from_model , kita dapat membuat dua fungsi yang dapat diteruskan ke agrumen broadcast_process dan aggregation_process dari tff.learning.build_federated_averaging_process dengan tff.learning.build_federated_averaging_process .

encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))
encoded_mean_process = (
    tff.learning.framework.build_encoded_mean_process_from_model(
    tff_model_fn, mean_encoder_fn))

federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    aggregation_process=encoded_mean_process)

Latih model itu lagi

Sekarang mari kita jalankan algoritma Federated Averaging yang baru.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08722109347581863,loss=2.3216357231140137>>, broadcasted_bits=146.46MiB, aggregated_bits=146.46MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08379272371530533,loss=2.3108291625976562>>, broadcasted_bits=292.92MiB, aggregated_bits=292.92MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08834951370954514,loss=2.3074147701263428>>, broadcasted_bits=439.38MiB, aggregated_bits=439.39MiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10467479377985,loss=2.305814027786255>>, broadcasted_bits=585.84MiB, aggregated_bits=585.85MiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09853658825159073,loss=2.3012874126434326>>, broadcasted_bits=732.30MiB, aggregated_bits=732.31MiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14904330670833588,loss=2.3005223274230957>>, broadcasted_bits=878.77MiB, aggregated_bits=878.77MiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13152804970741272,loss=2.2985599040985107>>, broadcasted_bits=1.00GiB, aggregated_bits=1.00GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12392637878656387,loss=2.297102451324463>>, broadcasted_bits=1.14GiB, aggregated_bits=1.14GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13289350271224976,loss=2.2944107055664062>>, broadcasted_bits=1.29GiB, aggregated_bits=1.29GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12661737203598022,loss=2.2971296310424805>>, broadcasted_bits=1.43GiB, aggregated_bits=1.43GiB

Mulai TensorBoard lagi untuk membandingkan metrik pelatihan antara dua proses.

Seperti yang Anda lihat di Tensorboard, ada pengurangan yang signifikan antara kurva orginial dan compression di plot broadcasted_bits dan aggregated_bits sementara di plot loss dan sparse_categorical_accuracy kedua kurva tersebut cukup mirip.

Sebagai kesimpulan, kami menerapkan algoritma kompresi yang dapat mencapai kinerja yang sama dengan algoritma orignial Federated Averaging sementara biaya komunikasi berkurang secara signifikan.


%tensorboard --logdir /tmp/logs/scalars/ --port=0

Latihan

Untuk menerapkan algoritme kompresi khusus dan menerapkannya ke loop pelatihan, Anda dapat:

  1. Implementasikan algoritme kompresi baru sebagai subclass EncodingStageInterface atau varian yang lebih umum, AdaptiveEncodingStageInterface mengikuti contoh ini .
  2. Buat Encoder baru Anda dan mengkhususkan untuk siaran model atau rata-rata pembaruan model .
  3. Gunakan objek tersebut untuk membuat seluruh komputasi pelatihan .

Pertanyaan penelitian terbuka yang berpotensi berharga meliputi: kuantisasi tidak seragam, kompresi lossless seperti pengkodean huffman, dan mekanisme untuk mengadaptasi kompresi berdasarkan informasi dari putaran pelatihan sebelumnya.

Bahan bacaan yang direkomendasikan: