Bergabunglah dengan TensorFlow di Google I/O, 11-12 Mei Daftar sekarang

Menerapkan Agregasi Kustom

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Dalam tutorial ini, kami akan menjelaskan prinsip-prinsip desain belakang tff.aggregators modul dan praktik terbaik untuk menerapkan agregasi kustom dari nilai-nilai dari klien ke server.

Prasyarat. Tutorial ini mengasumsikan Anda sudah akrab dengan konsep dasar Federasi Inti seperti penempatan ( tff.SERVER , tff.CLIENTS ), bagaimana TFF merupakan perhitungan ( tff.tf_computation , tff.federated_computation ) dan jenis tanda tangan mereka.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Ringkasan desain

Dalam TFF, "agregasi" mengacu pada pergerakan seperangkat nilai-nilai pada tff.CLIENTS untuk menghasilkan nilai agregat dari jenis yang sama pada tff.SERVER . Artinya, setiap nilai klien individu tidak perlu tersedia. Misalnya dalam pembelajaran gabungan, pembaruan model klien dirata-ratakan untuk mendapatkan pembaruan model agregat untuk diterapkan ke model global di server.

Selain operator mencapai tujuan ini seperti tff.federated_sum , TFF menyediakan tff.templates.AggregationProcess (a proses stateful ) yang meresmikan tanda tangan jenis untuk agregasi perhitungan sehingga dapat menggeneralisasi ke bentuk yang lebih kompleks daripada jumlah yang sederhana.

Komponen utama dari tff.aggregators modul adalah pabrik-pabrik untuk penciptaan AggregationProcess , yang dirancang untuk secara umum berguna dan replacable blok bangunan dari TFF dalam dua aspek:

  1. Perhitungan parameter. Agregasi merupakan blok bangunan independen yang dapat dipasang ke modul TFF lain yang dirancang untuk bekerja dengan tff.aggregators untuk parameterisasi agregasi diperlukan mereka.

Contoh:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Komposisi agregasi. Sebuah blok bangunan agregasi dapat disusun dengan blok bangunan agregasi lainnya untuk membuat agregasi komposit yang lebih kompleks.

Contoh:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Sisa dari tutorial ini menjelaskan bagaimana kedua tujuan ini tercapai.

Proses agregasi

Kami pertama merangkum tff.templates.AggregationProcess , dan ikuti dengan pola pabrik untuk penciptaan.

The tff.templates.AggregationProcess adalah tff.templates.MeasuredProcess dengan jenis tanda tangan yang ditentukan untuk agregasi. Secara khusus, initialize dan next fungsi memiliki tipe berikut tanda tangan:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Negara (tipe state_type ) harus ditempatkan di server yang. The next fungsi mengambil sebagai argumen masukan negara dan nilai yang akan digabungkan (tipe value_type ) ditempatkan di klien. The * berarti opsional argumen input lainnya, misalnya bobot dalam rata-rata tertimbang. Ini mengembalikan objek status yang diperbarui, nilai agregat dari jenis yang sama yang ditempatkan di server, dan beberapa pengukuran.

Perhatikan bahwa kedua negara akan melewati antara eksekusi dari next fungsi, dan pengukuran dilaporkan dimaksudkan untuk melaporkan informasi tergantung pada pelaksanaan tertentu dari next fungsi, mungkin kosong. Namun demikian, mereka harus secara eksplisit ditentukan agar bagian lain dari TFF memiliki kontrak yang jelas untuk diikuti.

Modul TFF lain, misalnya update model dalam tff.learning , diharapkan untuk menggunakan tff.templates.AggregationProcess untuk parameterisasi bagaimana nilai-nilai yang dikumpulkan. Namun, apa sebenarnya nilai yang dikumpulkan dan apa jenis tanda tangannya, tergantung pada detail lain dari model yang dilatih dan algoritme pembelajaran yang digunakan untuk melakukannya.

Untuk membuat agregasi independen dari aspek-aspek lain dari perhitungan, kita menggunakan pola pabrik - kami membuat sesuai tff.templates.AggregationProcess sekali jenis tanda tangan yang relevan dari objek yang akan dikumpulkan yang tersedia, dengan menerapkan create metode pabrik. Oleh karena itu, penanganan langsung dari proses agregasi hanya diperlukan untuk penulis perpustakaan, yang bertanggung jawab atas pembuatan ini.

