Özel Toplamaları Uygulama

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

Bu eğitimde, tasarım ardındaki ilkeleri açıklamak tff.aggregators modülüne ve sunucuya müşterilerinden gelen değerlerin özel toplanmasına ilgili en iyi uygulamaları.

Önkoşullar. Bu eğitimde zaten temel kavramlarına aşina varsayar Federe Çekirdek böyle yerleşimler (aynı tff.SERVER , tff.CLIENTS TFF hesaplamaları nasıl temsil), ( tff.tf_computation , tff.federated_computation ) ve bunların tip imzaları.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Tasarım özeti

TFF olarak, "birleştirme" ile ilgili bir dizi değer hareketi ifade eder tff.CLIENTS aynı tipte bir toplam değeri elde etmek için tff.SERVER . Yani, her bir bireysel müşteri değerinin mevcut olması gerekmez. Örneğin, federe öğrenimde, sunucudaki global modele uygulanacak bir toplu model güncellemesi elde etmek için istemci modeli güncellemelerinin ortalaması alınır.

Gibi bu amacı gerçekleştirmek operatörler ek olarak tff.federated_sum TFF içerir tff.templates.AggregationProcess (bir durum bilgisi işlem basit bir miktar daha karmaşık formlar genelleme böylece agregasyon hesaplama tipi imza formalizes).

Ana bileşenleri tff.aggregators oluşturulması için fabrikaları modülü AggregationProcess iki yönden TFF genellikle faydalı ve Değişebilir yapı blokları olarak tasarlanmıştır:

  1. Parametreli hesaplamalar. Toplama ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilir bağımsız bir yapı taşıdır tff.aggregators onların gerekli toplanmasına parametreleştirmenin.

Örnek:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.

Örnek:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Bu öğreticinin geri kalanı, bu iki hedefe nasıl ulaşıldığını açıklar.

Toplama süreci

Biz ilk özetlemek tff.templates.AggregationProcess ve bunun oluşturulması için fabrika desenle izleyin.

tff.templates.AggregationProcess bir olduğunu tff.templates.MeasuredProcess toplama için belirtilen tip imzalarla. Özellikle, initialize ve next fonksiyonlar şu tip imzaları vardır:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

(Tip durumu state_type ) servis yerleştirilmelidir. next fonksiyonu, durum ve bir değer (tipi toplu için kullanılan giriş argüman olarak alır value_type müşteriler yerleştirilir). * Aracı bir ağırlıklı ortalama olarak, örneğin ağırlıkları için, diğer giriş bağımsız değişkeni isteğe. Güncellenmiş bir durum nesnesi, sunucuya yerleştirilen aynı türden toplu değer ve bazı ölçümler döndürür.

Devlet hem de çalıştırma arasında geçirilecek bu Not next işlev, ve rapor edilen ölçümler, belirli bir yürütme bağlı olarak herhangi bir bilgi vermek için amaçlanan next fonksiyonu, boş olabilir. Bununla birlikte, TFF'nin diğer bölümlerinin takip etmesi gereken net bir sözleşmeye sahip olmaları için açıkça belirtilmeleri gerekir.

Diğer TFF modülleri, örnek model güncellemelerini için tff.learning , kullanması beklenir tff.templates.AggregationProcess değerleri toplanır nasıl parametreleştirmenin. Ancak, toplanan değerlerin tam olarak ne olduğu ve tür imzalarının ne olduğu, eğitilen modelin diğer ayrıntılarına ve bunu yapmak için kullanılan öğrenme algoritmasına bağlıdır.

Hesaplamaların diğer yönlerinin toplanma bağımsız hale getirmek için, fabrika deseni kullanmak - biz uygun oluşturmak tff.templates.AggregationProcess nesnelerin ilgili tip imzaları çağırarak, toplanan mevcuttur edilecek bir kez create fabrikanın yöntemini. Bu nedenle, toplama işleminin doğrudan ele alınması, yalnızca bu oluşturmadan sorumlu olan kütüphane yazarları için gereklidir.

Toplama proses fabrikaları

