پیاده سازی مجموعه های سفارشی

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

در این آموزش، ما اصول طراحی پشت توضیح tff.aggregators ماژول و بهترین روش برای پیاده سازی تجمع سفارشی از ارزش ها از مشتریان به سرور.

پیش نیازها. این آموزش فرض شما در حال حاضر با مفاهیم اولیه آشنا فدرال هسته مانند جایگاه ( tff.SERVER ، tff.CLIENTS )، چگونه TFF نشان دهنده محاسبات ( tff.tf_computation ، tff.federated_computation ) و امضا نوع خود.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

خلاصه طراحی

در TFF، "تجمع" اشاره به حرکت یک مجموعه ای از ارزش در tff.CLIENTS برای تولید یک ارزش کل از همان نوع در tff.SERVER . به این معنی که هر مقدار مشتری جداگانه در دسترس نباشد. به عنوان مثال در یادگیری فدرال ، به روزرسانی مدل مشتری به طور متوسط ​​برای دریافت یک به روزرسانی مدل کلی برای اعمال در مدل جهانی روی سرور ، انجام می شود.

علاوه بر اپراتورهای رسیدن به این هدف مانند tff.federated_sum ، TFF فراهم می کند tff.templates.AggregationProcess (یک فرآیند stateful به ) که رسمی تعریف امضای نوع برای محاسبات تجمع پس از آن می توانید به اشکال پیچیده تر از یک جمع ساده را تعمیم دهند.

اجزای اصلی از tff.aggregators ماژول کارخانه ایجاد می AggregationProcess ، که طراحی شده اند به بلوک های ساختمان به طور کلی مفید و جایگزین از TFF در دو جنبه:

  1. محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است tff.aggregators استفاده از پارامترها در تجمع لازم خود را.

مثال:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. ترکیب تجمع. یک بلوک ساختمان تجمیع می تواند با سایر اجزای سازنده تجمع ترکیب شود تا مجموعه های مرکب پیچیده تری ایجاد کند.

مثال:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

بقیه این آموزش نحوه دستیابی به این دو هدف را توضیح می دهد.

فرایند تجمع

ما برای اولین بار به طور خلاصه tff.templates.AggregationProcess ، و با الگوی کارخانه برای ایجاد آن دنبال کنید.

tff.templates.AggregationProcess یک IS tff.templates.MeasuredProcess با نوع امضا مشخص شده برای تجمع. به طور خاص، initialize و next توابع امضا نوع زیر:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

دولت (از نوع state_type ) باید در سرور قرار می گیرد. next تابع به عنوان آرگومان ورودی دولت و یک مقدار به جمع گردد (از نوع طول می کشد value_type ) قرار داده شده در مشتریان. * یعنی اختیاری دیگر آرگومان ورودی، وزن مثلا در یک میانگین موزون. این یک شیء وضعیت به روز شده ، مقدار کلی همان نوع قرار داده شده در سرور و برخی اندازه گیری ها را برمی گرداند.

توجه داشته باشید که هر دو دولت به بین اعدام منتقل می شود next تابع، و اندازه گیری گزارش در نظر گرفته شده برای گزارش هر گونه اطلاعات بسته به اعدام خاص از next تابع، ممکن است خالی باشد. با این وجود ، آنها باید به طور صریح مشخص شوند تا سایر قسمتهای TFF قرارداد واضحی را دنبال کنند.

دیگر ماژول های TFF، به عنوان مثال به روز رسانی مدل در tff.learning ، انتظار می رود که با استفاده از tff.templates.AggregationProcess ، استفاده از پارامترها چگونه ارزش جمع می شوند. با این حال ، اینکه ارزشها دقیقاً چگونه جمع شده اند و نوع امضاها چیست ، بستگی به جزئیات دیگر مدل در حال آموزش و الگوریتم یادگیری برای انجام آن دارد.

به تجمع مستقل از جنبه های دیگری از محاسبات، ما استفاده از الگوی کارخانه - ما مناسب ایجاد tff.templates.AggregationProcess پس از آن که امضا نوع مربوط به اشیاء به جمع گردد در دسترس هستند، با استناد به create روش از کارخانه. بنابراین ، رسیدگی مستقیم به فرآیند تجمیع فقط برای نویسندگان کتابخانه ، که مسئول این ایجاد هستند ، مورد نیاز است.

کارخانه های فرآیند تجمع

دو کلاس کارخانه پایه انتزاعی برای تجمع بدون وزن و وزن وجود دارد. آنها create روش نوع امضا از ارزش طول می کشد تا جمع شود و یک گرداند tff.templates.AggregationProcess برای تجمع چنین ارزش.

فرآیند ایجاد شده توسط tff.aggregators.UnweightedAggregationFactory (1) دولت در سرور و (2) مقدار از نوع مشخص شده: دو آرگومان ورودی طول می کشد value_type .

یک پیاده سازی مثال است tff.aggregators.SumFactory .

فرآیند ایجاد شده توسط tff.aggregators.WeightedAggregationFactory سه آرگومان ورودی طول می کشد: (1) دولت در سرور، (2) ارزش نوع مشخص value_type و (3) وزن از نوع weight_type ، به عنوان توسط کاربران این کارخانه مشخص شده هنگامی که با استناد به آن create روش.

یک پیاده سازی مثال است tff.aggregators.MeanFactory که محاسبه میانگین موزون.

الگوی کارخانه نحوه دستیابی به اولین هدفی است که در بالا ذکر شد. که تجمع یک بلوک ساختمان مستقل است. برای مثال ، هنگام تغییر متغیرهای مدل قابل آموزش ، لزوماً نیازی به تغییر یک مجموعه پیچیده نیست. کارخانه به نمایندگی از آن خواهد شد با یک نوع امضای مختلف استناد به زمانی با استفاده از روش هایی مانند استفاده tff.learning.build_federated_averaging_process .

ترکیبات

به یاد بیاورید که یک فرآیند تجمیع کلی می تواند (الف) برخی از پیش پردازش مقادیر را در کلاینت ها ، (ب) حرکت مقادیر از سرویس گیرنده به سرور ، و (ج) برخی از پردازش های بعد از ارزش جمع آوری شده در سرور را شامل شود. هدف دوم که در بالا ذکر، ترکیب تجمع، متوجه شدم داخل tff.aggregators ماژول های ساختار اجرای کارخانه تجمع به طوری که قسمت (ب) را می توان به کارخانه تجمع دیگر واگذار شده است.

بجای پیاده سازی تمام منطق لازم در یک کلاس کارخانه ، پیاده سازی ها به طور پیش فرض بر یک جنبه واحد مربوط به تجمع متمرکز شده اند. در صورت نیاز ، این الگو ما را قادر می سازد تا بلوک های ساختمان را یکی یکی جایگزین کنیم.

به عنوان مثال وزن است tff.aggregators.MeanFactory . پیاده سازی آن مقادیر و وزنهای ارائه شده را برای کلاینتها ضرب می کند ، سپس مقادیر وزنی و وزنی را به طور مستقل جمع می کند و سپس مجموع مقادیر وزنی را بر مجموع وزنهای سرور تقسیم می کند. جای پیاده کردن جمع به طور مستقیم با استفاده از tff.federated_sum اپراتورها، جمع به دو نمونه از واگذار tff.aggregators.SumFactory .

چنین ساختاری این امکان را می دهد که دو جمع بندی پیش فرض با کارخانه های مختلف جایگزین شوند ، که مجموع را به طور متفاوتی درک می کنند. برای مثال، یک tff.aggregators.SecureSumFactory ، و یا یک اجرای سفارشی از tff.aggregators.UnweightedAggregationFactory . در مقابل، زمان، tff.aggregators.MeanFactory می تواند خود را یک تجمع داخلی یکی دیگر از کارخانه مانند tff.aggregators.clipping_factory ، اگر مقادیر به قبل متوسط کوتاه می شود.

مشاهده قبلی تنظیم واحدهای برای یادگیری توصیه می شود آموزش برای استفاده receommended از مکانیسم ترکیب استفاده در کارخانه های موجود در tff.aggregators ماژول.

بهترین شیوه ها با مثال

ما می رویم به نشان دادن tff.aggregators مفاهیم را با جزئیات با اجرای یک کار مثال ساده، و آن را به تدریج کلی تر. راه دیگر برای یادگیری این است که به عملکرد کارخانه های موجود نگاه کنید.

import collections
import tensorflow as tf
import tensorflow_federated as tff

به جای جمع value ، وظیفه مثال است به طور خلاصه value * 2.0 و سپس با تقسیم مجموع 2.0 . نتیجه تجمع بنابراین ریاضی به طور مستقیم جمع معادل value ، و می تواند به عنوان مشتمل بر سه بخش فکر کردم: (1) پوسته پوسته شدن در مشتریان (2) جمع در سراسر مشتریان (3) unscaling در سرور.

پس از طراحی در بالا توضیح داده، منطق به عنوان یک زیر کلاس از اجرا tff.aggregators.UnweightedAggregationFactory ، که ایجاد مناسب tff.templates.AggregationProcess هنگامی که یک داده value_type به مجموع:

حداقل اجرا

برای مثال مثال ، محاسبات لازم همیشه یکسان است ، بنابراین نیازی به استفاده از حالت نیست. به این ترتیب خالی، و به عنوان نشان tff.federated_value((), tff.SERVER) . در حال حاضر همین امر در مورد اندازه گیری ها صادق است.

بنابراین حداقل اجرای کار به شرح زیر است:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

اینکه آیا همه چیز مطابق انتظار کار می کند را می توان با کد زیر تأیید کرد:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

دولتمندی و اندازه گیری ها

حالت پذیری به طور گسترده ای در TFF برای نشان دادن محاسباتی که انتظار می رود به صورت تکراری اجرا شده و با هر تکرار تغییر کند ، استفاده می شود. به عنوان مثال ، وضعیت محاسبه یادگیری شامل وزن مدل یاد گرفته شده است.

برای نشان دادن نحوه استفاده از حالت در محاسبه تجمع ، ما کار مثال را اصلاح می کنیم. به جای ضرب value های 2.0 ، ما تکثیر آن توسط شاخص تکرار - تعداد دفعاتی تجمع اجرا شده است.

برای انجام این کار ، ما نیاز به راهی برای پیگیری شاخص تکرار داریم که از طریق مفهوم حالت به دست می آید. در initialize_fn ، به جای ایجاد یک دولت خالی، ما مقداردهی اولیه دولت به صفر عددی. سپس، دولت را می توان در استفاده next_fn (1) افزایش شده توسط: در سه مرحله 1.0 ، (2) استفاده کنید و ضرب value ، و (3) بازگشت به دولت به روز شده است.

به محض این که انجام می شود، شما ممکن است توجه داشته باشید: اما دقیقا همان کد بالا را می توان به منظور بررسی همه آثار به عنوان انتظار می رود استفاده می شود. چگونه می توانم بفهمم که چیزی واقعاً تغییر کرده است؟

سؤال خوبی بود! اینجاست که مفهوم اندازه گیری مفید می شود. به طور کلی، اندازه گیری می توانید هر مقدار مربوط به اعدام تنها از گزارش next تابع، که می تواند برای نظارت بر استفاده می شود. در این مورد، می توان آن را summed_value از مثال قبلی است. یعنی مقدار قبل از مرحله "غیر مقیاس بندی" که باید به شاخص تکرار بستگی داشته باشد. باز هم ، این لزوماً در عمل مفید نیست ، اما مکانیسم مربوطه را نشان می دهد.

بنابراین پاسخ دولتمندانه به وظیفه به شرح زیر است:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

توجه داشته باشید که state که به می آید next_fn به عنوان ورودی است که در سرور قرار می گیرد. به منظور استفاده از آن را در مشتریان، آن را برای اولین بار باید منتقل شود، که به دست آورد با استفاده از tff.federated_broadcast اپراتور.

به منظور بررسی همه آثار به عنوان انتظار می رود، ما در حال حاضر می توانید در گزارش نگاه measurements ، که باید با هر دور از اعدام های مختلف می شود، حتی اگر اجرا با همان client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

انواع ساختار یافته

وزنه های مدل مدلی که در یادگیری فدرال آموزش دیده است معمولاً به عنوان یک مجموعه تنسور و نه یک تنسور واحد نشان داده می شود. در TFF، این است که به عنوان نمایندگی tff.StructType و کارخانه تجمع به طور کلی مفید باید قادر به پذیرفتن انواع ساختار.

با این حال، در مثال های بالا، ما تنها با یک کار tff.TensorType شی. اگر ما سعی به استفاده از کارخانه قبلی برای ایجاد روند تجمع با tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) ، ما یک خطای عجیب و غریب به دلیل TensorFlow سعی خواهد کرد تا ضرب tf.Tensor و یک list .

مشکل این است که به جای ضرب ساختار تانسورها توسط یک ثابت، ما نیاز به ضرب هر تانسور در ساختار توسط یک ثابت. راه حل معمول برای این مشکل است به استفاده از tf.nest در داخل ماژول از ایجاد tff.tf_computation است.

نسخه قبلی ExampleTaskFactory سازگار با انواع ساختار در نتیجه به نظر می رسد شرح زیر است:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

این مثال الگویی را برجسته می کند که رعایت آن هنگام ساختار کد TFF مفید خواهد بود. هنگامی که با عملیات بسیار ساده خرید و فروش نیست، کد خوانا تر می شود زمانی که tff.tf_computation بازدید کنندگان که به عنوان بلوک های ساختمان استفاده می شود در داخل یک tff.federated_computation در یک مکان جداگانه ایجاد شده است. داخل tff.federated_computation ، این بلوک های ساختمانی تنها با استفاده از اپراتورهای ذاتی متصل می شود.

برای تأیید صحت عملکرد مطابق انتظار:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

تجمعات داخلی

گام نهایی این است که به صورت اختیاری تفویض تجمیع واقعی به دیگر کارخانه ها امکان پذیر می شود ، تا امکان ترکیب آسان تکنیک های مختلف تجمع فراهم شود.

این است که با ایجاد یک اختیاری دست inner_factory استدلال در سازنده ما ExampleTaskFactory . اگر مشخص نشده است، tff.aggregators.SumFactory استفاده شده است، که اعمال tff.federated_sum اپراتور به طور مستقیم در بخش قبلی استفاده می شود.

هنگامی که create نامیده می شود، ما برای اولین بار می توانید تماس بگیرید create از inner_factory به ایجاد روند تجمع داخلی با همان value_type .

دولت از فرآیند ما بازگردانده شده توسط initialize_fn دولت ایجاد شده توسط "این" روند، و دولت از روند داخلی فقط ایجاد: یک ترکیب از دو بخش است.

اجرای next_fn تفاوت که تجمع واقعی به واگذار next تابعی از فرآیند درونی، و در چگونه خروجی نهایی تشکیل شده است. دولت دوباره از "این" و دولت "درونی" تشکیل شده است، و اندازه گیری به شیوه ای مشابه به عنوان یک تشکیل شده OrderedDict .

موارد زیر پیاده سازی چنین الگویی است.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

هنگامی که تفویض به inner_process.next تابع، ساختار بازگشت ما یک است tff.templates.MeasuredProcessOutput ، با همان سه زمینه - state ، result و measurements . هنگام ایجاد ساختار بازگشت کلی از روند تجمع تشکیل شده، state و measurements زمینه ها باید به طور کلی تشکیل شده و با هم بازگشت. در مقابل، result مربوط زمینه به ارزش بودن جمع و به جای "جریان از طریق" تجمع تشکیل شده است.

state شی باید به عنوان یک جزئیات اجرای کارخانه دیده می شود، و در نتیجه ترکیب می تواند از هر ساختار باشد. با این حال، measurements مربوط به ارزش به کاربران در برخی از نقطه گزارش شود. بنابراین، ما به استفاده توصیه OrderedDict ، با نامگذاری مرکب به طوری که این امر می تواند روشن که در آن در یک ترکیب یک گزارش متریک می آید.

همچنین توجه داشته باشید که استفاده از tff.federated_zip اپراتور. state شی contolled توسط فرایند ایجاد شود باید یک tff.FederatedType . اگر ما به جای بازگشته بود (this_state, inner_state) در initialize_fn ، امضا نوع بازگشت خود را خواهد بود tff.StructType حاوی تایی 2 از tff.FederatedType است. استفاده از tff.federated_zip "آسانسور" در tff.FederatedType به سطح بالا است. این به طور مشابه در استفاده next_fn هنگام آماده سازی دولت و اندازه گیری می شود برگشت.

در نهایت ، ما می توانیم ببینیم چگونه می توان از این با تجمع داخلی پیش فرض استفاده کرد:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... و با تجمع درونی متفاوت. به عنوان مثال، ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

خلاصه

در این آموزش ، ما بهترین شیوه هایی را که باید دنبال کنید تا بتوانید یک بلوک ساختمانی تجمع عمومی را ایجاد کنید ، به عنوان یک کارخانه تجمع نشان داده شده است. عمومیت از طریق قصد طراحی به دو طریق حاصل می شود:

  1. محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است tff.aggregators استفاده از پارامترها در تجمع لازم خود را، مانند tff.learning.build_federated_averaging_process .
  2. ترکیب تجمع. یک بلوک ساختمان تجمیع می تواند با سایر اجزای سازنده تجمع ترکیب شود تا مجموعه های مرکب پیچیده تری ایجاد کند.