Google I / O가 5 월 18 ~ 20 일에 돌아옵니다! 공간을 예약하고 일정을 짜세요 지금 등록하세요
이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

사용자 지정 집계 구현

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

이 자습서에서는 tff.aggregators 모듈의 설계 원칙과 클라이언트에서 서버로의 사용자 지정 값 집계 구현을위한 모범 사례를 설명합니다.

전제 조건. 이 자습서에서는 배치 ( tff.SERVER , tff.CLIENTS ), TFF가 계산을 나타내는 방법 ( tff.tf_computation , tff.federated_computation ) 및 해당 유형 서명과 같은 Federated Core 의 기본 개념에 이미 익숙하다고 가정합니다.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

디자인 요약

TFF에서 "응집"에 대한 값들의 세트의 이동을 지칭 tff.CLIENTS 동일한 유형의 집계 값을 생성하기 tff.SERVER . 즉, 각 개별 클라이언트 값을 사용할 수 없습니다. 예를 들어 연합 학습에서 클라이언트 모델 업데이트는 서버의 글로벌 모델에 적용 할 집계 모델 업데이트를 얻기 위해 평균화됩니다.

tff.federated_sum 과 같은이 목표를 달성하는 연산자 외에도 TFF는 집계 계산을위한 형식 서명을 공식화하는 tff.templates.AggregationProcess ( 상태 저장 프로세스 )를 제공하므로 단순한 합계보다 더 복잡한 형식으로 일반화 할 수 있습니다.

tff.aggregators 모듈의 주요 구성 요소는 AggregationProcess 생성을위한 팩토리 로, 두 가지 측면에서 TFF의 일반적으로 유용하고 대체 가능한 빌딩 블록으로 설계되었습니다.

  1. 매개 변수화 된 계산. 집계는 필요한 집계를 매개 tff.aggregators 하기 위해 tff.aggregators 와 함께 작동하도록 설계된 다른 TFF 모듈에 연결할 수있는 독립적 인 빌딩 블록입니다.

예:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. 집계 구성. 집계 빌딩 블록을 다른 집계 빌딩 블록과 함께 구성하여 더 복잡한 복합 집계를 만들 수 있습니다.

예:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

이 자습서의 나머지 부분에서는 이러한 두 가지 목표를 달성하는 방법을 설명합니다.

집계 프로세스

먼저 tff.templates.AggregationProcess 요약하고 생성을위한 팩토리 패턴을 따릅니다.

tff.templates.AggregationProcess 는 집계를 위해 지정된 유형 서명이있는 tff.templates.MeasuredProcess 입니다. 특히 initializenext 함수에는 다음과 같은 유형 서명이 있습니다.

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

state ( state_type 유형의) 상태는 서버에 있어야합니다. next 함수는 클라이언트에 배치 할 상태 및 집계 할 값 ( value_type 유형)을 입력 인수로 사용합니다. * 는 선택적인 다른 입력 인수를 의미합니다 (예 : 가중 평균의 가중치). 업데이트 된 상태 개체, 서버에 배치 된 동일한 유형의 집계 된 값 및 일부 측정 값을 반환합니다.

국가 모두의 실행 사이에 전달되는 것을 참고 next 기능,보고 된 측정이 특정 실행에 따라 정보보고하도록 next 기능을 비어있을 수 있습니다. 그럼에도 불구하고 TFF의 다른 부분이 명확한 계약을 따를 수 있도록 명시 적으로 지정해야합니다.

다른 TFF 모듈은, 인스턴스의 모델 업데이트 tff.learning , 사용하는 것으로 예상된다 tff.templates.AggregationProcess 값을 집계하는 방법을 매개 변수화 할 수 있습니다. 그러나 집계 된 값이 정확히 무엇이며 유형 시그니처가 무엇인지는 학습중인 모델의 다른 세부 정보와이를 수행하는 데 사용되는 학습 알고리즘에 따라 다릅니다.

집계를 계산의 다른 측면과 독립적으로 만들기 위해 팩토리 패턴을 사용합니다. 팩토리의 create 메서드를 호출하여 집계 할 객체의 관련 유형 서명을 사용할 수 tff.templates.AggregationProcess 되면 적절한 tff.templates.AggregationProcesscreate . 따라서 집계 프로세스의 직접 처리는이 생성을 담당하는 라이브러리 작성자에게만 필요합니다.