Pabrik proses agregasi

Ada dua kelas pabrik dasar abstrak untuk agregasi tak tertimbang dan tertimbang. Mereka create metode mengambil tanda tangan jenis nilai yang akan dikumpulkan dan mengembalikan tff.templates.AggregationProcess untuk agregasi dari nilai-nilai tersebut.

Proses yang diciptakan oleh tff.aggregators.UnweightedAggregationFactory membutuhkan dua argumen input: (1) negara pada server dan nilai tipe tertentu (2) value_type .

Contoh implementasi adalah tff.aggregators.SumFactory .

Proses yang diciptakan oleh tff.aggregators.WeightedAggregationFactory mengambil tiga argumen input: (1) negara pada Server, nilai (2) jenis tertentu value_type dan (3) berat jenis weight_type , sebagaimana ditentukan oleh pengguna pabrik saat menjalankan nya create metode.

Contoh implementasi adalah tff.aggregators.MeanFactory yang menghitung rata-rata tertimbang.

Pola pabrik adalah bagaimana kita mencapai tujuan pertama yang disebutkan di atas; bahwa agregasi adalah blok bangunan independen. Misalnya, ketika mengubah variabel model mana yang dapat dilatih, agregasi kompleks tidak perlu diubah; pabrik yang mewakili itu akan dipanggil dengan jenis tanda tangan yang berbeda bila digunakan dengan metode seperti tff.learning.build_federated_averaging_process .

Komposisi

Ingat bahwa proses agregasi umum dapat merangkum (a) beberapa prapemrosesan nilai di klien, (b) pergerakan nilai dari klien ke server, dan (c) beberapa pascapemrosesan nilai agregat di server. Gol kedua yang disebutkan di atas, komposisi agregasi, diwujudkan dalam tff.aggregators modul dengan penataan pelaksanaan pabrik agregasi seperti bagian (b) dapat didelegasikan kepada pabrik agregasi lain.

Daripada menerapkan semua logika yang diperlukan dalam satu kelas pabrik, implementasi secara default difokuskan pada satu aspek yang relevan untuk agregasi. Saat dibutuhkan, pola ini kemudian memungkinkan kita untuk mengganti blok bangunan satu per satu.

Contohnya adalah tertimbang tff.aggregators.MeanFactory . Implementasinya mengalikan nilai dan bobot yang diberikan pada klien, lalu menjumlahkan nilai bobot dan bobot secara independen, lalu membagi jumlah nilai bobot dengan jumlah bobot di server. Alih-alih menerapkan penjumlahan dengan langsung menggunakan tff.federated_sum operator, penjumlahan tersebut didelegasikan kepada dua contoh tff.aggregators.SumFactory .

Struktur seperti itu memungkinkan dua penjumlahan default diganti oleh pabrik yang berbeda, yang merealisasikan penjumlahan secara berbeda. Misalnya, tff.aggregators.SecureSumFactory , atau pelaksanaan adat dari tff.aggregators.UnweightedAggregationFactory . Sebaliknya, waktu, tff.aggregators.MeanFactory bisa sendiri menjadi agregasi dalam dari pabrik lain seperti tff.aggregators.clipping_factory , jika nilai-nilai yang harus dipotong sebelum averaging.

Lihat sebelumnya Tuning direkomendasikan agregasi untuk belajar tutorial untuk penggunaan receommended dari mekanisme komposisi menggunakan pabrik-pabrik yang ada di tff.aggregators modul.

Praktik terbaik dengan contoh

Kita akan menggambarkan tff.aggregators konsep secara detail dengan menerapkan contoh tugas sederhana, dan membuatnya semakin lebih umum. Cara lain untuk belajar adalah dengan melihat implementasi pabrik-pabrik yang ada.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Alih-alih menjumlahkan value , contoh tugas adalah untuk jumlah value * 2.0 dan kemudian membagi jumlah tersebut dengan 2.0 . Hasil agregasi demikian matematis setara dengan langsung menjumlahkan value , dan bisa dianggap sebagai terdiri dari tiga bagian: (1) skala pada klien (2) menjumlahkan seluruh klien (3) unscaling di Server.

Berikut desain dijelaskan di atas, logika akan dilaksanakan sebagai subclass dari tff.aggregators.UnweightedAggregationFactory , yang menciptakan sesuai tff.templates.AggregationProcess ketika diberi value_type ke agregat:

Implementasi minimal

Untuk tugas contoh, perhitungan yang diperlukan selalu sama, jadi tidak perlu menggunakan status. Demikian kosong, dan direpresentasikan sebagai tff.federated_value((), tff.SERVER) . Hal yang sama berlaku untuk pengukuran, untuk saat ini.

Implementasi minimal dari tugas tersebut adalah sebagai berikut:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Apakah semuanya berfungsi seperti yang diharapkan dapat diverifikasi dengan kode berikut:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Statefulness dan pengukuran

Statefulness secara luas digunakan dalam TFF untuk mewakili perhitungan yang diharapkan dieksekusi secara iteratif dan berubah dengan setiap iterasi. Misalnya, keadaan komputasi pembelajaran berisi bobot model yang dipelajari.

Untuk mengilustrasikan cara menggunakan status dalam komputasi agregasi, kami memodifikasi tugas contoh. Alih-alih mengalikan value dengan 2.0 , kita kalikan dengan indeks iterasi - jumlah kali agregasi telah dieksekusi.

Untuk melakukannya, kita memerlukan cara untuk melacak indeks iterasi, yang dicapai melalui konsep keadaan. Dalam initialize_fn , bukan menciptakan keadaan kosong, kita menginisialisasi negara menjadi nol skalar. Kemudian, negara dapat digunakan dalam next_fn dalam tiga langkah: (1) peningkatan oleh 1.0 , (2) gunakan untuk kalikan value , dan (3) kembali sebagai negara diperbarui baru.

Setelah ini dilakukan, Anda dapat mencatat: Tapi kode yang sama persis seperti di atas dapat digunakan untuk memverifikasi semua bekerja seperti yang diharapkan. Bagaimana saya tahu sesuatu telah benar-benar berubah?

Pertanyaan bagus! Di sinilah konsep pengukuran menjadi berguna. Secara umum, pengukuran dapat melaporkan nilai yang relevan untuk eksekusi tunggal dari next fungsi, yang dapat digunakan untuk memantau. Dalam hal ini, dapat menjadi summed_value dari contoh sebelumnya. Artinya, nilai sebelum langkah "unscaling", yang harus bergantung pada indeks iterasi. Sekali lagi, ini tidak selalu berguna dalam praktik, tetapi menggambarkan mekanisme yang relevan.

Jawaban stateful untuk tugas tersebut terlihat sebagai berikut:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Perhatikan bahwa state yang datang ke next_fn sebagai masukan ditempatkan di server yang. Dalam rangka untuk menggunakannya pada klien, pertama kali perlu dikomunikasikan, yang dicapai dengan menggunakan tff.federated_broadcast operator.

Untuk memverifikasi semua bekerja seperti yang diharapkan, kita sekarang dapat melihat dilaporkan measurements , yang harus berbeda dengan setiap putaran eksekusi, bahkan jika dijalankan dengan sama client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Tipe terstruktur

Bobot model dari model yang dilatih dalam pembelajaran federasi biasanya direpresentasikan sebagai kumpulan tensor, bukan tensor tunggal. Dalam TFF, ini diwakili sebagai tff.StructType dan pabrik agregasi biasanya berguna harus mampu menerima jenis terstruktur.

Namun, pada contoh di atas, kita hanya bekerja dengan tff.TensorType objek. Jika kita mencoba untuk menggunakan pabrik sebelumnya untuk membuat proses agregasi dengan tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , kita mendapatkan error aneh karena TensorFlow akan mencoba untuk memperbanyak suatu tf.Tensor dan list .

Masalahnya adalah bahwa alih-alih mengalikan struktur tensor dengan konstan, kita perlu kalikan masing-masing tensor dalam struktur dengan konstan. Solusi yang biasa untuk masalah ini adalah dengan menggunakan tf.nest di dalam modul yang dibuat tff.tf_computation s.