Ağırlıksız ve ağırlıklı toplama için iki soyut temel fabrika sınıfı vardır. Bunların create yöntem toplu gereken değerin tipi imzalarını alır ve bir döner tff.templates.AggregationProcess bu değerlerin toplanması için.

Tarafından oluşturulan yöntem tff.aggregators.UnweightedAggregationFactory sunucusunda (1) durum ve belirtilen tipi (2) değeri: iki giriş argüman alır value_type .

Bir örnek uygulamasıdır tff.aggregators.SumFactory .

Tarafından oluşturulan yöntem tff.aggregators.WeightedAggregationFactory sunucusunda (1) durum, belirli bir tip (2) değeri: üç giriş argüman alır value_type ve tipi (3) ağırlığı weight_type fabrikanın kullanıcı tarafından belirtilen şekilde onun yürütmesini zaman, create yöntemi.

Bir örnek uygulamasıdır tff.aggregators.MeanFactory ağırlıklı ortalamasını hesaplar.

Fabrika modeli, yukarıda belirtilen ilk hedefe nasıl ulaştığımızdır; bu toplama bağımsız bir yapı taşıdır. Örneğin, hangi model değişkenlerinin eğitilebilir olduğunu değiştirirken, karmaşık bir toplamanın mutlaka değişmesi gerekmez; Bu tür bir yöntem ile kullanıldığında, bu temsil fabrika farklı bir türü imza ile çağrılır tff.learning.build_federated_averaging_process .

Kompozisyonlar

Genel bir toplama işleminin, (a) istemcilerdeki değerlerin bir miktar ön işlemesini, (b) değerlerin istemciden sunucuya hareketini ve (c) sunucuda toplanmış değerin bazı sonradan işlenmesini kapsadığını hatırlayın. İçindeki agregasyon bileşimin, yukarıda belirtilen ikinci amacı, gerçekleştirilen tff.aggregators yapılanma kısım (b) başka bir agregasyon fabrikasına temsilci şekildedir agregasyon fabrikaları uygulanmasını modülü.

Tüm gerekli mantığı tek bir fabrika sınıfı içinde uygulamak yerine, uygulamalar varsayılan olarak toplamayla ilgili tek bir yöne odaklanır. Gerektiğinde, bu model daha sonra yapı taşlarını birer birer değiştirmemizi sağlar.

Bir örnek ağırlıklı olan tff.aggregators.MeanFactory . Uygulaması, istemcilerde sağlanan değerleri ve ağırlıkları çarpar, ardından hem ağırlıklı değerleri hem de ağırlıkları bağımsız olarak toplar ve ardından ağırlıklı değerlerin toplamını sunucudaki ağırlıkların toplamına böler. Bunun yerine doğrudan kullanarak toplamları uygulanması tff.federated_sum operatörü, toplam iki örneğine delege tff.aggregators.SumFactory .

Bu yapı, iki varsayılan toplamın, toplamı farklı şekilde gerçekleştiren farklı fabrikalar tarafından değiştirilmesini mümkün kılar. Örneğin, bir tff.aggregators.SecureSumFactory veya özel bir uygulama tff.aggregators.UnweightedAggregationFactory . Bunun aksine, zaman, tff.aggregators.MeanFactory kendisi gibi bir başka fabrikanın bir iç agregasyon olabilir tff.aggregators.clipping_factory değerler ortalama alma öncesinde kırpılmasına ise.

Önceki Bkz Ayarlama öğrenme toplamalardan önerilen mevcut fabrikaları kullanarak kompozisyon mekanizmasının receommended kullanımlar için öğretici tff.aggregators modülü.

Örnek olarak en iyi uygulamalar

Biz göstermek için gidiyoruz tff.aggregators basit bir örnek görevi uygulayarak ayrıntılı olarak kavram ve giderek daha genel olun. Öğrenmenin başka bir yolu da mevcut fabrikaların uygulanmasına bakmaktır.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Bunun yerine toplanmasının value , örneğin görev toplanmasından ibarettir value * 2.0 ve o zamana kadar toplam ikiye bölünür 2.0 . Agregasyon sonucu böylece doğrudan toplanmasıyla için matematiksel olarak denk olan value , ve üç bölümden oluşan olarak düşünülebilir: sunucusunda unscaling istemciler (3) boyunca toplanmasıyla istemciler de (1) ölçekleme (2).

