Zapisz datę! Google I / O powraca w dniach 18-20 maja Zarejestruj się teraz
Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Wdrażanie niestandardowych agregacji

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło w serwisie GitHub Pobierz notatnik

W tym samouczku wyjaśniamy zasady projektowania stojące za modułem tff.aggregators i najlepsze praktyki dotyczące implementowania niestandardowej agregacji wartości z klientów na serwer.

Wymagania wstępne. W tym samouczku założono, że znasz już podstawowe koncepcje Federated Core, takie jak miejsca docelowe ( tff.SERVER , tff.CLIENTS ), sposób reprezentowania obliczeń przez TFF ( tff.tf_computation , tff.federated_computation ) i ich sygnatury typów.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Podsumowanie projektu

W TFF „agregacja” odnosi się do tff.CLIENTS zbioru wartości na tff.CLIENTS celu wytworzenia zagregowanej wartości tego samego typu na tff.SERVER . Oznacza to, że każda indywidualna wartość klienta nie musi być dostępna. Na przykład w uczeniu federacyjnym aktualizacje modelu klienta są uśredniane, aby uzyskać zbiorczą aktualizację modelu, która ma być zastosowana do modelu globalnego na serwerze.

Oprócz operatorów realizujących ten cel, takich jak tff.federated_sum , TFF zapewnia tff.templates.AggregationProcess ( proces tff.templates.AggregationProcess ), który formalizuje sygnaturę typu dla obliczeń agregacji, dzięki czemu może uogólniać na bardziej złożone formy niż prosta suma.

Głównymi komponentami modułu tff.aggregatorsfabryki do tworzenia procesu AggregationProcess , które mają być ogólnie użytecznymi i wymiennymi elementami składowymi TFF w dwóch aspektach:

  1. Obliczenia parametryzowane. Agregacja jest niezależnym blokiem konstrukcyjnym, który można podłączyć do innych modułów TFF zaprojektowanych do współpracy z tff.aggregators celu parametryzacji niezbędnej agregacji.

Przykład:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Skład agregacji. Blok budulcowy agregacji można łączyć z innymi blokami budulcowymi agregacji, aby tworzyć bardziej złożone agregacje złożone.

Przykład:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

W pozostałej części tego samouczka wyjaśniono, w jaki sposób osiąga się te dwa cele.

Proces agregacji

Najpierw podsumowujemy proces tff.templates.AggregationProcess i postępujemy zgodnie z wzorcem fabrycznym jego tworzenia.

tff.templates.AggregationProcess to tff.templates.MeasuredProcess z sygnaturami typu określonymi do agregacji. W szczególności funkcje initialize i next mają następujące sygnatury typów:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Stan (typu state_type ) należy umieścić na serwerze. next funkcja przyjmuje jako argument wejściowy stan i wartość do zagregowania (typu value_type ) umieszczoną na klientach. * Oznacza opcjonalne inne argumenty wejściowe, na przykład wagi w średniej ważonej. Zwraca zaktualizowany obiekt stanu, zagregowaną wartość tego samego typu umieszczoną na serwerze i niektóre pomiary.

Należy zauważyć, że zarówno stan, który ma być przekazany między wykonaniami next funkcji, jak i raportowane pomiary mające na celu raportowanie jakichkolwiek informacji w zależności od konkretnego wykonania next funkcji, mogą być puste. Niemniej jednak muszą być wyraźnie określone dla innych części TFF, aby mieć jasną umowę do wykonania.

Inne moduły TFF, na przykład aktualizacje modelu w tff.learning , powinny używać tff.templates.AggregationProcess do parametryzacji sposobu agregowania wartości. Jednak to, jakie dokładnie są agregowane wartości i jakie są ich sygnatury typów, zależy od innych szczegółów trenowanego modelu i algorytmu uczącego się, który to robi.

Aby uniezależnić agregację od innych aspektów obliczeń, używamy wzorca fabrycznego - tworzymy odpowiedni tff.templates.AggregationProcess gdy dostępne są odpowiednie sygnatury typów obiektów, które mają być agregowane, poprzez wywołanie metody create fabryki. Bezpośrednia obsługa procesu agregacji jest więc potrzebna tylko autorom bibliotecznym, którzy są odpowiedzialni za to tworzenie.

Fabryki procesów agregacji

Istnieją dwie abstrakcyjne klasy bazowe dla agregacji nieważonej i ważonej. Ich metoda create przyjmuje sygnatury typu wartości do agregacji i zwraca tff.templates.AggregationProcess do agregacji takich wartości.

Proces utworzony przez tff.aggregators.UnweightedAggregationFactory przyjmuje dwa argumenty wejściowe: (1) stan na serwerze i (2) wartość określonego typu value_type .

Przykładową implementacją jest tff.aggregators.SumFactory .

Proces tworzony przez tff.aggregators.WeightedAggregationFactory przyjmuje trzy argumenty wejściowe: (1) stan na serwerze, (2) wartość określonego typu value_type i (3) waga typu weight_type , określona przez użytkownika fabryki podczas wywoływania metody create .

Przykładową implementacją jest tff.aggregators.MeanFactory która oblicza średnią ważoną.

Wzorzec fabryki to sposób, w jaki osiągamy pierwszy cel określony powyżej; ta agregacja jest niezależną cegiełką. Na przykład przy zmianie zmiennych modelu, które można trenować, złożona agregacja niekoniecznie musi się zmieniać; reprezentująca go fabryka zostanie wywołana z innym podpisem typu, gdy zostanie użyta przez metodę taką jak tff.learning.build_federated_averaging_process .

Kompozycje

Przypomnijmy, że ogólny proces agregacji może obejmować (a) pewne wstępne przetwarzanie wartości u klientów, (b) przenoszenie wartości z klienta na serwer oraz (c) pewne przetwarzanie końcowe zagregowanych wartości na serwerze. Drugi cel, o którym mowa powyżej, skład agregacji, jest realizowany wewnątrz modułu tff.aggregators poprzez ustrukturyzowanie implementacji fabryk agregacji w taki sposób, że część (b) może być delegowana do innej fabryki agregacji.

Zamiast implementować całą niezbędną logikę w jednej klasie fabrycznej, implementacje są domyślnie skupione na jednym aspekcie istotnym dla agregacji. W razie potrzeby ten wzór umożliwia nam wymianę bloków konstrukcyjnych pojedynczo.

Przykładem jest ważony tff.aggregators.MeanFactory . Jego implementacja mnoży podane wartości i wagi u klientów, następnie niezależnie sumuje zarówno wartości ważone, jak i wagi, a następnie dzieli sumę ważonych wartości przez sumę wag na serwerze. Zamiast implementować sumowania za pomocą bezpośredniego operatora tff.federated_sum , sumowanie jest delegowane do dwóch wystąpień tff.aggregators.SumFactory .

Taka konstrukcja umożliwia zastąpienie dwóch domyślnych sumowań przez różne fabryki, które różnie realizują sumę. Na przykład tff.aggregators.SecureSumFactory lub niestandardowa implementacja tff.aggregators.UnweightedAggregationFactory . I odwrotnie, time, tff.aggregators.MeanFactory może być wewnętrzną agregacją innej fabryki, takiej jak tff.aggregators.clipping_factory , jeśli wartości mają zostać obcięte przed uśrednieniem.

Zapoznaj się z poprzednimi zalecanymi agregacjami dotyczącymi tuningu, aby zapoznać się z samouczkiem dotyczącym zalecanych zastosowań mechanizmu kompozycji przy użyciu istniejących fabryk w module tff.aggregators .

Najlepsze praktyki na przykładzie

Zamierzamy szczegółowo zilustrować koncepcje tff.aggregators implementując proste przykładowe zadanie i stopniowo je tff.aggregators . Innym sposobem nauki jest przyjrzenie się wdrożeniom istniejących fabryk.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Zamiast sumowania value , przykładowe zadanie polega na zsumowaniu value * 2.0 a następnie podzieleniu sumy przez 2.0 . Wynik agregacji jest zatem matematycznie równoważny z bezpośrednim sumowaniem value i może być traktowany jako składający się z trzech części: (1) skalowanie na klientach (2) sumowanie na klientach (3) nieskalowanie na serwerze.

