TFF برای تحقیقات یادگیری فدرال: فشرده سازی مدل و بروزرسانی

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

در این آموزش، ما با استفاده از EMNIST مجموعه داده برای نشان دادن چگونگی فعال کردن الگوریتم های فشرده سازی با اتلاف به منظور کاهش هزینه های ارتباطی در الگوریتم به طور متوسط فدرال با استفاده از tff.learning.build_federated_averaging_process API و tensor_encoding API. برای اطلاعات بیشتر در الگوریتم به طور متوسط فدرال، مقاله را ببینید ارتباطات کارآمد آموزش شبکه عمیق از غیر متمرکز داده .

قبل از اینکه شروع کنیم

قبل از شروع، لطفاً موارد زیر را اجرا کنید تا مطمئن شوید که محیط شما به درستی تنظیم شده است. اگر شما یک تبریک نمی بینم، لطفا به مراجعه نصب و راه اندازی راهنمای دستورالعمل.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

بررسی کنید که آیا TFF کار می کند یا خیر.

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

آماده سازی داده های ورودی

در این بخش مجموعه داده EMNIST موجود در TFF را بارگیری و پیش پردازش می کنیم. لطفا از فدرال آموزشی برای طبقه بندی تصویر آموزش برای جزئیات بیشتر در مورد مجموعه داده EMNIST.

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)

تعریف مدل

در اینجا یک مدل keras بر اساس orginial FedAvg سی ان ان را تعریف می کنیم، و سپس قرار دادن مدل keras در یک نمونه از tff.learning.Model به طوری که می توان آن را توسط TFF مصرف می شود.

توجه داشته باشید که ما یک تابع است که به جای تولید یک مدل از سادگی یک مدل به طور مستقیم نیاز دارند. علاوه بر این، تابع می تواند نه فقط گرفتن یک مدل پیش ساخته، باید آن را مدل در این چارچوب که آن را به نام ایجاد کنید. دلیل آن این است که TFF برای رفتن به دستگاه ها طراحی شده است و نیاز به کنترل زمان ساخت منابع دارد تا بتوان آنها را جمع آوری و بسته بندی کرد.

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

آموزش مدل و خروجی معیارهای آموزشی

اکنون ما آماده ساختن یک الگوریتم میانگین گیری فدرال و آموزش مدل تعریف شده بر روی مجموعه داده EMNIST هستیم.

در ابتدا ما نیاز به ساخت یک الگوریتم به طور متوسط فدرال با استفاده از tff.learning.build_federated_averaging_process API.

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

حالا بیایید الگوریتم میانگین گیری فدرال را اجرا کنیم. اجرای یک الگوریتم یادگیری فدرال از دیدگاه TFF به شکل زیر است:

  1. الگوریتم را راه اندازی کنید و حالت اولیه سرور را دریافت کنید. وضعیت سرور حاوی اطلاعات لازم برای اجرای الگوریتم است. به یاد بیاورید، از آنجایی که TFF عملکردی است، این حالت هم شامل هر حالت بهینه ساز مورد استفاده الگوریتم (مثلاً شرایط حرکت) و هم خود پارامترهای مدل می شود - اینها به عنوان آرگومان ارسال می شوند و به عنوان نتایج حاصل از محاسبات TFF برگردانده می شوند.
  2. الگوریتم را دور به دور اجرا کنید. در هر دور، یک حالت سرور جدید به عنوان نتیجه آموزش هر کلاینت مدل بر روی داده های خود، برگردانده می شود. به طور معمول در یک دور:
    1. سرور مدل را برای همه مشتریان شرکت کننده پخش می کند.
    2. هر مشتری کار را بر اساس مدل و داده های خود انجام می دهد.
    3. سرور تمام مدل ها را جمع می کند تا یک حالت جداکننده که حاوی یک مدل جدید است تولید کند.

برای جزئیات بیشتر، لطفا سفارشی فدرال الگوریتم، قسمت 2: آموزش پیاده سازی فدرال به طور متوسط آموزش.

معیارهای تمرین در فهرست Tensorboard برای نمایش بعد از آموزش نوشته می شوند.

بارگذاری توابع ابزار

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients parcitipated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # Add metrics to Tensorboard.
      for name, value in metrics['train'].items():
          tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
  pass  # Path doesn't exist

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit

برای نمایش معیارهای آموزشی، TensorBoard را با دایرکتوری root log مشخص شده در بالا شروع کنید. ممکن است چند ثانیه طول بکشد تا داده ها بارگیری شوند. به جز Loss and Accuracy، مقدار داده های پخش شده و جمع آوری شده را نیز خروجی می دهیم. داده های پخش شده به تانسورهایی اشاره دارد که سرور به هر کلاینت فشار می دهد در حالی که داده های انبوه به تانسورهایی اشاره دارد که هر کلاینت به سرور برمی گرداند.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9135ef1630>

ساخت یک پخش سفارشی و عملکرد جمع

حالا اجازه دهید اجرای تابع برای استفاده از الگوریتم فشرده سازی با اتلاف بر روی داده های پخش و داده های جمع آوری شده با استفاده از tensor_encoding API.

ابتدا دو تابع را تعریف می کنیم:

  • broadcast_encoder_fn که ایجاد یک نمونه از te.core.SimpleEncoder به تانسورها رمزگذاری و یا متغیرها در سرور به ارتباط مشتری (پخش داده).
  • mean_encoder_fn که ایجاد یک نمونه از te.core.GatherEncoder به سرور communicaiton (داده ادغام) به تانسورها رمزگذاری و یا متغیرها در مشتری.

توجه به این نکته مهم است که ما یک روش فشرده سازی را به یکباره برای کل مدل اعمال نمی کنیم. در عوض، ما تصمیم می‌گیریم که چگونه (و آیا) هر متغیر مدل را به طور مستقل فشرده کنیم. دلیل این امر این است که معمولاً متغیرهای کوچک مانند سوگیری ها نسبت به عدم دقت حساس تر هستند و نسبتاً کوچک هستند، پس انداز ارتباطی بالقوه نیز نسبتاً ناچیز است. بنابراین ما متغیرهای کوچک را به طور پیش فرض فشرده نمی کنیم. در این مثال، برای هر متغیری که بیش از 10000 عنصر دارد، 8 بیت (256 سطل) یکنواخت را اعمال می‌کنیم و فقط برای متغیرهای دیگر هویت اعمال می‌کنیم.

def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(tensor_spec):
  """Function for building a GatherEncoder."""
  spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
  if tensor_spec.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

TFF فراهم می کند رابط های برنامه کاربردی برای تبدیل تابع رمزگذار به یک فرمت است که tff.learning.build_federated_averaging_process API می تواند مصرف می کند. با استفاده از tff.learning.framework.build_encoded_broadcast_from_model و tff.aggregators.MeanFactory ، ما می توانیم دو جسم که می تواند به تصویب ایجاد broadcast_process و model_update_aggregation_factory agruments از tff.learning.build_federated_averaging_process برای ایجاد یک فدرال الگوریتم میانگین با یک الگوریتم فشرده سازی با اتلاف.

encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))

mean_factory = tff.aggregators.MeanFactory(
    tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
    tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)

federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    model_update_aggregation_factory=mean_factory)

آموزش مجدد مدل

حالا بیایید الگوریتم جدید فدرال میانگین گیری را اجرا کنیم.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit

دوباره TensorBoard را شروع کنید تا معیارهای آموزشی را بین دو اجرا مقایسه کنید.

همانطور که شما می توانید در Tensorboard بینید، یک کاهش قابل توجهی بین وجود دارد orginial و compression منحنی در broadcasted_bits و aggregated_bits توطئه در حالی که در loss و sparse_categorical_accuracy طرح دو منحنی بسیار مشابه هستند.

در نتیجه، ما یک الگوریتم فشرده‌سازی را پیاده‌سازی کردیم که می‌تواند عملکردی مشابه با الگوریتم میانگین فدرال اصلی داشته باشد در حالی که هزینه تبادل به طور قابل‌توجهی کاهش می‌یابد.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9140eb5ef0>

تمرینات

برای پیاده سازی یک الگوریتم فشرده سازی سفارشی و اعمال آن در حلقه آموزشی، می توانید:

  1. پیاده سازی یک الگوریتم فشرده سازی جدید را به عنوان یک زیر کلاس از EncodingStageInterface و یا نوع کلی تر، AdaptiveEncodingStageInterface زیر این مثال .
  2. ساخت جدید خود را Encoder و تخصص آن را برای پخش مدل و یا متوسط به روز رسانی مدل .
  3. استفاده از آن اشیاء برای ساخت کل محاسبات آموزش .

سوالات بالقوه با ارزش تحقیق باز عبارتند از: کمی سازی غیر یکنواخت، فشرده سازی بدون تلفات مانند کدگذاری هافمن، و مکانیسم هایی برای انطباق فشرده سازی بر اساس اطلاعات دوره های آموزشی قبلی.

مطالب خواندنی توصیه شده: