独自の統合学習アルゴリズムの構築

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示 ノートブックをダウンロード

始める前に

開始する前に、以下を実行して、環境が正しくセットアップされていることを確認してください。あなたが挨拶が表示されない場合は、を参照してください。インストールの手順についてのガイド。

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio.apply()
import tensorflow as tf
import tensorflow_federated as tff

では画像分類テキスト生成チュートリアル、我々は連合学習(FL)のためのモデルとデータパイプラインを設定する方法を学び、経由連合訓練を実施しtff.learning TFFのAPI層。

これは、FLの研究に関しては氷山の一角にすぎません。このチュートリアルでは、我々はに延期することなく、連合学習アルゴリズムを実装する方法について説明しtff.learning API。私たちは以下を達成することを目指しています:

目標:

  • 連合学習アルゴリズムの一般的な構造を理解します。
  • TFFの連携コアを探検。
  • Federated Coreを使用して、FederatedAveragingを直接実装します。

このチュートリアルでは、自己完結型ですが、我々は読んで最初にお勧めの画像の分類テキスト生成チュートリアルを。

入力データの準備

まず、TFFに含まれているEMNISTデータセットをロードして前処理します。詳細については、参照画像分類のチュートリアルを。

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

我々のモデルにデータセットを供給するために、我々は、データをフラット化し、フォームのタプルに各例を変換する(flattened_image_vector, label)

NUM_CLIENTS = 10
BATCH_SIZE = 20

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch of EMNIST data and return a (features, label) tuple."""
    return (tf.reshape(element['pixels'], [-1, 784]), 
            tf.reshape(element['label'], [-1, 1]))

  return dataset.batch(BATCH_SIZE).map(batch_format_fn)

ここで、少数のクライアントを選択し、上記の前処理をそれらのデータセットに適用します。

client_ids = sorted(emnist_train.client_ids)[:NUM_CLIENTS]
federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
  for x in client_ids
]

モデルの準備

我々は内と同じモデルを使用して画像分類のチュートリアル。 (介して実装このモデルtf.keras )ソフトマックス層が続く単一の隠れ層を有しています。

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])

TFFにこのモデルを使用するために、我々はとしてKerasモデルをラップtff.learning.Model 。これは、私たちは、モデルの実行を可能にする往路TFF内を、そしてモデルの出力を取り出します。詳細については、も参照画像分類のチュートリアルを。

def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=federated_train_data[0].element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

私たちが使用している間tf.keras作成するtff.learning.Model 、TFFは、はるかに一般的なモデルをサポートしています。これらのモデルには、モデルの重みを取得する次の関連属性があります。

  • trainable_variables :学習可能な層に対応するテンソルの反復可能。
  • non_trainable_variables :非トレーニング可能な層に対応するテンソルの反復可能。

我々の目的のために、我々は、のみを使用しますtrainable_variables 。 (私たちのモデルにはそれらしかありません!)。

独自の連合学習アルゴリズムの構築

一方でtff.learning APIは1がフェデレーテッド平均化の多くの亜種を作成することができ、このフレームワークにきちんと適合しない他の連合のアルゴリズムがあります。たとえば、次のような正則、クリッピング、またはより複雑なアルゴリズムを追加することも連合GANトレーニング。また、その代わりに興味があることも連合分析

これらのより高度なアルゴリズムでは、TFFを使用して独自のカスタムアルゴリズムを作成する必要があります。多くの場合、フェデレーションアルゴリズムには4つの主要なコンポーネントがあります。

  1. サーバーからクライアントへのブロードキャストステップ。
  2. ローカルクライアントの更新手順。
  3. クライアントからサーバーへのアップロード手順。
  4. サーバーの更新手順。

TFFにおいて、我々は、一般的に、フェデレーテッド・アルゴリズム表すtff.templates.IterativeProcess (先ほどと呼ぶIterativeProcess全体を)。これが含まれているクラスでinitializeし、 next機能を。ここでは、 initializeサーバーを初期化するために使用され、そしてnext連合アルゴリズムの一つの通信ラウンドを実行します。 FedAvgの反復プロセスがどのようになるかについてのスケルトンを書いてみましょう。

まず、我々は、単に作成し、初期化機能持っtff.learning.Model 、そしてそのトレーニング可能なウェイトを返します。

def initialize_fn():
  model = model_fn()
  return model.trainable_variables

この関数は見栄えがしますが、後で説明するように、「TFF計算」にするために小さな変更を加える必要があります。

また、スケッチしたいnext_fn

def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = client_update(federated_dataset, server_weights_at_client)

  # The server averages these updates.
  mean_client_weights = mean(client_weights)

  # The server updates its model.
  server_weights = server_update(mean_client_weights)

  return server_weights

これらの4つのコンポーネントを個別に実装することに焦点を当てます。まず、純粋なTensorFlowで実装できる部分、つまりクライアントとサーバーの更新手順に焦点を当てます。

TensorFlowブロック

クライアントの更新

私たちは、使用するtff.learning.ModelあなたはTensorFlowモデルを訓練するだろうと本質的に同じ方法で、クライアントのトレーニングを行うこと。特に、我々は、使用するtf.GradientTape 、その後使用してこれらの勾配を適用し、データのバッチで勾配を計算するためにclient_optimizer 。トレーニング可能なウェイトのみに焦点を当てています。

@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
  """Performs training (using the server model weights) on the client's dataset."""
  # Initialize the client model with the current server weights.
  client_weights = model.trainable_variables
  # Assign the server weights to the client model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        client_weights, server_weights)

  # Use the client_optimizer to update the local model.
  for batch in dataset:
    with tf.GradientTape() as tape:
      # Compute a forward pass on the batch of data
      outputs = model.forward_pass(batch)

    # Compute the corresponding gradient
    grads = tape.gradient(outputs.loss, client_weights)
    grads_and_vars = zip(grads, client_weights)

    # Apply the gradient using a client optimizer.
    client_optimizer.apply_gradients(grads_and_vars)

  return client_weights

サーバーの更新

FedAvgのサーバー更新は、クライアント更新よりも簡単です。サーバーモデルの重みをクライアントモデルの重みの平均に置き換えるだけの「バニラ」フェデレーション平均を実装します。繰り返しになりますが、トレーニング可能なウェイトのみに焦点を当てています。

@tf.function
def server_update(model, mean_client_weights):
  """Updates the server model weights as the average of the client model weights."""
  model_weights = model.trainable_variables
  # Assign the mean client weights to the server model.
  tf.nest.map_structure(lambda x, y: x.assign(y),
                        model_weights, mean_client_weights)
  return model_weights

スニペットは、単純に返すことによって単純化することができmean_client_weights 。しかし、連合アベレージ利用のより高度な実装mean_client_weightsな勢いや適応など、より洗練された技術、と。

チャレンジ:のバージョンを実装server_update model_weightsとmean_client_weightsの中間点であることをサーバーの重みを更新します。 (注:「中間点」この種のアプローチは、上の最近の作品に似ている先読みオプティマイザ!)。

これまでのところ、純粋なTensorFlowコードのみを記述しました。 TFFを使用すると、既に使い慣れているTensorFlowコードの多くを使用できるため、これは仕様によるものです。しかし、今、私たちは、オーケストレーション・ロジック、おもむくままクライアントにどのようなサーバーブロードキャスト、およびどのようなクライアントのアップロード、そのサーバーへのロジックを指定する必要があります。

これは、TFFの連携のコアが必要なります。

フェデレーションコアの概要

フェデレーテッド・コア(FC)のための基礎として役立つ低レベルインタフェースのセットであるtff.learning API。ただし、これらのインターフェースは学習に限定されません。実際、分散データの分析やその他の多くの計算に使用できます。

大まかに言えば、フェデレーションコアは、コンパクトに表現されたプログラムロジックがTensorFlowコードを分散通信演算子(分散合計やブロードキャストなど)と組み合わせることができるようにする開発環境です。目標は、システム実装の詳細(ポイントツーポイントネットワークメッセージ交換の指定など)を必要とせずに、研究者や実務家がシステム内の分散通信を明示的に制御できるようにすることです。

重要な点の1つは、TFFがプライバシー保護のために設計されていることです。したがって、データが存在する場所を明示的に制御して、中央のサーバーの場所にデータが不要に蓄積されるのを防ぐことができます。

連合データ