Zgodnie z projektem wyjaśnionym powyżej, logika zostanie zaimplementowana jako podklasa tff.aggregators.UnweightedAggregationFactory , która tworzy odpowiednie tff.templates.AggregationProcess gdy otrzyma value_type do agregacji:

Minimalna implementacja

W przypadku zadania przykładowego niezbędne obliczenia są zawsze takie same, więc nie ma potrzeby używania stanu. Jest więc pusty i reprezentowany jako tff.federated_value((), tff.SERVER) . Na razie to samo dotyczy pomiarów.

Minimalna realizacja zadania jest więc następująca:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

To, czy wszystko działa zgodnie z oczekiwaniami, można sprawdzić za pomocą następującego kodu:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Stanowość i pomiary

Stanowalność jest szeroko stosowana w TFF do reprezentowania obliczeń, które mają być wykonywane iteracyjnie i zmieniać się z każdą iteracją. Na przykład stan obliczenia uczącego zawiera wagi uczonego modelu.

Aby zilustrować, jak używać stanu w obliczeniach agregacji, zmodyfikujemy przykładowe zadanie. Zamiast mnożyć value przez 2.0 , mnożymy ją przez wskaźnik iteracji - liczbę wykonanych agregacji.

Aby to zrobić, potrzebujemy sposobu na śledzenie indeksu iteracji, który osiąga się dzięki koncepcji stanu. W initialize_fn zamiast tworzyć stan pusty, inicjalizujemy stan na zero skalarne. Następnie stan może zostać użyty w next_fn w trzech krokach: (1) next_fn o 1.0 , (2) użyj do pomnożenia value i (3) powróć jako nowy zaktualizowany stan.

Gdy to zrobisz, możesz zauważyć: Ale dokładnie ten sam kod, co powyżej, może być użyty do zweryfikowania wszystkich działań zgodnie z oczekiwaniami. Skąd mam wiedzieć, że coś się faktycznie zmieniło?

Dobre pytanie! W tym miejscu przydatna staje się koncepcja pomiarów. Ogólnie pomiary mogą zgłaszać dowolną wartość związaną z pojedynczym wykonaniem next funkcji, która może być wykorzystana do monitorowania. W tym przypadku może to być summed_value z poprzedniego przykładu. To znaczy wartość przed krokiem „nieskalowania”, która powinna zależeć od indeksu iteracji. Ponownie, niekoniecznie jest to przydatne w praktyce, ale ilustruje odpowiedni mechanizm.

Stanowcza odpowiedź na to zadanie wygląda zatem następująco:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Zauważ, że state który wchodzi w next_fn jako dane wejściowe, jest umieszczany na serwerze. Aby można było z niego korzystać u klientów, należy go najpierw zakomunikować, co odbywa się za pomocą operatora tff.federated_broadcast .

Aby sprawdzić, czy wszystko działa zgodnie z oczekiwaniami, możemy teraz spojrzeć na zgłoszone measurements , które powinny być różne w każdej rundzie wykonywania, nawet jeśli są uruchamiane z tymi samymi client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Typy strukturalne

Wagi modeli modelu szkolonego w uczeniu federacyjnym są zwykle przedstawiane jako zbiór tensorów, a nie jako pojedynczy tensor. W TFF jest to reprezentowane jako tff.StructType i ogólnie przydatne fabryki agregacji muszą być w stanie zaakceptować typy strukturalne.

Jednak w powyższych przykładach pracowaliśmy tylko z obiektem tff.TensorType . Jeśli spróbujemy użyć poprzedniej fabryki do utworzenia procesu agregacji z tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , otrzymamy dziwny błąd, ponieważ TensorFlow spróbuje pomnożyć tf.Tensor i list .

Problem polega na tym, że zamiast pomnożyć strukturę tensorów przez stałą, musimy pomnożyć każdy tensor w strukturze przez stałą. tf.nest rozwiązaniem tego problemu jest użycie modułu tf.nest wewnątrz utworzonych tff.tf_computation .