Tasarım Yukarıda açıklanan takiben, mantık bir alt sınıfı olarak uygulanacaktır tff.aggregators.UnweightedAggregationFactory uygun oluşturur tff.templates.AggregationProcess verilen bir value_type agrega için:

Minimum uygulama

Örnek görev için gerekli hesaplamalar her zaman aynıdır, bu nedenle durumu kullanmaya gerek yoktur. Böylece boş ve olarak temsil edilir tff.federated_value((), tff.SERVER) . Şimdilik ölçümler için de aynı şey geçerli.

Görevin minimum uygulaması aşağıdaki gibidir:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Her şeyin beklendiği gibi çalışıp çalışmadığı aşağıdaki kodla doğrulanabilir:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Durumsallık ve ölçümler

Durum bilgisi, TFF'de, yinelemeli olarak yürütülmesi ve her yinelemede değişmesi beklenen hesaplamaları temsil etmek için yaygın olarak kullanılır. Örneğin, bir öğrenme hesaplamasının durumu, öğrenilen modelin ağırlıklarını içerir.

Bir toplama hesaplamasında durumun nasıl kullanılacağını göstermek için örnek görevi değiştiriyoruz. Bunun yerine çarpımının value ile 2.0 , biz çarpın o yineleme endeksine göre - kaç kez toplama idam edilmiştir.

Bunu yapmak için, durum kavramı aracılığıyla elde edilen yineleme indeksini takip etmenin bir yoluna ihtiyacımız var. In initialize_fn , bunun yerine boş devlet yaratma, bir skaler sıfır olması devlet başlatmak. Daha sonra, durum kullanılabilecek next_fn (1) tarafından artışı: üç adımda 1.0 , çok-katlı (2) 'kullanım value ve yeni güncel durum olarak (3) döner.

Bu yapıldıktan sonra, şuna dikkat çekebilir: Ama tam yukarıdaki gibi aynı kod beklendiği gibi tüm çalışmaları doğrulamak için kullanılabilir. Bir şeyin gerçekten değiştiğini nasıl bilebilirim?

İyi soru! Ölçüm kavramının yararlı olduğu yer burasıdır. Genel olarak, ölçümler tek uygulanmasına ilişkin herhangi bir değer bildirebilirsiniz next izlenmesi için kullanılabilir fonksiyonu. Bu durumda, bu olabilir summed_value önceki örnekten. Diğer bir deyişle, yineleme dizinine bağlı olması gereken "ölçeklendirmeyi kaldırma" adımından önceki değer. Yine, bu pratikte mutlaka yararlı değildir, ancak ilgili mekanizmayı gösterir.

Göreve verilen durumsal cevap şu şekilde görünür:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

O Not state girer next_fn girdi olarak sunucuya yerleştirilir. İstemciler de kullanmak için, öncelikle kullanılarak elde edilir, iletilmesi gereken tff.federated_broadcast operatör.

Beklendiği gibi tüm çalışmaları doğrulamak için, artık bildirilen bakabilirsiniz measurements , yürütme her turda farklı olmalı bile aynı olan çalışma client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Yapılandırılmış tipler

Birleşik öğrenmede eğitilmiş bir modelin model ağırlıkları, genellikle tek bir tensörden ziyade bir tensör koleksiyonu olarak temsil edilir. TFF, bu şekilde temsil edilen tff.StructType ve genel olarak faydalı agregasyon fabrikaları yapılandırılmış türlerini kabul mümkün olması gerekmektedir.

Bununla birlikte, yukarıdaki örneklerde, sadece bir çalıştı tff.TensorType nesne. Biz ile toplama işlemini oluşturmak için önceki fabrika kullanmayı denerseniz tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , garip bir hata nedeniyle olsun TensorFlow bir çoğalmaya çalışacağız tf.Tensor ve bir list .

Sorun yerine sabiti tarafından tensörlerinin yapısını çarpılması nedeniyle, bir sabiti tarafından yapısında her tensörünü çarpmak gerekir olmasıdır. Bu sorunun olağan çözümü kullanmaktır tf.nest oluşturulan modül içini tff.tf_computation s.

