TFF برای تحقیقات یادگیری فدرال: فشرده سازی مدل و بروزرسانی

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

در این آموزش، ما با استفاده از EMNIST مجموعه داده برای نشان دادن چگونگی فعال کردن الگوریتم های فشرده سازی با اتلاف به منظور کاهش هزینه های ارتباطی در الگوریتم به طور متوسط فدرال با استفاده از tff.learning.build_federated_averaging_process API و tensor_encoding API. برای اطلاعات بیشتر در الگوریتم به طور متوسط فدرال، مقاله را ببینید ارتباطات کارآمد آموزش شبکه عمیق از غیر متمرکز داده .

قبل از اینکه شروع کنیم

قبل از شروع ، لطفاً موارد زیر را اجرا کنید تا مطمئن شوید که محیط شما به درستی تنظیم شده است. اگر شما یک تبریک نمی بینم، لطفا به مراجعه نصب و راه اندازی راهنمای دستورالعمل.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

بررسی کنید آیا TFF کار می کند یا خیر.

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

آماده سازی داده های ورودی

در این بخش ، مجموعه داده EMNIST موجود در TFF را بارگیری و پیش پردازش می کنیم. لطفا از فدرال آموزشی برای طبقه بندی تصویر آموزش برای جزئیات بیشتر در مورد مجموعه داده EMNIST.

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)

تعریف یک مدل

در اینجا یک مدل keras بر اساس orginial FedAvg سی ان ان را تعریف می کنیم، و سپس قرار دادن مدل keras در یک نمونه از tff.learning.Model به طوری که می توان آن را توسط TFF مصرف می شود.

توجه داشته باشید که ما یک تابع است که به جای تولید یک مدل از سادگی یک مدل به طور مستقیم نیاز دارند. علاوه بر این، تابع می تواند نه فقط گرفتن یک مدل پیش ساخته، باید آن را مدل در این چارچوب که آن را به نام ایجاد کنید. دلیل این امر این است که TFF برای رفتن به دستگاه ها طراحی شده است و نیاز به کنترل زمان ایجاد منابع دارد تا بتوان آنها را ضبط و بسته بندی کرد.

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

آموزش مدل و خروجی معیارهای آموزشی

اکنون ما آماده ایم تا یک الگوریتم میانگین یابی فدرال بسازیم و مدل تعریف شده را بر روی مجموعه داده EMNIST آموزش دهیم.

در ابتدا ما نیاز به ساخت یک الگوریتم به طور متوسط فدرال با استفاده از tff.learning.build_federated_averaging_process API.

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

حالا بیایید الگوریتم میانگین یابی فدرال را اجرا کنیم. اجرای الگوریتم یادگیری فدرال از دیدگاه TFF به این شکل است:

  1. الگوریتم را اولیه کرده و حالت سرور اولیه را بدست آورید. وضعیت سرور حاوی اطلاعات لازم برای انجام الگوریتم است. به یاد بیاورید ، از آنجا که TFF عملکردی است ، این حالت شامل هر حالت بهینه سازی که الگوریتم استفاده می کند (به عنوان مثال اصطلاحات حرکت) و همچنین پارامترهای خود مدل-اینها به عنوان آرگومان ارسال می شوند و به عنوان نتیجه محاسبات TFF بازگردانده می شوند.
  2. الگوریتم را دور به دور اجرا کنید. در هر دور ، یک وضعیت سرور جدید به عنوان نتیجه هر مشتری که مدل را بر روی داده های خود آموزش می دهد ، بازگردانده می شود. به طور معمول در یک دور:
    1. سرور مدل را برای همه مشتریان شرکت کننده پخش می کند.
    2. هر مشتری کار را بر اساس مدل و داده های خاص خود انجام می دهد.
    3. سرور تمام مدل را برای تولید حالت جداگانه که شامل یک مدل جدید است ، تجمیع می کند.

برای جزئیات بیشتر، لطفا سفارشی فدرال الگوریتم، قسمت 2: آموزش پیاده سازی فدرال به طور متوسط آموزش.

معیارهای آموزشی برای نمایش پس از آموزش به فهرست Tensorboard نوشته می شود.

بارگذاری توابع ابزار

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients parcitipated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # Add metrics to Tensorboard.
      for name, value in metrics['train'].items():
          tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
  pass  # Path doesn't exist

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit

TensorBoard را با فهرست ورود ریشه مشخص شده در بالا شروع کنید تا معیارهای آموزش نمایش داده شود. بارگیری داده ها ممکن است چند ثانیه طول بکشد. به غیر از دست دادن و دقت ، مقدار داده های پخش شده و جمع آوری شده را نیز خروجی می دهیم. داده های پخش شده به تنسورهایی اشاره می کند که سرور به هر کلاینت فشار می دهد در حالی که داده های جمع آوری شده به تنسورهایی اشاره می کند که هر مشتری به سرور باز می گرداند.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9135ef1630>

یک تابع پخش سفارشی و مجموع ایجاد کنید

حالا اجازه دهید اجرای تابع برای استفاده از الگوریتم فشرده سازی با اتلاف بر روی داده های پخش و داده های جمع آوری شده با استفاده از tensor_encoding API.

ابتدا دو تابع را تعریف می کنیم:

  • broadcast_encoder_fn که ایجاد یک نمونه از te.core.SimpleEncoder به تانسورها رمزگذاری و یا متغیرها در سرور به ارتباط مشتری (پخش داده).
  • mean_encoder_fn که ایجاد یک نمونه از te.core.GatherEncoder به سرور communicaiton (داده ادغام) به تانسورها رمزگذاری و یا متغیرها در مشتری.

توجه به این نکته ضروری است که ما از روش فشرده سازی به طور همزمان روی کل مدل استفاده نمی کنیم. در عوض ، ما تصمیم می گیریم که چگونه (و آیا) هر متغیر مدل را به طور مستقل فشرده کنیم. دلیل آن این است که به طور کلی ، متغیرهای کوچک مانند سوگیری ها نسبت به عدم دقت بیشتر حساس هستند و نسبتاً کوچک هستند ، پس اندازهای احتمالی ارتباطات نیز نسبتاً کم است. بنابراین ما متغیرهای کوچک را به طور پیش فرض فشرده نمی کنیم. در این مثال ، ما کوانتیزاسیون یکنواخت را روی 8 بیت (256 سطل) برای هر متغیر با بیش از 10000 عنصر اعمال می کنیم و فقط برای متغیرهای دیگر هویت اعمال می کنیم.

def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(tensor_spec):
  """Function for building a GatherEncoder."""
  spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
  if tensor_spec.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

TFF فراهم می کند رابط های برنامه کاربردی برای تبدیل تابع رمزگذار به یک فرمت است که tff.learning.build_federated_averaging_process API می تواند مصرف می کند. با استفاده از tff.learning.framework.build_encoded_broadcast_from_model و tff.aggregators.MeanFactory ، ما می توانیم دو جسم که می تواند به تصویب ایجاد broadcast_process و model_update_aggregation_factory agruments از tff.learning.build_federated_averaging_process برای ایجاد یک فدرال الگوریتم میانگین با یک الگوریتم فشرده سازی با اتلاف.

encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))

mean_factory = tff.aggregators.MeanFactory(
    tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
    tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)

federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    model_update_aggregation_factory=mean_factory)

مدل را دوباره آموزش دهید

حالا بیایید الگوریتم جدید میانگین یابی فدرال را اجرا کنیم.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit

دوباره TensorBoard را شروع کنید تا معیارهای آموزش را بین دو اجرا مقایسه کنید.

همانطور که شما می توانید در Tensorboard بینید، یک کاهش قابل توجهی بین وجود دارد orginial و compression منحنی در broadcasted_bits و aggregated_bits توطئه در حالی که در loss و sparse_categorical_accuracy طرح دو منحنی بسیار مشابه هستند.

در نتیجه ، ما یک الگوریتم فشرده سازی را اجرا کردیم که می تواند عملکرد مشابهی با الگوریتم میانگین فدرال اولیه بدست آورد در حالی که هزینه تخلیه به طور قابل توجهی کاهش می یابد.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9140eb5ef0>

تمرینات

برای پیاده سازی یک الگوریتم فشرده سازی سفارشی و اعمال آن در حلقه آموزشی ، می توانید:

  1. پیاده سازی یک الگوریتم فشرده سازی جدید را به عنوان یک زیر کلاس از EncodingStageInterface و یا نوع کلی تر، AdaptiveEncodingStageInterface زیر این مثال .
  2. ساخت جدید خود را Encoder و تخصص آن را برای پخش مدل و یا متوسط به روز رسانی مدل .
  3. استفاده از آن اشیاء برای ساخت کل محاسبات آموزش .

پرسشهای تحقیقاتی باز با ارزش بالقوه عبارتند از: کمی سازی یکنواخت ، فشرده سازی بدون ضرر مانند کدگذاری هافمن و مکانیزمهای سازگاری فشرده سازی بر اساس اطلاعات دورهای آموزشی قبلی.

مطالب توصیه شده برای مطالعه: