کار با ClientData tff.

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

مفهوم مجموعه داده ای که توسط مشتریان (به عنوان مثال کاربران) کلید خورده است ، برای محاسبه فدرال مانند مدل TFF ضروری است. TFF فراهم می کند رابط tff.simulation.datasets.ClientData به انتزاعی بیش از این مفهوم، و مجموعه داده که میزبان TFF ( استک اورفلو ، شکسپیر ، emnist ، cifar100 و gldv2 ) تمام پیاده سازی این رابط.

اگر شما در حال کار بر یادگیری فدرال با مجموعه داده های خود را، TFF شدت تشویق می کند به شما یا اجرای ClientData رابط یا استفاده از یکی از توابع کمکی TFF به تولید یک ClientData که نشان دهنده داده های خود را بر روی دیسک، به عنوان مثال tff.simulation.datasets.ClientData.from_clients_and_fn به

همانطور که بسیاری از نمونه های پایان به پایان TFF با شروع ClientData اشیاء، اجرای ClientData رابط با مجموعه داده های سفارشی خود را آن را به spelunk آسان تر از طریق کد موجود نوشته شده با TFF خواهد شد. علاوه بر این، tf.data.Datasets که ClientData سازه می تواند بیش از تکرار به طور مستقیم به عملکرد ساختارهای numpy آرایه، بنابراین ClientData اشیاء را می توان با هر چارچوب ML بر اساس پایتون قبل از حرکت به TFF استفاده می شود.

اگر قصد دارید شبیه سازی های خود را در بسیاری از ماشین ها افزایش دهید یا آنها را به کار گیرید ، می توانید زندگی خود را آسان تر کنید. در زیر ما از طریق چند از راه های ما می توانید استفاده کنید راه رفتن ClientData و TFF به مقیاس کوچک تکرار به مقیاس بزرگ آزمایش به تجربه تولید استقرار ما را به عنوان که ممکن است صاف.

از کدام الگو برای انتقال ClientData به TFF استفاده کنم؟

ما دو کاربرد از TFF مورد بحث قرار خواهد ClientData در عمق. اگر در هر یک از دو دسته زیر قرار دارید ، به وضوح یکی را بر دیگری ترجیح می دهید. اگر نه ، ممکن است برای انتخاب دقیق تر ، به درک دقیق تری از مزایا و معایب هریک نیاز داشته باشید.

  • من می خواهم در اسرع وقت روی یک دستگاه محلی تکرار کنم. من نیازی ندارم که بتوانم به راحتی از زمان اجرای توزیع شده TFF استفاده کنم.

    • شما می خواهید به تصویب tf.data.Datasets به TFF به طور مستقیم.
    • این اجازه می دهد تا شما را به برنامه آمرانه با tf.data.Dataset اشیاء، و پردازش آنها را خودسرانه.
    • انعطاف پذیری بیشتری نسبت به گزینه زیر ارائه می دهد. فشار آوردن منطق به مشتریان مستلزم این است که این منطق سریالی شود.
  • من می خواهم محاسبات فدراسیون خود را در زمان اجرا از راه دور TFF اجرا کنم ، یا قصد دارم به زودی این کار را انجام دهم.

    • در این حالت شما می خواهید ساخت مجموعه داده و پیش پردازش را برای مشتریان ترسیم کنید.
    • این نتایج در شما عبور سادگی یک لیست از client_ids به طور مستقیم به محاسبات فدرال خود را.
    • ایجاد مجموعه داده و پیش پردازش به مشتریان از ایجاد تنگناهای سریال جلوگیری می کند و عملکرد صدها تا هزاران مشتری را به میزان قابل توجهی افزایش می دهد.

تنظیم محیط منبع باز

وارد کردن بسته ها

دستکاری یک شی ClientData

بیایید با بارگیری و کاوش EMNIST TFF شروع ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:20<00:00, 8391642.15it/s]
2021-09-03 11:17:37.005231: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

بازرسی مجموعه داده اول توانید به ما بگویید که چه نوع از نمونه در می ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

توجه داشته باشید که بازده مجموعه داده collections.OrderedDict اشیاء است که pixels و label کلید، که در آن پیکسل یک تانسور با شکل است [28, 28] . فرض کنید ما به پهن ورودی ما به شکل [784] . یکی از راه های ممکن ما می توانیم این کار را انجام، می تواند اعمال یک تابع قبل از پردازش به ما ClientData شی.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

ممکن است بخواهیم علاوه بر این ، پیش پردازش های پیچیده تر (و احتمالاً حالت دار) را انجام دهیم ، به عنوان مثال ، زدن مختلط.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

واسط با tff.Computation

حالا که ما می توانیم برخی دستکاری اساسی با انجام ClientData اشیاء، ما آماده به داده های خوراک به یک هستند tff.Computation . تعریف می کنیم tff.templates.IterativeProcess که پیاده سازی فدرال به طور متوسط ، اکتشاف و روش های مختلف انتقال آن داده است.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

قبل از شروع به کار با این IterativeProcess ، یک نظر در معناشناسی ClientData در نظم است. ClientData شیء نشان دهنده کل جمعیت در دسترس برای آموزش فدرال، که به طور کلی در دسترس نیست به محیط زیست اجرای یک سیستم FL تولید و خاص به شبیه سازی است. ClientData واقع به کاربر اجازه میدهد ظرفیت برای دور زدن محاسبات فدرال به طور کامل و به سادگی آموزش یک مدل در سمت سرور به طور معمول از طریق ClientData.create_tf_dataset_from_all_clients .

محیط شبیه سازی TFF ، محقق را در کنترل کامل حلقه بیرونی قرار می دهد. به طور خاص ، این امر دلالت بر ملاحظات موجود بودن مشتری ، خروج مشتری و غیره دارد که باید توسط کاربر یا اسکریپت درایور پایتون مورد بررسی قرار گیرد. یک نفر می تواند برای مثال مدل ترک تحصیل مشتری با تنظیم توزیع نمونه خود را بیش از ClientData's client_ids به طوری که کاربران با داده ها (و نسبت به دیگر در حال اجرا محاسبات محلی) خواهد بود که با احتمال کمتر انتخاب شده است.

با این حال ، در یک سیستم فدراسیون واقعی ، مشتریان نمی توانند به طور صریح توسط مربی مدل انتخاب شوند. انتخاب مشتریان به سیستمی واگذار می شود که محاسبه فدرال را اجرا می کند.

عبور tf.data.Datasets به طور مستقیم به TFF

یکی از گزینه های ما برای واسط بین دارند ClientData و IterativeProcess که ساخت است tf.data.Datasets در پایتون، و عبور این مجموعه داده ها به TFF.

توجه داشته باشید که اگر ما با استفاده پیش پردازش ما ClientData مجموعه داده های عملکرد ما از نوع مناسب انتظار می رود توسط مدل ما در بالا تعریف شده است.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:60: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:60: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 3.1523890495300293, round time 4.611936569213867
loss 3.084413528442383, round time 0.512113094329834
loss 2.785008192062378, round time 0.5057694911956787
loss 3.0030126571655273, round time 0.5013787746429443
loss 2.6526098251342773, round time 0.49185681343078613

اگر ما به این مسیر، با این حال، ما قادر به بدیهی به شبیه سازی multimachine حرکت می کند. مجموعه داده های ما در زمان اجرا TensorFlow محلی ساخت می تواند دولت را از محیط اطراف پایتون ضبط، و شکست در ترتیب و یا deserialization زمانی که آنها به دولت مرجع است که دیگر به آنها در دسترس نیست تلاش. این می تواند به عنوان مثال در خطا مرموز از TensorFlow را آشکار tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

نقشه برداری از ساخت و ساز و پیش پردازش بر روی مشتریان

برای جلوگیری از این مسئله، TFF توصیه کاربران آن به در نظر گرفتن مجموعه داده نمونه و پیش پردازش به عنوان چیزی که به صورت محلی بر هر مشتری اتفاق می افتد، و برای استفاده از یاران TFF یا federated_map به صراحت این کد از پیش پردازش در هر اجرا می شود.

از نظر مفهومی ، دلیل ترجیح این امر واضح است: در زمان اجرای محلی TFF ، مشتریان فقط "به طور تصادفی" به محیط جهانی پایتون دسترسی دارند زیرا این واقعیت وجود دارد که کل ارکستراسیون فدرال روی یک دستگاه واحد انجام می شود. در این مرحله شایان ذکر است که تفکر مشابه باعث ایجاد فلسفه کاربردی TFF در سراسر بستر و همیشه سریالیزه می شود.

TFF باعث می شود چنین ساده تغییر از طریق ClientData's ویژگی dataset_computation ، یک tff.Computation که طول می کشد client_id و ارتباط گرداند tf.data.Dataset .

توجه داشته باشید که preprocess به سادگی با این نسخهها کار dataset_computation ؛ dataset_computation ویژگی از پیش پردازش ClientData شامل کل خط لوله پردازش ما فقط تعریف می شود:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

ما می توانیم استناد dataset_computation و دریافت مجموعه داده مشتاق در زمان اجرا پایتون، اما قدرت واقعی این رویکرد اعمال می شود زمانی که ما با یک روند تکراری و یا محاسبات دیگر برای جلوگیری از تحقق این مجموعه داده ها در زمان اجرا مشتاق جهانی در همه آهنگسازی. TFF یک تابع کمکی فراهم می کند tff.simulation.compose_dataset_computation_with_iterative_process است که می تواند مورد استفاده قرار گیرد به دقیقا این است.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

هر دو این tff.templates.IterativeProcesses و یکی در بالا اجرا به همان شیوه. اما سابق می پذیرد مجموعه داده مشتری پیش پردازش، و دومی رشته به نمایندگی شناسه مشتری، دست زدن به هر دو ساخت و ساز مجموعه داده ها و پیش پردازش در بدن خود می پذیرد - در واقع state را می توان بین این دو به تصویب رسید.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 3.003934621810913, round time 1.6643006801605225
loss 2.8357160091400146, round time 0.6825358867645264
loss 2.648752450942993, round time 0.5224685668945312
loss 2.6852502822875977, round time 0.5058529376983643
loss 2.8946309089660645, round time 0.5055575370788574

مقیاس بندی برای تعداد زیادی از مشتریان

trainer_accepting_ids می توانید بلافاصله در زمان اجرا multimachine TFF استفاده می شود، و اجتناب تحقق tf.data.Datasets و کنترل (و در نتیجه serialize کردن آنها و ارسال آنها را به کارگران).

این امر به طور قابل توجهی شبیه سازی های توزیع شده را سرعت می بخشد ، به ویژه با تعداد زیادی مشتری ، و تجمع متوسط ​​را برای جلوگیری از سریال سازی/سرریز زدایی مشابه در سربار امکان پذیر می کند.

deepdive اختیاری: ترکیب منطقی پیش پردازش در TFF

TFF برای ترکیب بندی از ابتدا طراحی شده است. نوع ترکیباتی که به تازگی توسط یاور TFF اجرا شده است ، کاملاً در کنترل ما کاربران است. ما می توانیم به صورت دستی دارند نوشتن محاسبات پیش پردازش ما فقط با تعریف مربی خود next کاملا به سادگی:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

در واقع ، این همان چیزی است که مددکاری که ما از آن استفاده می کنیم در زیر کاپوت انجام می دهد (به علاوه انجام بررسی و دستکاری مناسب). ما حتی می تواند از همین منطق را ابراز کرده اند کمی متفاوت، توسط serialize کردن preprocess_and_shuffle به یک tff.Computation ، و تجزیه federated_map به یک مرحله که ساخت مجموعه داده سازمان ملل متحد پیش پردازش و دیگری اجرا می شود که preprocess_and_shuffle در هر مشتری.

ما می توانیم تأیید کنیم که این مسیر دستی بیشتر منجر به محاسباتی با امضای نوع مشابه راهنمای TFF (نام پارامترهای modulo) می شود:

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)