Önceki sürümü ExampleTaskFactory aşağıdaki gibi yapılandırılmış türleriyle uyumlu böylece görünür:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Bu örnek, TFF kodunu yapılandırırken izlenmesi yararlı olabilecek bir modeli vurgulamaktadır. Çok basit operasyonlar ile ilgili değil zaman zaman, kod daha okunaklı hale tff.tf_computation bir iç yapı taşları olarak kullanılacak ler tff.federated_computation ayrı yerde oluşturulur. İçinde bir tff.federated_computation , bu yapı taşları yalnızca içsel operatörlerini kullanarak bağlanır.

Beklendiği gibi çalıştığını doğrulamak için:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

İç toplamalar

Son adım, farklı toplama tekniklerinin kolay bir şekilde oluşturulmasına izin vermek için fiili toplamanın diğer fabrikalara devredilmesini isteğe bağlı olarak etkinleştirmektir.

Bu isteğe bağlı bir oluşturularak gerçekleştirilir inner_factory bizim kurucuları içindedir argüman ExampleTaskFactory . Belirtilen Değilse, tff.aggregators.SumFactory uygulanacağı, kullanılan tff.federated_sum önceki bölümde doğrudan kullanılan operatöre.

Ne zaman create denir, öncelikle arayabilecek create ait inner_factory aynı iç toplama işlemini oluşturmak için value_type .

Tarafından döndürülen Sürecimizin devlet initialize_fn "Bu" işlemiyle oluşturulan devlet ve daha sonra oluşturulan iç sürecin devlet: iki bölümden bir bileşimdir.

Uygulanması next_fn bu gerçek birleşimine de farklılık için temsilci next iç işleminin fonksiyonu, ve son çıkış oluşmaktadır nasıl. Durum yine, "bu" ve "iç" durumuna oluşmaktadır ve ölçümler ile benzer bir şekilde oluşan OrderedDict .

Aşağıdaki, böyle bir modelin bir uygulamasıdır.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

İçin devredilirken inner_process.next fonksiyonu, aldığımız dönüş yapısı olan tff.templates.MeasuredProcessOutput - aynı üç alanlarla, state , result ve measurements . Oluşan toplanma işleminin genel dönüş yapısı oluştururken, state ve measurements alanları genellikle oluşan ve birlikte döndürülmelidir. Buna karşılık, result değerine alan karşılık birleştirilmiş olan ve bunun yerine oluşan agregasyonu "akar".

state nesne fabrikasının bir uygulama detaylı olarak görülmelidir ve dolayısıyla bileşim, herhangi bir yapı olabilir. Ancak, measurements değerleri bir noktada kullanıcıya raporlanacak karşılık gelmektedir. Bu nedenle, kullanım tavsiye OrderedDict bir kompozisyonda bir gelir metrik bildirilen gelmez yerler net olacağını böyle adlandırma oluşur ile.

Ayrıca kullanımına dikkat tff.federated_zip operatörü. state oluşturulmuş yöntem ile contolled amacı olmalıdır tff.FederatedType . Bunun yerine geri dönmüş olsaydı (this_state, inner_state) içinde initialize_fn , dönüş türü imzası olacağını tff.StructType bir 2-tuple içeren tff.FederatedType s. Kullanımı tff.federated_zip "asansörleri" tff.FederatedType en üst seviyeye. Bu benzer şekilde kullanılır next_fn durumu ve ölçümler döndürülecek hazırlanırken.

Son olarak, bunun varsayılan iç toplama ile nasıl kullanılabileceğini görebiliriz:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... ve farklı bir iç toplama ile. Örneğin, bir ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Özet

Bu öğreticide, bir toplama fabrikası olarak temsil edilen genel amaçlı bir toplama yapı taşı oluşturmak için izlenecek en iyi uygulamaları açıkladık. Genellik, tasarım amacından iki şekilde gelir:

  1. Parametreli hesaplamalar. Toplama ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilir bağımsız bir yapı taşıdır tff.aggregators gibi onların gerekli toplanmasına parametreleştirmenin, tff.learning.build_federated_averaging_process .
  2. Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.