日付を保存! Google I / Oが5月18日から20日に戻ってきます今すぐ登録
このページは Cloud Translation API によって翻訳されました。
Switch to English

カスタムアグリゲーションの実装

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロードする

このチュートリアルでは、 tff.aggregatorsモジュールの背後にある設計原則と、クライアントからサーバーへの値のカスタム集計を実装するためのベストプラクティスについて説明します。

前提条件。このチュートリアルは、配置( tff.SERVERtff.CLIENTS )、TFFが計算を表す方法( tff.tf_computationtff.federated_computation )、およびそれらの型シグネチャなど、 FederatedCoreの基本概念に既に精通していることを前提としています。

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

設計概要

TFFでは、「集約」とは、 tff.CLIENTS一連の値をtff.CLIENTSして、 tff.SERVER同じタイプの集約値を生成することをtff.SERVERます。つまり、個々のクライアントの値が利用可能である必要はありません。たとえば、フェデレーションラーニングでは、クライアントモデルの更新が平均化されて、サーバー上のグローバルモデルに適用される集約モデルの更新が取得されます。

tff.federated_sumなどのこの目標を達成する演算子に加えて、TFFはtff.templates.AggregationProcessステートフルプロセス)を提供します。これは、集計計算の型シグネチャを形式化して、単純な合計よりも複雑な形式に一般化できるようにします。

tff.aggregatorsモジュールの主なコンポーネントは、 AggregationProcessを作成するためのファクトリです。これは、2つの側面でTFFの一般的に有用で交換可能なビルディングブロックとして設計されています。

  1. パラメータ化された計算。アグリゲーションは、 tff.aggregatorstff.aggregatorsして必要なアグリゲーションをパラメーター化するように設計された他のTFFモジュールにプラグインできる独立したビルディングブロックです。

例:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. 集合体の構成。集約ビルディングブロックを他の集約ビルディングブロックと組み合わせて、より複雑な複合集約を作成できます。

例:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

このチュートリアルの残りの部分では、これら2つの目標がどのように達成されるかについて説明します。

集約プロセス

最初にtff.templates.AggregationProcessを要約し、 tff.templates.AggregationProcessファクトリパターンを作成します。

tff.templates.AggregationProcessあるtff.templates.MeasuredProcess集合に対して指定型シグネチャを有します。特に、 initialize関数とnext関数には、次のタイプシグネチャがあります。

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

(タイプstate_type )状態はサーバーに配置する必要があります。 next関数は、クライアントに配置された( value_type型の)集計される状態と値を入力引数として受け取ります。 *は、オプションの他の入力引数を意味します。たとえば、加重平均の重みです。更新された状態オブジェクト、サーバーに配置された同じタイプの集計値、およびいくつかの測定値を返します。

状態の両方が実行の間に渡されることに注意してくださいnext関数、及び報告された測定値は、特定の実行に応じて任意の情報を報告することを意図するものでnextの機能を、空であってもよいです。それにもかかわらず、TFFの他の部分が従うべき明確な契約を持つためには、それらを明示的に指定する必要があります。

他のTFFモジュール、たとえばtff.learningのモデル更新では、 tff.templates.AggregationProcessを使用して、値の集計方法をパラメーター化することが期待されています。ただし、集計された値とその型シグネチャが正確に何であるかは、トレーニングされるモデルの他の詳細と、それを実行するために使用される学習アルゴリズムによって異なります。

集計を計算の他の側面から独立させるために、ファクトリパターンを使用します。ファクトリのcreateメソッドを呼び出して、集計するオブジェクトの関連する型シグネチャが利用可能になったら、適切なtff.templates.AggregationProcessを作成します。したがって、集約プロセスの直接処理は、この作成を担当するライブラリ作成者にのみ必要です。

集約プロセスファクトリ

加重されていない集計と加重された集計には、2つの抽象基本ファクトリクラスがあります。それらのcreateメソッドは、集約される値の型シグネチャをtff.templates.AggregationProcess 、そのような値の集約のためにtff.templates.AggregationProcessを返します。

tff.aggregators.UnweightedAggregationFactoryによって作成されたプロセスは、(1)サーバーでの状態と(2)指定されたタイプvalue_type値の2つの入力引数を取ります。

実装例はtff.aggregators.SumFactoryです。

tff.aggregators.WeightedAggregationFactoryによって作成されたプロセスは、(1)サーバーでの状態、(2)指定されたタイプvalue_type値、および(3)ファクトリのユーザーがcreateメソッドを呼び出すときに指定したタイプweight_type重みの3つの入力引数を取ります。

実装例は、加重平均を計算するtff.aggregators.MeanFactoryです。