TFFの重要な概念は「連合データ」です。これは、分散システム内のデバイスのグループ全体でホストされているデータ項目のコレクションを指します(クライアントデータセットやサーバーモデルの重みなど)。我々は、単一の統合値として、すべてのデバイス間でのデータ項目のコレクション全体をモデル化します。

たとえば、センサーの温度を表すフロートがそれぞれにあるクライアントデバイスがあるとします。私たちは、連合フロートによると、それを表すことができ

federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

フェデレーテッド・タイプは、タイプによって指定されるT加盟成分の(例えばtf.float32 )およびグループGのデバイス。私たちは、例に焦点を当てるGどちらかであるtff.CLIENTStff.SERVER 。そのような連合型は、以下のように表される。 {T}@G以下に示すように、。

str(federated_float_on_clients)
'{float32}@CLIENTS'

なぜ私たちは配置をそれほど気にするのですか? TFFの主な目標は、実際の分散システムにデプロイできるコードを記述できるようにすることです。これは、デバイスのどのサブセットがどのコードを実行し、さまざまなデータがどこにあるかを推論することが重要であることを意味します。

TFFは三つのことに焦点を当て:データ、データが配置され、データがどのように変換されます。最後に、フェデレーテッド・計算中に封入されている間最初の二つは、フェデレーテッド型内にカプセル化されています。

フェデレーション計算

TFFは、その基本的な単位連合計算され、厳密に型指定された関数型プログラミング環境です。これらは、フェデレーション値を入力として受け入れ、フェデレーション値を出力として返すロジックの一部です。

たとえば、クライアントセンサーの温度を平均したいとします。以下を定義できます(フェデレーションフロートを使用)。

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
  return tff.federated_mean(client_temperatures)

あなたは、これは異なっているか、頼むかもしれないtf.function TensorFlowでデコレータ?キー答えはで生成されたコードであることをtff.federated_computation TensorFlowやPythonのコードでもありません。これは、内部プラットフォームに依存しない接着剤言語における分散システムの仕様です。

これは複雑に聞こえるかもしれませんが、TFF計算は、明確に定義された型シグネチャを持つ関数と考えることができます。これらのタイプシグネチャは直接クエリできます。

str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'

このtff.federated_computation 、連合型の引数を受け付け{float32}@CLIENTS 、そして連合型の戻り値{float32}@SERVER 。フェデレーション計算は、サーバーからクライアントへ、クライアントからクライアントへ、またはサーバーからサーバーへと移動する場合もあります。フェデレーション計算は、タイプシグネチャが一致する限り、通常の関数のように構成することもできます。

開発をサポートするために、TFFは、あなたが呼び出すことができますtff.federated_computation Pythonの関数としての。たとえば、

get_average_temperature([68.5, 70.3, 69.8])
69.53334

非熱心な計算とTensorFlow

注意すべき2つの重要な制限があります。 Pythonインタプリタが発生したときにまず、 tff.federated_computationデコレータを、関数が一度にトレースし、将来の使用のためにシリアライズされます。連合学習は分散型であるため、この将来の使用は、リモート実行環境など、他の場所で発生する可能性があります。したがって、TFFの計算は基本的に非熱望しています。この動作は、のようにやや似ているtf.function TensorFlowでデコレータ。

第二に、フェデレーテッド計算のみ(例えば、フェデレーテッド・オペレータから成ることができるtff.federated_mean )、それらはTensorFlow操作を含むことはできません。 TensorFlowコードが飾らブロックに限定されなければならないtff.tf_computation 。最も一般的なTensorFlowコードを直接、そのような数をとり、追加し、次の関数として、装飾することができる0.5そこに。

@tff.tf_computation(tf.float32)
def add_half(x):
  return tf.add(x, 0.5)

これらもなく、配置せず、型シグネチャを持っています。たとえば、

str(add_half.type_signature)
'(float32 -> float32)'

ここでは、重要な違いを参照tff.federated_computationtff.tf_computation 。前者には明示的な配置がありますが、後者にはありません。

我々は使用することができますtff.tf_computation配置を指定することで、連合計算にブロックを。半分を追加する関数を作成しましょう。ただし、クライアントのフェデレーションフロートにのみ追加します。私たちは、使用してこれを行うことができますtff.federated_map与えられた適用され、 tff.tf_computation配置を維持しながら、。

@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def add_half_on_clients(x):
  return tff.federated_map(add_half, x)

