Bir sorunuz mu var? TensorFlow Forum Ziyaret Forumunda toplulukla bağlantı kurun

Özel Toplamaları Uygulama

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Defteri indirin

Bu eğiticide, tff.aggregators modülünün arkasındaki tasarım ilkelerini ve istemcilerden sunucuya özel değer toplamasını uygulamaya yönelik en iyi uygulamaları tff.aggregators .

Ön koşullar. Bu öğretici, yerleştirmeler ( tff.SERVER , tff.CLIENTS ), TFF'nin hesaplamaları nasıl temsil ettiği ( tff.tf_computation , tff.federated_computation ) ve bunların tür imzaları gibi temel Federated Core kavramlarına zaten aşina olduğunuzu varsayar.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Tasarım özeti

TFF olarak, "birleştirme" ile ilgili bir dizi değer hareketi ifade eder tff.CLIENTS aynı tipte bir toplam değeri elde etmek için tff.SERVER . Yani, her bir müşteri değerinin mevcut olması gerekmez. Örneğin, federe öğrenmede, sunucudaki genel modele uygulanacak bir toplu model güncellemesi almak için istemci modeli güncellemelerinin ortalaması alınır.

tff.federated_sum gibi bu hedefi gerçekleştiren operatörlere ek olarak, TFF, toplama hesaplaması için tür imzasını resmileştiren tff.templates.AggregationProcess ( durum bilgisi olan bir süreç ) sağlar, böylece basit bir toplamdan daha karmaşık formlara genelleme yapabilir.

tff.aggregators modülünün ana bileşenleri, iki açıdan TFF'nin genel olarak yararlı ve değiştirilebilir yapı taşları olacak şekilde tasarlanmış AggregationProcess tff.aggregators oluşturulması için fabrikalardır :

  1. Parametreli hesaplamalar. Toplama, gerekli toplamalarını parametrelendirmek için tff.aggregators ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilen bağımsız bir yapı taşıdır.

Misal:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.

Misal:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Bu eğitimin geri kalanı, bu iki hedefe nasıl ulaşıldığını açıklar.

Toplama süreci

Önce tff.templates.AggregationProcess özetliyoruz ve oluşturulması için fabrika modelini takip tff.templates.AggregationProcess .

tff.templates.AggregationProcess , toplama için belirtilen tür imzalarına sahip bir tff.templates.MeasuredProcess . Özellikle, initialize ve next işlevler aşağıdaki tür imzalarına sahiptir:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Durum ( state_type ) sunucuya yerleştirilmelidir. next işlev, girdi bağımsız değişkeni olarak durumu ve istemcilere yerleştirilecek ( value_type ) value_type bir değeri alır. * , İsteğe bağlı diğer girdi argümanları anlamına gelir, örneğin ağırlıklı ortalamadaki ağırlıklar. Güncellenmiş bir durum nesnesi, sunucuya yerleştirilen aynı türden toplanmış değeri ve bazı ölçümleri döndürür.

Devlet hem de çalıştırma arasında geçirilecek bu Not next işlev, ve rapor edilen ölçümler, belirli bir yürütme bağlı olarak herhangi bir bilgi vermek için amaçlanan next fonksiyonu, boş olabilir. Bununla birlikte, TFF'nin diğer bölümlerinin takip etmesi için açık bir sözleşmeye sahip olması için açıkça belirtilmesi gerekir.

Diğer TFF modüllerinin, örneğin tff.learning model güncellemelerinin, değerlerin nasıl tff.templates.AggregationProcess parametreleştirmek için tff.templates.AggregationProcess kullanması beklenir. Bununla birlikte, toplanan değerlerin tam olarak ne olduğu ve tür imzalarının ne olduğu, eğitilen modelin diğer ayrıntılarına ve bunu yapmak için kullanılan öğrenme algoritmasına bağlıdır.

tff.templates.AggregationProcess hesaplamaların diğer yönlerinden bağımsız kılmak için fabrika modelini kullanırız - uygun tff.templates.AggregationProcess oluştururuz. tff.templates.AggregationProcess nesnelerin ilgili tip imzaları mevcut olduğunda, fabrikanın create yöntemini çağırarak. Bu nedenle, toplama işleminin doğrudan işlenmesi, yalnızca bu oluşturma işleminden sorumlu olan kütüphane yazarları için gereklidir.