집계 프로세스 공장

비가 중 및 가중 집계를위한 두 개의 추상 기본 팩토리 클래스가 있습니다. 그들의 create 메소드는 집계 할 값의 유형 서명을 취하고 이러한 값의 집계를 위해 tff.templates.AggregationProcess 를 리턴합니다.

tff.aggregators.UnweightedAggregationFactory 의해 생성 된 프로세스는 (1) 서버의 상태 및 (2) 지정된 유형 value_type 값이라는 두 가지 입력 인수를 사용합니다.

구현의 예는 tff.aggregators.SumFactory 입니다.

에 의해 생성 된 프로세스 tff.aggregators.WeightedAggregationFactory 서버에서 (1) 상태를 지정 형 (2) 값 : 3 개 개의 입력 인자 얻어 value_type 및 형식 (3) 중량 weight_type 공장의 사용자 지정에 따라 호출 할 때, create 방법.

예제 구현은 가중 평균을 계산하는 tff.aggregators.MeanFactory 입니다.

공장 패턴은 위에서 언급 한 첫 번째 목표를 달성하는 방법입니다. 그 집계는 독립적 인 빌딩 블록입니다. 예를 들어, 학습 가능한 모델 변수를 변경할 때 복잡한 집계를 반드시 변경할 필요는 없습니다. 이를 나타내는 팩토리는 tff.learning.build_federated_averaging_process 와 같은 메소드에서 사용할 때 다른 유형 서명으로 호출됩니다.

작곡

일반 집계 프로세스는 (a) 클라이언트에서 값의 일부 전처리, (b) 클라이언트에서 서버로 값 이동, (c) 서버에서 집계 된 값의 일부 후 처리를 캡슐화 할 수 있습니다. 위에서 언급 한 두 번째 목표 인 집계 구성은 (b) 부분이 다른 집계 팩토리에 위임 될 수 있도록 집계 팩토리의 구현을 구성함으로써 tff.aggregators 모듈 내에서 실현됩니다.

단일 팩토리 클래스 내에서 필요한 모든 로직을 구현하는 대신, 구현은 기본적으로 집계와 관련된 단일 측면에 중점을 둡니다. 필요한 경우이 패턴을 통해 한 번에 하나씩 빌딩 블록을 교체 할 수 있습니다.

가중 된 tff.aggregators.MeanFactory 가 그 예입니다. 구현은 클라이언트에서 제공된 값과 가중치를 곱한 다음 가중치 값과 가중치를 독립적으로 합한 다음 가중치 합계를 서버의 가중치 합계로 나눕니다. tff.federated_sum 연산자를 직접 사용하여 합계를 구현하는 대신 합계가 tff.aggregators.SumFactory 두 인스턴스에 위임됩니다.

이러한 구조는 두 개의 기본 합계를 다른 공장으로 대체하여 합계를 다르게 실현할 수 있도록합니다. 예를 들어, tff.aggregators.SecureSumFactory , 또는의 사용자 지정 구현 tff.aggregators.UnweightedAggregationFactory . 반대로 시간, tff.aggregators.MeanFactory 는 평균화하기 전에 값을 tff.aggregators.clipping_factory 경우 tff.aggregators.clipping_factory 와 같은 다른 팩토리의 내부 집계가 될 수 있습니다.

tff.aggregators 모듈의 기존 팩토리를 사용하는 작성 메커니즘의 권장 사용에 대한 학습 튜토리얼을 위한 이전 튜닝 권장 집계를 참조하십시오.

사례 별 모범 사례

간단한 예제 작업을 구현하여 tff.aggregators 개념을 자세히 설명하고 점진적으로 더 일반적으로 만들 것입니다. 배우는 또 다른 방법은 기존 공장의 구현을 보는 것입니다.

import collections
import tensorflow as tf
import tensorflow_federated as tff

value 을 합산하는 대신 예제 작업은 value * 2.0 을 합한 다음 합을 2.0 나누는 것입니다. 따라서 집계 결과는 value 을 직접 합산하는 것과 수학적으로 동일하며 (1) 클라이언트에서 스케일링 (2) 클라이언트에서 합산 ​​(3) 서버에서 스케일링 해제의 세 부분으로 구성되어 있다고 생각할 수 있습니다.

위에서 설명한 디자인에 따라 로직은 집계 할 value_type 이 주어지면 적절한 tff.templates.AggregationProcess 를 생성하는 tff.aggregators.UnweightedAggregationFactory 의 하위 클래스로 구현됩니다.

최소한의 구현

예제 작업의 경우 필요한 계산은 항상 동일하므로 상태를 사용할 필요가 없습니다. 따라서 비어 있으며 tff.federated_value((), tff.SERVER) 됩니다. 지금은 측정도 마찬가지입니다.

따라서 작업의 최소 구현은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

모든 것이 예상대로 작동하는지 여부는 다음 코드로 확인할 수 있습니다.

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

상태 저장 및 측정

상태 저장은 TFF에서 광범위하게 사용되어 반복적으로 실행되고 각 반복에 따라 변경 될 것으로 예상되는 계산을 나타냅니다. 예를 들어, 학습 계산 상태에는 학습중인 모델의 가중치가 포함됩니다.

집계 계산에서 상태를 사용하는 방법을 설명하기 위해 예제 작업을 수정합니다. value2.0 을 곱하는 대신에 집계가 실행 된 횟수 인 반복 인덱스를 곱합니다.

이를 위해서는 상태 개념을 통해 달성되는 반복 인덱스를 추적 할 수있는 방법이 필요합니다. initialize_fn 에서 빈 상태를 만드는 대신 상태를 스칼라 0으로 초기화합니다. 그런 다음 상태는 next_fn 에서 (1) 1.0 씩 증가, (2) value 을 곱하는 데 사용, (3) 새로 업데이트 된 상태로 반환의 세 단계로 사용할 수 있습니다.

이 작업이 완료되면 다음 사항에 유의할 수 있습니다. 그러나 위와 똑같은 코드를 사용하여 예상대로 모든 작업을 확인할 수 있습니다. 실제로 변경된 것이 있는지 어떻게 알 수 있습니까?

좋은 질문! 여기에서 측정 개념이 유용 해집니다. 일반적으로 측정은 모니터링에 사용할 수있는 next 기능의 단일 실행과 관련된 모든 값을보고 할 수 있습니다. 이 경우 이전 예의 summed_value 일 수 있습니다. 즉, 반복 인덱스에 따라 달라져야하는 "크기 조정 해제"단계 이전의 값입니다. 다시 말하지만 이것은 실제로 유용하지는 않지만 관련 메커니즘을 보여줍니다.

따라서 작업에 대한 상태 저장 답변은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

입력으로 next_fn 이되는 state 는 서버에 배치됩니다. 클라이언트에서 사용하려면 tff.federated_broadcast 연산자를 사용하여 통신해야합니다.

모든 작업이 예상대로 작동하는지 확인하기 위해 이제보고 된 measurements 볼 수 있습니다. 이는 동일한 client_data 실행하더라도 각 실행 라운드마다 달라야합니다.

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

구조화 된 유형

연합 학습에서 훈련 된 모델의 모델 가중치는 일반적으로 단일 텐서가 아닌 텐서 모음으로 표시됩니다. TFF에서 이것은 tff.StructType 으로 표시되며 일반적으로 유용한 집계 팩토리는 구조화 된 유형을 수용 할 수 있어야합니다.

그러나 위의 예에서는 tff.TensorType 객체로만 작업했습니다. 이전 팩토리를 사용하여 tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) 집계 프로세스를 만들려고하면 이상한 오류가 발생합니다. TensorFlow는 tf.Tensorlist 를 곱하려고 list .

문제는 텐서의 구조에 상수를 곱하는 대신 구조의 각 텐서에 상수를 곱해야한다는 것입니다. 이 문제에 대한 일반적인 해결책은 생성 된 tff.tf_computation 내부에서 tf.nest 모듈을 사용하는 tf.nest 입니다.

구조화 된 유형과 호환되는 이전 ExampleTaskFactory 의 버전은 다음과 같습니다.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