この機能はほとんど同じであるadd_half 、それだけで配置して値を受け入れることを除いてtff.CLIENTS同じ配置で、戻り値。これは、タイプシグネチャで確認できます。

str(add_half_on_clients.type_signature)
'({float32}@CLIENTS -> {float32}@CLIENTS)'

要約すれば:

  • TFFはフェデレーション値で動作します。
  • 各連合値はタイプ(例えば。で、連合型を持つtf.float32 )と配置(例えば。 tff.CLIENTS )。
  • フェデレーション値で装飾されなければならない連合計算を用いて形質転換することができるtff.federated_computationと連合型シグネチャを。
  • TensorFlowコードを持つブロックに含まれている必要がありtff.tf_computationデコレータ。
  • これらのブロックは、フェデレーション計算に組み込むことができます。

独自の連合学習アルゴリズムの構築、再検討

連合コアを垣間見ることができたので、独自の連合学習アルゴリズムを構築できます。上記のことを覚えておいてください、私たちは、定義されたinitialize_fnnext_fn当社アルゴリズムのを。 next_fnの使用になりますclient_updateserver_update私たちは純粋なTensorFlowコードを使用して定義します。

しかし、私たちのアルゴリズム連合計算を行うために、我々は両方が必要になりますnext_fninitialize_fnもそれぞれにtff.federated_computation

TensorFlowフェデレーションブロック

初期化計算の作成

初期化関数は、非常に簡単になります。私たちは、使用してモデルを作成しますmodel_fn 。しかし、私たちが使用して私たちのTensorFlowコードを分離する必要があることを覚えておいてくださいtff.tf_computation

@tff.tf_computation
def server_init():
  model = model_fn()
  return model.trainable_variables

私たちは、その後、使用して計算連合に直接これを渡すことができtff.federated_value

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(server_init(), tff.SERVER)

作成next_fn

ここで、クライアントとサーバーの更新コードを使用して、実際のアルゴリズムを記述します。私たちは、最初に、私たちになりますclient_updatetff.tf_computationクライアントデータセットとサーバーの重みを受け入れ、更新されたクライアントの重みテンソルを出力します。

関数を適切に装飾するには、対応するタイプが必要になります。幸い、サーバーの重みのタイプは、モデルから直接抽出できます。

whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

データセットタイプのシグネチャを見てみましょう。 28 x 28の画像(整数ラベル付き)を取り、それらを平坦化したことを思い出してください。

str(tf_dataset_type)
'<float32[?,784],int32[?,1]>*'

また、当社の使用してモデルの重みの種類を抽出することができserver_init上記の関数を。

model_weights_type = server_init.type_signature.result

タイプシグニチャを調べると、モデルのアーキテクチャを確認できます。

str(model_weights_type)
'<float32[784,10],float32[10]>'

私たちは今、私たちの作成することができtff.tf_computationクライアントアップデートのために。

@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
  model = model_fn()
  client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
  return client_update(model, tf_dataset, server_weights, client_optimizer)

tff.tf_computationサーバーのアップデートのバージョンは、我々が既に抽出しましタイプを使用して、同様の方法で定義することができます。

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
  model = model_fn()
  return server_update(model, mean_client_weights)

最後に、ではなく、少なくとも、我々は作成する必要がありtff.federated_computationすべて一緒にこれをもたらします。この関数は、1つの(配置とサーバの重みに対応する2つの連合の値を受け入れるtff.SERVER )、他方は(配置とクライアントデータセットに対応tff.CLIENTS )。

これらのタイプは両方とも上記で定義されていることに注意してください。私たちは、単にそれらを使用して適切な配置与える必要がtff.FederatedType

federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)

FLアルゴリズムの4つの要素を覚えていますか?

  1. サーバーからクライアントへのブロードキャストステップ。
  2. ローカルクライアントの更新手順。
  3. クライアントからサーバーへのアップロード手順。
  4. サーバーの更新手順。

上記を構築したので、各部分を1行のTFFコードとしてコンパクトに表すことができます。この単純さのために、フェデレーションタイプなどを指定するために特別な注意を払う必要がありました。