ファクトリパターンは、上記の最初の目標を達成する方法です。その集約は独立した構成要素です。たとえば、トレーニング可能なモデル変数を変更する場合、複雑な集計を変更する必要はありません。それを表すファクトリは、 tff.learning.build_federated_averaging_processなどのメソッドで使用されると、異なるタイプのシグネチャで呼び出されtff.learning.build_federated_averaging_process

作文

一般的な集計プロセスでは、(a)クライアントでの値の前処理、(b)クライアントからサーバーへの値の移動、および(c)サーバーでの集計値の後処理をカプセル化できることを思い出してください。上記の2番目の目標であるアグリゲーションコンポジションは、パート(b)を別のアグリゲーションファクトリに委任できるようにアグリゲーションファクトリの実装を構造化することにより、 tff.aggregatorsモジュール内で実現されます。

単一のファクトリクラス内に必要なすべてのロジックを実装するのではなく、実装はデフォルトで、集約に関連する単一の側面に焦点を合わせています。必要に応じて、このパターンにより、ビルディングブロックを一度に1つずつ交換できます。

例として、加重tff.aggregators.MeanFactoryます。その実装は、クライアントで提供された値と重みを乗算し、重み付き値と重みの両方を個別に合計してから、重み付き値の合計をサーバーでの重みの合計で除算します。 tff.federated_sum演算子を直接使用して合計を実装する代わりに、合計はtff.aggregators.SumFactory 2つのインスタンスに委任されます。

このような構造により、2つのデフォルトの合計を、合計を異なる方法で実現する異なるファクトリに置き換えることができます。例えば、 tff.aggregators.SecureSumFactory 、またはのカスタム実装tff.aggregators.UnweightedAggregationFactory 。逆に、平均化する前に値をクリップする場合は、 tff.aggregators.MeanFactory自体をtff.aggregators.clipping_factoryなどの別のファクトリの内部集約にすることができます。

tff.aggregatorsモジュールの既存のファクトリを使用した構成メカニズムの推奨される使用法については、学習チュートリアルについて、以前のチューニング推奨集計を参照してください。

例によるベストプラクティス

簡単なサンプルタスクを実装することにより、 tff.aggregators概念を詳細に説明し、徐々に一般的にします。学ぶ別の方法は、既存の工場の実装を調べることです。

import collections
import tensorflow as tf
import tensorflow_federated as tff

valueを合計する代わりに、タスクの例は、 value * 2.0を合計してから、その合計を2.0除算することです。したがって、集計結果は、 valueを直接合計することと数学的に同等であり、(1)クライアントでのスケーリング(2)クライアント間での合計(3)サーバーでのスケーリング解除の3つの部分で構成されていると考えることができます。

上で説明した設計に従って、ロジックはtff.aggregators.UnweightedAggregationFactoryサブクラスとして実装されtff.aggregators.UnweightedAggregationFactory 。これにより、集計するvalue_typeが指定されると、適切なtff.templates.AggregationProcessが作成されます。

最小限の実装

サンプルタスクの場合、必要な計算は常に同じであるため、stateを使用する必要はありません。したがって、これは空であり、 tff.federated_value((), tff.SERVER)として表されます。今のところ、同じことが測定にも当てはまります。

したがって、タスクの最小限の実装は次のとおりです。

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

すべてが期待どおりに機能するかどうかは、次のコードで確認できます。

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

ステートフルネスと測定

ステートフルネスはTFFで広く使用されており、反復的に実行され、反復ごとに変化すると予想される計算を表します。たとえば、学習計算の状態には、学習中のモデルの重みが含まれます。

集計計算で状態を使用する方法を説明するために、サンプルタスクを変更します。 value2.0を掛ける代わりに、反復インデックス(集計が実行された回数)を掛けます。

そのためには、状態の概念を通じて達成される反復インデックスを追跡する方法が必要です。 initialize_fnでは、空の状態を作成する代わりに、状態をスカラーゼロに初期化します。次に、状態をnext_fnで3つのステップで使用できます。(1) 1.0ずつインクリメントする、(2) valueを乗算するために使用する、(3)新しい更新された状態として返す。

これが完了すると、次のことに注意してください。ただし、上記とまったく同じコードを使用して、すべての動作を期待どおりに検証できます。何かが実際に変わったことをどうやって知ることができますか?

良い質問!ここで、測定の概念が役立ちます。一般に、測定値は、 next関数の1回の実行に関連する任意の値を報告できます。これは、監視に使用できます。この場合、前の例のsummed_valueすることができます。つまり、「スケール解除」ステップの前の値であり、反復インデックスに依存する必要があります。繰り返しますが、これは実際には必ずしも有用ではありませんが、関連するメカニズムを示しています。

したがって、タスクに対するステートフルな回答は次のようになります。

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