Toplama işlemi fabrikaları

Ağırlıksız ve ağırlıklı toplama için iki soyut temel fabrika sınıfı vardır. Bunların create yöntem toplu gereken değerin tipi imzalarını alır ve bir döner tff.templates.AggregationProcess bu değerlerin toplanması için.

tff.aggregators.UnweightedAggregationFactory tarafından oluşturulan işlem iki girdi bağımsız değişkeni alır: (1) sunucuda durum ve (2) belirtilen türde value_type değeri.

Örnek bir uygulama tff.aggregators.SumFactory .

tff.aggregators.WeightedAggregationFactory tarafından oluşturulan işlem üç girdi bağımsız değişkeni alır: (1) sunucuda durum, (2) belirtilen tür value_type değeri ve (3) create yöntemini çağırırken fabrikanın kullanıcısı tarafından belirtilen weight_type türünün ağırlığı.

Örnek bir uygulama, ağırlıklı ortalamayı hesaplayan tff.aggregators.MeanFactory .

Fabrika düzeni, yukarıda belirtilen ilk hedefe nasıl ulaştığımızdır; bu toplama bağımsız bir yapı taşıdır. Örneğin, hangi model değişkenlerinin eğitilebilir olduğunu değiştirirken, karmaşık bir toplamanın mutlaka değişmesi gerekmez; bunu temsil eden fabrika, tff.learning.build_federated_averaging_process gibi bir yöntem tarafından kullanıldığında farklı bir tür imzayla tff.learning.build_federated_averaging_process .

Kompozisyonlar

Genel bir toplama işleminin (a) istemcilerdeki değerlerin bazı ön işlemlerini, (b) değerlerin istemciden sunucuya taşınmasını ve (c) toplanan değerin sunucudaki bazı son işlemlerini kapsayabileceğini hatırlayın. Yukarıda belirtilen ikinci amaç olan agregasyon kompozisyonu, agregasyon fabrikalarının uygulaması, (b) bölümü başka bir agregasyon fabrikasına devredilebilecek şekilde yapılandırılarak tff.aggregators modülü içerisinde gerçekleştirilmektedir.

Tek bir fabrika sınıfında gerekli tüm mantığı uygulamak yerine, uygulamalar varsayılan olarak toplama ile ilgili tek bir yöne odaklanır. Gerektiğinde, bu model yapı taşlarını birer birer değiştirmemizi sağlar.

Bir örnek, ağırlıklı tff.aggregators.MeanFactory . Uygulanması, istemcilerde sağlanan değerleri ve ağırlıkları çarpar, ardından hem ağırlıklı değerleri hem de ağırlıkları bağımsız olarak toplar ve ardından ağırlıklı değerlerin toplamını sunucudaki ağırlıkların toplamına böler. Doğrudan tff.federated_sum operatörünü kullanarak toplamaları uygulamak yerine, toplama iki tff.aggregators.SumFactory örneğine tff.aggregators.SumFactory .

Bu tür bir yapı, iki varsayılan toplamın, toplamı farklı şekilde gerçekleştiren farklı fabrikalarla değiştirilmesini mümkün kılar. Örneğin, bir tff.aggregators.SecureSumFactory veya özel bir uygulama tff.aggregators.UnweightedAggregationFactory . Tersine, zaman, tff.aggregators.MeanFactory , değerler ortalamadan önce tff.aggregators.clipping_factory , tff.aggregators.clipping_factory gibi başka bir fabrikanın dahili bir toplamı olabilir.

tff.aggregators modülündeki mevcut fabrikaları kullanan kompozisyon mekanizmasının tff.aggregators kullanımları için öğrenim öğreticisine yönelik önceki Tuning tarafından önerilen toplamalara bakın.

Örneğe göre en iyi uygulamalar

Basit bir örnek görevi uygulayarak tff.aggregators kavramlarını ayrıntılı olarak açıklayacağız ve aşamalı olarak daha genel hale getireceğiz. Öğrenmenin bir başka yolu da mevcut fabrikaların uygulamasına bakmaktır.

import collections
import tensorflow as tf
import tensorflow_federated as tff

value toplamak yerine, örnek görev, value * 2.0 toplamak ve ardından toplamı 2.0 bölmektir. Toplama sonucu, matematiksel olarak value doğrudan toplamaya eşdeğerdir ve üç bölümden oluştuğu düşünülebilir: (1) istemcilerde ölçeklendirme (2) istemciler arasında toplama (3) sunucuda ölçek kaldırma.

Yukarıda açıklanan tasarımın ardından mantık, toplanacak bir value_type verildiğinde uygun tff.templates.AggregationProcess oluşturan tff.aggregators.UnweightedAggregationFactory bir alt sınıfı olarak uygulanacaktır:

Minimum uygulama

Örnek görev için, gerekli hesaplamalar her zaman aynıdır, bu nedenle durumu kullanmaya gerek yoktur. Dolayısıyla boştur ve tff.federated_value((), tff.SERVER) olarak temsil edilir. Aynısı şimdilik ölçümler için de geçerlidir.

Görevin asgari uygulaması şu şekildedir:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Her şeyin beklendiği gibi çalışıp çalışmadığı aşağıdaki kodla doğrulanabilir:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Durumsallık ve ölçümler

Durum bilgisi, TFF'de yinelemeli olarak yürütülmesi ve her yinelemeyle birlikte değişmesi beklenen hesaplamaları temsil etmek için yaygın olarak kullanılır. Örneğin, bir öğrenme hesaplamasının durumu öğrenilmekte olan modelin ağırlıklarını içerir.

Bir toplama hesaplamasında durumun nasıl kullanılacağını göstermek için, örnek görevi değiştiriyoruz. value 2.0 ile çarpmak yerine, onu yineleme indeksi ile - toplamanın çalıştırılma sayısı - ile çarparız.

Bunu yapmak için, durum kavramıyla elde edilen yineleme indeksini takip etmenin bir yoluna ihtiyacımız var. initialize_fn , boş bir durum yaratmak yerine, durumu skaler sıfır olarak başlatıyoruz. Ardından, durum next_fn üç adımda kullanılabilir: (1) 1.0 artırma, (2) value çarpmak için kullanma ve (3) yeni güncellenmiş durum olarak geri dönme.

Bu yapıldıktan sonra şunu not edebilirsiniz: Ancak yukarıdaki ile tam olarak aynı kod, tüm çalışmaları beklendiği gibi doğrulamak için kullanılabilir. Bir şeyin gerçekten değiştiğini nasıl bilebilirim?

İyi soru! Burası ölçüm kavramının işe yaradığı yerdir. Genel olarak ölçümler, izleme için kullanılabilecek bir next fonksiyonun tek bir uygulamasına ilişkin herhangi bir değeri rapor edebilir. Bu durumda, önceki örnekteki summed_value olabilir. Yani yineleme indeksine bağlı olması gereken "ölçek kaldırma" adımından önceki değer. Yine, bu pratikte mutlaka yararlı değildir, ancak ilgili mekanizmayı göstermektedir.

Göreve verilen durum bilgisi olan cevap şu şekilde görünür:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

O Not state girer next_fn girdi olarak sunucuya yerleştirilir. İstemcilerde kullanmak için, önce tff.federated_broadcast operatörü kullanılarak tff.federated_broadcast .

Tüm çalışmaları beklendiği gibi doğrulamak için, artık aynı client_data ile çalıştırılsa bile her yürütme turunda farklı olması gereken rapor edilen measurements client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Yapılandırılmış tipler

Birleşik öğrenmede eğitilmiş bir modelin model ağırlıkları genellikle tek bir tensörden ziyade bir tensör koleksiyonu olarak temsil edilir. TFF'de bu, tff.StructType olarak temsil edilir ve genellikle yararlı toplama fabrikalarının yapılandırılmış türleri kabul edebilmesi gerekir.

Ancak, yukarıdaki örneklerde sadece bir tff.TensorType nesnesiyle çalıştık. Bir tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) ile toplama işlemini oluşturmak için önceki fabrikayı kullanmaya tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) garip bir hata alırız çünkü TensorFlow, bir tf.Tensor ile bir list tf.Tensor çalışacaktır.

Sorun şu ki, tensörlerin yapısını bir sabitle çarpmak yerine, yapıdaki her tensörü bir sabitle çarpmamız gerekiyor. Bu sorunun olağan çözümü, oluşturulan tff.tf_computation s içindeki tf.nest modülünü kullanmaktır.