이 예제는 TFF 코드를 구조화 할 때 따라야 할 유용한 패턴을 강조합니다. 매우 간단한 작업을 처리하지 않을 때 tff.tf_computation 내부의 빌딩 블록으로 사용될 tff.federated_computation 이 별도의 위치에 생성 될 때 코드가 더 읽기 쉬워집니다. tff.federated_computation 내부에서 이러한 빌딩 블록은 내장 연산자를 사용해서 만 연결됩니다.

예상대로 작동하는지 확인하려면 :

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

내부 집계

마지막 단계는 다른 집계 기술을 쉽게 구성 할 수 있도록 실제 집계를 다른 팩토리에 위임 할 수 있도록 선택적으로 활성화하는 것입니다.

이 옵션으로 생성함으로써 달성 inner_factory 우리의 생성자에 인수 ExampleTaskFactory . 지정되지 않으면, tff.aggregators.SumFactory 적용되는 사용 tff.federated_sum 이전 섹션에서 직접 사용할 연산자.

create 가 호출되면 먼저 inner_factory create 를 호출하여 동일한 value_type 내부 집계 프로세스를 생성 할 수 있습니다.

initialize_fn 이 반환하는 프로세스의 상태는 "this"프로세스에 의해 생성 된 상태와 방금 생성 된 내부 프로세스의 상태라는 두 부분으로 구성됩니다.

next_fn 의 구현은 실제 집계가 내부 프로세스의 next 기능에 위임된다는 점과 최종 출력이 구성되는 방식이 다릅니다. 상태는 다시 "this"및 "inner"상태로 구성되고 측정은 OrderedDict 와 유사한 방식으로 구성됩니다.

다음은 이러한 패턴의 구현입니다.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

받는 사람 위임 할 때 inner_process.next 기능, 우리가 얻을 수익 구조는이다 tff.templates.MeasuredProcessOutput - 같은 세 개의 필드, state , resultmeasurements . 구성된 집계 프로세스의 전체 반환 구조를 생성 할 때 statemeasurements 필드는 일반적으로 구성되고 함께 반환되어야합니다. 반대로 result 필드는 집계되는 값에 해당하며 대신 구성된 집계를 "흐르는"것입니다.

state 객체는 팩토리의 구현 세부 사항으로 간주되어야하며, 따라서 구성은 모든 구조가 될 수 있습니다. 그러나 measurements 값은 특정 시점에서 사용자에게보고되는 값에 해당합니다. 따라서 구성에서보고 된 측정 항목의 출처가 명확하도록 구성된 명명과 함께 OrderedDict 를 사용하는 것이 좋습니다.

tff.federated_zip 연산자의 사용도 참고하십시오. 생성 된 프로세스에 의해 제어되는 state 객체는 tff.FederatedType 이어야합니다. 대신 initialize_fn 에서 (this_state, inner_state) 를 반환했다면 반환 유형 서명은 tff.StructType 의 2- 튜플을 포함하는 tff.FederatedType 됩니다. tff.federated_zip 사용하면 tff.FederatedType 이 최상위 수준으로 "상승"됩니다. 이는 반환 할 상태 및 측정을 준비 할 때 next_fn 에서 유사하게 사용됩니다.

마지막으로 이것이 기본 내부 집계와 함께 사용되는 방법을 볼 수 있습니다.

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... 그리고 다른 내부 집계를 사용합니다. 예를 들어, ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

요약

이 자습서에서는 집계 팩토리로 표현되는 범용 집계 빌딩 블록을 만들기 위해 따라야 할 모범 사례를 설명했습니다. 일반성은 두 가지 방식으로 설계 의도를 통해 제공됩니다.

  1. 매개 변수화 된 계산. 집계는 tff.aggregators 와 함께 작동하도록 설계된 다른 TFF 모듈에 연결하여 필요한 집계 (예 : tff.learning.build_federated_averaging_process 를 매개 변수화 할 수있는 독립 빌딩 블록입니다.
  2. 집계 구성. 집계 빌딩 블록을 다른 집계 빌딩 블록과 함께 구성하여 더 복잡한 복합 집계를 만들 수 있습니다.