入力としてnext_fn入るstateは、サーバーに配置されることに注意してください。クライアントで使用するには、最初に通信する必要があります。これは、 tff.federated_broadcast演算子を使用してtff.federated_broadcastます。

すべてが期待どおりに機能することを確認するために、報告されたmeasurementsを確認できます。これは、同じclient_data実行された場合でも、実行の各ラウンドで異なるはずclient_data

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

構造化タイプ

連合学習で訓練されたモデルのモデルの重みは、通常、単一のテンソルではなく、テンソルのコレクションとして表されます。 TFFでは、これはtff.StructTypeとして表され、一般的に有用な集約ファクトリは構造化タイプを受け入れることができる必要があります。

ただし、上記の例では、 tff.TensorTypeオブジェクトのみを使用しました。以前のファクトリを使用してtff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])で集計プロセスを作成しようとすると、奇妙なエラーが発生します。 TensorFlowは、 tf.Tensorlistを乗算しようとしlist

問題は、テンソルの構造に定数を掛ける代わりに、構造内の各テンソルに定数を掛ける必要があることです。この問題の通常の解決策は、作成されたtff.tf_computation内でtf.nestモジュールを使用するtf.nestです。

したがって、構造化タイプと互換性のある以前のExampleTaskFactoryのバージョンは次のようになります。

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

この例では、TFFコードを構造化するときに従うのに役立つ可能性のあるパターンを強調しています。非常に単純な操作を処理しない場合、 tff.tf_computation内のビルディングブロックとして使用されるtff.federated_computationが別の場所に作成されると、コードが読みやすくなります。 tff.federated_computation内では、これらのビルディングブロックは組み込み演算子を使用してのみ接続されます。

期待どおりに機能することを確認するには:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

内部集計

最後のステップは、さまざまな集約手法を簡単に構成できるようにするために、オプションで実際の集約を他のファクトリに委任できるようにすることです。

これは、 ExampleTaskFactoryコンストラクターでオプションのinner_factory引数を作成することで実現されます。指定しない場合、 tff.aggregators.SumFactoryが使用されます。これは、前のセクションで直接使用されたtff.federated_sum演算子を適用します。

createが呼び出されると、最初にinner_factory createを呼び出して、同じvalue_typeで内部集計プロセスを作成できます。

initialize_fnによって返されるプロセスの状態は、「this」プロセスによって作成された状態と、作成されたばかりの内部プロセスの状態の2つの部分で構成されます。

next_fnの実装は、実際の集計が内部プロセスのnext関数に委任される点と、最終出力の構成方法が異なります。状態も「this」状態と「inner」状態で構成され、測定値はOrderedDictと同様の方法で構成されます。

以下は、そのようなパターンの実装です。

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

inner_process.next関数に委任すると、取得する戻り構造はtff.templates.MeasuredProcessOutputであり、同じ3つのフィールド( stateresultmeasurementsます。構成された集計プロセスの全体的な戻り構造を作成する場合、 stateフィールドとmeasurementsフィールドは通常、一緒に構成されて返される必要があります。対照的に、 resultフィールドは集計される値に対応し、代わりに構成された集計を「フロースルー」します。

stateオブジェクトは、ファクトリの実装の詳細と見なす必要があるため、構成は任意の構造にすることができます。ただし、 measurements値は、ある時点でユーザーに報告される値に対応します。したがって、報告されたメトリックが構成のどこから来ているのかが明確になるように、構成された名前を付けてOrderedDictを使用することをお勧めします。

tff.federated_zip演算子の使用にも注意してください。作成されたプロセスによって制御されるstateオブジェクトは、 tff.FederatedType必要がありtff.FederatedType 。私たちが代わりに戻っていた場合(this_state, inner_state)initialize_fn 、その戻り値の型シグネチャは次のようになりtff.StructTypeの2要素のタプル含むtff.FederatedType秒。 tff.federated_zip使用すると、 tff.FederatedTypeがトップレベルに「リフト」されます。これは、 next_fnれる状態と測定値を準備するときに、 next_fnでも同様に使用されます。

最後に、これをデフォルトの内部集計でどのように使用できるかを確認できます。

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

...そして異なる内部集合体で。たとえば、 ExampleTaskFactory

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

概要

このチュートリアルでは、アグリゲーションファクトリとして表される汎用アグリゲーションビルディングブロックを作成するために従うべきベストプラクティスについて説明しました。一般性は、次の2つの方法で設計意図によってもたらされます。

  1. パラメータ化された計算。凝集はで動作するように設計された他のTFFモジュールにプラグインすることができる独立したビルディングブロックであるtff.aggregatorsようなそれらの必要な集合をパラメータ化するために、 tff.learning.build_federated_averaging_process
  2. 集合体の構成。集約ビルディングブロックを他の集約ビルディングブロックと組み合わせて、より複雑な複合集約を作成できます。