@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
  # Broadcast the server weights to the clients.
  server_weights_at_client = tff.federated_broadcast(server_weights)

  # Each client computes their updated weights.
  client_weights = tff.federated_map(
      client_update_fn, (federated_dataset, server_weights_at_client))

  # The server averages these updates.
  mean_client_weights = tff.federated_mean(client_weights)

  # The server updates its model.
  server_weights = tff.federated_map(server_update_fn, mean_client_weights)

  return server_weights

私たちは、今持っているtff.federated_computation 、およびアルゴリズムの1つのステップを実行するためのアルゴリズムの初期化の両方のために。私たちのアルゴリズムを終了するには、我々はにこれらを渡すtff.templates.IterativeProcess

federated_algorithm = tff.templates.IterativeProcess(
    initialize_fn=initialize_fn,
    next_fn=next_fn
)

型シグネチャで見てみましょうinitializeし、 next私たちの反復プロセスの機能。

str(federated_algorithm.initialize.type_signature)
'( -> <float32[784,10],float32[10]>@SERVER)'

これは事実反映federated_algorithm.initialize (784・バイ・10質量行列、及び10個のバイアス単位で)単層モデルを返す引数なしの関数です。

str(federated_algorithm.next.type_signature)
'(<server_weights=<float32[784,10],float32[10]>@SERVER,federated_dataset={<float32[?,784],int32[?,1]>*}@CLIENTS> -> <float32[784,10],float32[10]>@SERVER)'

ここでは、我々はそれを見federated_algorithm.nextサーバモデルとクライアントのデータを受け取り、更新サーバモデルを返します。

アルゴリズムの評価

いくつかのラウンドを実行して、損失がどのように変化するかを見てみましょう。まず、第二のチュートリアルで説明した集中型アプローチを使用して評価関数を定義します。

最初に一元化された評価データセットを作成し、次にトレーニングデータに使用したのと同じ前処理を適用します。

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients()
central_emnist_test = preprocess(central_emnist_test)

次に、サーバーの状態を受け入れ、Kerasを使用してテストデータセットを評価する関数を記述します。あなたしているお馴染みの場合tf.Kerasノートの使用が、これはすべて、見覚えがあるでしょうset_weights

def evaluate(server_state):
  keras_model = create_keras_model()
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]  
  )
  keras_model.set_weights(server_state)
  keras_model.evaluate(central_emnist_test)

それでは、アルゴリズムを初期化して、テストセットで評価してみましょう。

server_state = federated_algorithm.initialize()
evaluate(server_state)
2042/2042 [==============================] - 2s 767us/step - loss: 2.8479 - sparse_categorical_accuracy: 0.1027

数ラウンドトレーニングして、何かが変わるかどうか見てみましょう。

for round in range(15):
  server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)
2042/2042 [==============================] - 2s 738us/step - loss: 2.5867 - sparse_categorical_accuracy: 0.0980

損失関数のわずかな減少が見られます。ジャンプは小さいですが、15回のトレーニングラウンドのみを実行し、クライアントの小さなサブセットで実行しました。より良い結果を得るには、数千回ではないにしても数百回のラウンドを行う必要があるかもしれません。

アルゴリズムの変更

この時点で、立ち止まって、私たちが達成したことについて考えてみましょう。純粋なTensorFlowコード(クライアントとサーバーの更新用)をTFFのFederated Coreからのフェデレーション計算と組み合わせることにより、FederatedAveragingを直接実装しました。

より洗練された学習を実行するために、上記の内容を変更するだけです。特に、上記の純粋なTFコードを編集することで、クライアントがトレーニングを実行する方法、またはサーバーがモデルを更新する方法を変更できます。

課題:追加勾配クリッピングをするclient_update機能。

より大きな変更を加えたい場合は、サーバーにさらに多くのデータを保存してブロードキャストさせることもできます。たとえば、サーバーはクライアントの学習率を保存し、時間の経過とともに減衰させることもできます。この中で使用されるタイプの署名に変更が必要であろうことに注意tff.tf_computation上記コール。

ハーダーチャレンジ:クライアントにレート崩壊を学習してフェデレーテッド平均化を実装します。

この時点で、このフレームワークに実装できるものにどれほどの柔軟性があるかを理解し始めるかもしれません。 (上記の困難な課題への回答を含む)のアイデアのためには、ソース・コードを見ることができるtff.learning.build_federated_averaging_process 、または様々なチェックアウト研究プロジェクトをTFFを使用しました。