Wersja poprzedniej wersji ExampleTaskFactory zgodnej z typami strukturalnymi wygląda więc następująco:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ten przykład podkreśla wzorzec, który może być przydatny do naśladowania podczas tworzenia struktury kodu TFF. Gdy nie mamy do czynienia z bardzo prostymi operacjami, kod staje się bardziej czytelny, gdy tff.tf_computation które będą używane jako bloki konstrukcyjne wewnątrz tff.federated_computation zostaną utworzone w oddzielnym miejscu. Wewnątrz tff.federated_computation te bloki konstrukcyjne są połączone tylko za pomocą operatorów wewnętrznych.

Aby sprawdzić, czy działa zgodnie z oczekiwaniami:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Wewnętrzne agregacje

Ostatnim krokiem jest opcjonalne umożliwienie delegowania rzeczywistej agregacji do innych fabryk, aby umożliwić łatwe zestawianie różnych technik agregacji.

Osiąga się to poprzez utworzenie opcjonalnego argumentu inner_factory w konstruktorze naszej ExampleTaskFactory . Jeśli nie zostanie określony, tff.aggregators.SumFactory jest tff.aggregators.SumFactory , który stosuje operator tff.federated_sum użyty bezpośrednio w poprzedniej sekcji.

Kiedy create jest inner_factory create, możemy najpierw wywołać funkcję create of inner_factory aby utworzyć wewnętrzny proces agregacji z tym samym value_type .

Stan naszego procesu zwracany przez initialize_fn to kompozycja dwóch części: stanu utworzonego przez „ten” proces oraz stanu właśnie utworzonego procesu wewnętrznego.

Implementacja next_fn różni się tym, że rzeczywista agregacja jest delegowana do next funkcji procesu wewnętrznego oraz w jaki sposób składa się wynik końcowy. Stan ponownie składa się z „tego” i „wewnętrznego” stanu, a pomiary są złożone w podobny sposób, jak OrderedDict .

Poniżej przedstawiono implementację takiego wzorca.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Podczas delegowania do funkcji inner_process.next zwracana struktura to tff.templates.MeasuredProcessOutput , z tymi samymi trzema polami - state , result i measurements . Podczas tworzenia ogólnej struktury zwrotów złożonego procesu agregacji, pola state i measurements powinny być generalnie składane i zwracane razem. W przeciwieństwie do result pole wynikowe odpowiada agregowanej wartości i zamiast tego „przepływa” przez złożoną agregację.

Obiekt state należy traktować jako szczegół implementacyjny fabryki, a zatem skład może mieć dowolną strukturę. Jednak measurements odpowiadają wartościom, które w pewnym momencie zostaną przekazane użytkownikowi. Dlatego zalecamy używanie OrderedDict ze złożonym nazewnictwem w taki sposób, aby było jasne, skąd w kompozycji pochodzi raportowana metryka.

Zwróć też uwagę na użycie operatora tff.federated_zip . Obiekt state kontrolowany przez utworzony proces powinien mieć wartość tff.FederatedType . Gdybyśmy zamiast tego zwrócili (this_state, inner_state) w initialize_fn , jego sygnatura zwracanego typu byłaby tff.StructType zawierającą 2-krotkę tff.FederatedType s. Użycie tff.federated_zip "podnosi" tff.FederatedType na najwyższy poziom. Jest to podobnie używane w next_fn podczas przygotowywania stanu i pomiarów do zwrócenia.

Wreszcie możemy zobaczyć, jak można tego użyć z domyślną agregacją wewnętrzną:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... iz inną agregacją wewnętrzną. Na przykład ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

streszczenie

W tym samouczku wyjaśniliśmy najlepsze praktyki, których należy przestrzegać, aby utworzyć blok konstrukcyjny agregacji ogólnego przeznaczenia, reprezentowany jako fabryka agregacji. Ogólność wynika z założeń projektowych na dwa sposoby:

  1. Obliczenia parametryzowane. Agregacja to niezależny blok konstrukcyjny, który można podłączyć do innych modułów TFF zaprojektowanych do współpracy z tff.aggregators celu parametryzacji niezbędnej agregacji, takiej jak tff.learning.build_federated_averaging_process .
  2. Skład agregacji. Blok konstrukcyjny agregacji można łączyć z innymi elementami składowymi agregacji, aby tworzyć bardziej złożone agregacje złożone.