Yapılandırılmış türlerle uyumlu önceki ExampleTaskFactory sürümü bu nedenle aşağıdaki gibi görünür:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Bu örnek, TFF kodunu yapılandırırken takip edilmesi yararlı olabilecek bir modeli vurgular. Çok basit işlemlerle tff.federated_computation , bir tff.federated_computation içinde yapı taşları olarak kullanılacak tff.tf_computation s ayrı bir yerde yaratıldığında kod daha okunaklı hale gelir. tff.federated_computation , bu yapı blokları yalnızca iç operatörler kullanılarak bağlanır.

Beklendiği gibi çalıştığını doğrulamak için:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

İç toplamalar

Son adım, farklı birleştirme tekniklerinin kolay bir şekilde oluşturulmasına olanak sağlamak için, isteğe bağlı olarak gerçek kümelemenin diğer fabrikalara devredilmesine olanak sağlamaktır.

Bu isteğe bağlı bir oluşturularak gerçekleştirilir inner_factory bizim kurucuları içindedir argüman ExampleTaskFactory . Belirtilen Değilse, tff.aggregators.SumFactory uygulanacağı, kullanılan tff.federated_sum önceki bölümde doğrudan kullanılan operatöre.

Ne zaman create denir, öncelikle arayabilecek create ait inner_factory aynı iç toplama işlemini oluşturmak için value_type .

initialize_fn ile döndürülen sürecimizin durumu iki bölümden oluşan bir bileşimdir: "bu" işlem tarafından oluşturulan durum ve yeni oluşturulan iç sürecin durumu.

next_fn uygulanması, gerçek toplamanın iç sürecin bir next işlevine devredilmesi ve nihai çıktının nasıl oluşturulduğu next_fn farklılık gösterir. Durum yine "bu" ve "iç" durumdan oluşur ve ölçümler, OrderedDict benzer şekilde OrderedDict .

Aşağıda bu tür bir modelin uygulaması yer almaktadır.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

inner_process.next işlevine delege ederken, elde inner_process.next dönüş yapısı aynı üç alana sahip bir tff.templates.MeasuredProcessOutput - state , result ve measurements . Oluşturulan toplama sürecinin genel getiri yapısını oluştururken, state ve measurements alanları genel olarak bir araya getirilmeli ve döndürülmelidir. Bunun tersine, result alanı, toplanan değere karşılık gelir ve bunun yerine, oluşturulan toplamadan "akar".

state nesne fabrikasının bir uygulama detaylı olarak görülmelidir ve dolayısıyla bileşim, herhangi bir yapı olabilir. Ancak measurements , bir noktada kullanıcıya bildirilecek değerlere karşılık gelir. Bu nedenle, bir kompozisyonda rapor edilen bir metriğin nereden geldiği net olacak şekilde birleşik adlandırma ile OrderedDict kullanmanızı OrderedDict .

tff.federated_zip operatörünün kullanımına da dikkat edin. Oluşturulan işlem tarafından kontrol edilen state nesnesi bir tff.FederatedType olmalıdır. Bunun yerine geri dönmüş olsaydı (this_state, inner_state) içinde initialize_fn , dönüş türü imzası olacağını tff.StructType bir 2-tuple içeren tff.FederatedType s. Kullanımı tff.federated_zip "asansörleri" tff.FederatedType en üst seviyeye. Bu, durum ve döndürülecek ölçümler hazırlanırken benzer şekilde next_fn kullanılır.

Son olarak, bunun varsayılan iç toplama ile nasıl kullanılabileceğini görebiliriz:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... ve farklı bir iç kümelenme ile. Örneğin, bir ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Özet

Bu eğiticide, bir toplama fabrikası olarak temsil edilen genel amaçlı bir toplama yapı taşı oluşturmak için izlenecek en iyi uygulamaları açıkladık. Genellik, tasarım amacından iki şekilde geçer:

  1. Parametreli hesaplamalar. Toplama, tff.aggregators gibi gerekli toplamalarını parametrelendirmek için tff.aggregators ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilen bağımsız bir yapı tff.learning.build_federated_averaging_process .
  2. Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.