Versi sebelumnya ExampleTaskFactory kompatibel dengan jenis terstruktur sehingga terlihat sebagai berikut:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Contoh ini menyoroti pola yang mungkin berguna untuk diikuti saat menyusun kode TFF. Ketika tidak berurusan dengan operasi yang sangat sederhana, kode menjadi lebih mudah dibaca ketika tff.tf_computation s yang akan digunakan sebagai blok bangunan di dalam tff.federated_computation diciptakan di tempat terpisah. Bagian dalam tff.federated_computation , blok bangunan ini hanya terhubung menggunakan operator intrinsik.

Untuk memverifikasi itu berfungsi seperti yang diharapkan:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Agregasi dalam

Langkah terakhir adalah secara opsional memungkinkan pendelegasian agregasi aktual ke pabrik lain, untuk memungkinkan komposisi yang mudah dari teknik agregasi yang berbeda.

Hal ini dicapai dengan menciptakan opsional inner_factory argumen dalam konstruktor dari kami ExampleTaskFactory . Jika tidak ditentukan, tff.aggregators.SumFactory digunakan, yang menerapkan tff.federated_sum operator yang digunakan langsung pada bagian sebelumnya.

Ketika create disebut, pertama kita dapat memanggil create dari inner_factory untuk membuat proses agregasi batin dengan sama value_type .

Keadaan proses kami dikembalikan oleh initialize_fn adalah komposisi dari dua bagian: negara yang diciptakan oleh "ini" proses, dan negara dari proses batin yang baru saja dibuat.

Pelaksanaan next_fn berbeda dalam bahwa agregasi yang sebenarnya didelegasikan kepada next fungsi dari proses batin, dan bagaimana hasil akhir terdiri. Negara ini lagi terdiri dari "ini" dan "batin" negara, dan pengukuran terdiri dengan cara yang sama sebagai OrderedDict .

Berikut ini adalah implementasi dari pola tersebut.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ketika mendelegasikan ke inner_process.next fungsi, struktur kembali kami dapatkan adalah tff.templates.MeasuredProcessOutput , dengan tiga bidang yang sama - state , result dan measurements . Ketika membuat struktur keseluruhan laba dari proses agregasi terdiri, para state dan measurements bidang harus umumnya terdiri dan kembali bersama-sama. Sebaliknya, result berkorespondensi lapangan untuk nilai yang dikumpulkan dan bukannya "mengalir melalui" agregasi terdiri.

The state objek harus dilihat sebagai detil implementasi pabrik, dan dengan demikian komposisi bisa menjadi struktur apapun. Namun, measurements sesuai dengan nilai-nilai yang akan dilaporkan kepada pengguna di beberapa titik. Oleh karena itu, kami sarankan untuk menggunakan OrderedDict , dengan terdiri penamaan sehingga akan jelas mana dalam komposisi tidak dilaporkan metrik berasal dari.

Perhatikan juga penggunaan tff.federated_zip operator. The state objek contolled oleh proses menciptakan harus tff.FederatedType . Jika kita telah bukannya kembali (this_state, inner_state) di initialize_fn , kembali jenis tanda tangan akan menjadi tff.StructType yang berisi 2-tupel dari tff.FederatedType s. Penggunaan tff.federated_zip "lift" yang tff.FederatedType ke tingkat atas. Ini sama digunakan dalam next_fn ketika mempersiapkan negara dan pengukuran untuk dikembalikan.

Akhirnya, kita dapat melihat bagaimana ini dapat digunakan dengan agregasi internal default:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... dan dengan agregasi dalam yang berbeda. Sebagai contoh, sebuah ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Ringkasan

Dalam tutorial ini, kami menjelaskan praktik terbaik yang harus diikuti untuk membuat blok penyusun agregasi tujuan umum, yang direpresentasikan sebagai pabrik agregasi. Keumuman datang melalui maksud desain dalam dua cara:

  1. Perhitungan parameter. Agregasi merupakan blok bangunan independen yang dapat dipasang ke modul TFF lain yang dirancang untuk bekerja dengan tff.aggregators untuk parameterisasi agregasi yang diperlukan, seperti tff.learning.build_federated_averaging_process .
  2. Komposisi agregasi. Sebuah blok bangunan agregasi dapat disusun dengan blok bangunan agregasi lainnya untuk membuat agregasi komposit yang lebih kompleks.