このページは Cloud Translation API によって翻訳されました。
Switch to English

TensorFlowを使用した分散トレーニング

TensorFlow.orgで見る Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

概観

tf.distribute.Strategyは、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するTensorFlow APIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。

tf.distribute.Strategyは、次の主要な目標を念頭に置いて設計されています。

  • 使いやすく、研究者、MLエンジニアなどの複数のユーザーセグメントをサポートします。
  • すぐに使用できる優れたパフォーマンスを提供します。
  • 戦略間の簡単な切り替え。

tf.distribute.Strategyのような高レベルAPIで使用することができKeras 、またカスタムトレーニングループ(及び、TensorFlowを使用して、一般的に、任意の計算)を配布するために使用することができます。

TensorFlow 2.xでは、プログラムを熱心に実行したり、 tf.functionを使用してグラフで実行したりできます。 tf.distribute.Strategyは、これらの実行モードの両方をサポートすることを意図していますが、 tf.function最適にtf.functionます。 Eagerモードはデバッグ目的でのみ推奨され、 TPUStrategyではサポートされていません。このガイドではほとんどの場合トレーニングについて説明しますが、このAPIは、さまざまなプラットフォームでの評価と予測の配布にも使用できます。

tf.distribute.Strategyの基礎となるコンポーネントを戦略に対応するように変更したため、ごくわずかなコード変更でtf.distribute.Strategyを使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、チェックポイントが含まれます。

このガイドでは、さまざまなタイプの戦略と、それらをさまざまな状況で使用する方法について説明します。

# Import TensorFlow
import tensorflow as tf

戦略のタイプ

tf.distribute.Strategyは、さまざまな軸に沿った多数のユースケースをカバーすることを意図しています。これらの組み合わせの一部は現在サポートされており、他の組み合わせは将来追加される予定です。これらの軸の一部は次のとおりです。

  • 同期トレーニングと非同期トレーニング:これらは、データの並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが入力データに対して独立してトレーニングを行い、変数を非同期で更新します。通常、同期トレーニングは、all-reduceおよびasync through parameter serverアーキテクチャでサポートされています。
  • ハードウェアプラットフォーム: 1台のマシン上の複数のGPU、またはネットワーク内の複数のマシン(それぞれ0個以上のGPU)、またはCloud TPUにトレーニングをスケーリングすることができます。

これらの使用例をサポートするために、6つの戦略が利用可能です。次のセクションでは、現時点でTF 2.2のどのシナリオでこれらのどれがサポートされているかを説明します。簡単な概要は次のとおりです。

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API 支えられる支えられる実験的サポート実験的サポートサポートされる予定の2.3ポスト
カスタムトレーニングループ 支えられる支えられる実験的サポート実験的サポートサポートされる予定の2.3ポスト
Estimator API 限定サポートサポートされていません限定サポート限定サポート限定サポート

MirroredStrategy

tf.distribute.MirroredStrategyは、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートしています。 GPUデバイスごとに1つのレプリカを作成します。モデルの各変数は、すべてのレプリカにミラーリングされます。これらの変数が一緒になって、 MirroredVariableと呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することによって互いに同期されます。

効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を伝達します。 All-reduceは、テンソルを合計してすべてのデバイスに集約し、各デバイスで使用できるようにします。これは非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信の種類に応じて、多くのオールリデュースアルゴリズムと実装が利用可能です。デフォルトでは、NVIDIA NCCLをall-reduce実装として使用します。私たちが提供する他のいくつかのオプションから選択するか、独自のオプションを作成できます。

MirroredStrategyを作成する最も簡単な方法は次のとおりです。

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

これにより、TensorFlowに認識されるすべてのGPUを使用するMirroredStrategyインスタンスが作成され、クロスデバイス通信としてNCCLが使用されます。

マシンで一部のGPUのみを使用する場合は、次のように使用できます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

クロスデバイス通信をオーバーライドしたい場合は、 tf.distribute.CrossDeviceOpsインスタンスを提供することにより、 cross_device_ops引数を使用して行うことができます。現在、 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDeviceは、デフォルトであるtf.distribute.NcclAllReduce以外の2つのオプションです。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊ASICです。これらは、Google Colab、 TensorFlow Research CloudCloud TPUで利用できます。

分散トレーニングアーキテクチャの点では、 TPUStrategyは同じMirroredStrategyです。同期分散トレーニングを実装しています。 TPUは、 TPUStrategy使用される複数のTPUコアにまたがる効率的なall-reduceおよびその他の集合演算の独自の実装を提供します。

TPUStrategyをインスタンス化する方法は次のTPUStrategyです。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolverインスタンスは、TPUを見つけるのに役立ちます。 Colabでは、引数を指定する必要はありません。

これをCloud TPUに使用する場合:

  • tpu引数でTPUリソースの名前を指定する必要があります。
  • プログラムの開始時に、tpuシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 tpuシステムを初期化すると、TPUメモリも消去されるため、状態が失われないようにするには、最初にこの手順を完了することが重要です。

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategyMirroredStrategyに非常に似ています。複数のワーカーに同期分散トレーニングを実装し、それぞれに複数のGPUが存在する可能性があります。 MirroredStrategyと同様に、すべてのワーカーの各デバイスのモデルにすべての変数のコピーを作成します。

変数の同期を維持するために使用されるマルチワーカーの全還元通信メソッドとしてCollectiveOpsを使用します。集合演算とは、TensorFlowグラフの単一の演算であり、ハードウェア、ネットワークトポロジ、テンソルサイズに応じて、TensorFlowランタイムの全縮約アルゴリズムを自動的に選択できます。

また、追加のパフォーマンス最適化も実装します。たとえば、小さなテンソルの複数のすべての削減を大きなテンソルの少ないすべての削減に変換する静的最適化が含まれています。さらに、プラグインアーキテクチャを持つように設計しています。将来的には、ハードウェアに合わせて調整されたアルゴリズムをプラグインできるようになります。集合オペレーションは、ブロードキャストやオールギャザーなどの他の集合操作も実装することに注意してください。

MultiWorkerMirroredStrategyを作成する最も簡単な方法はMultiWorkerMirroredStrategyです。

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy現在、 MultiWorkerMirroredStrategy 2つの異なる実装から選択できます。 CollectiveCommunication.RINGは、gRPCを通信レイヤーとして使用してリングベースの集合を実装します。 CollectiveCommunication.NCCLは、 NvidiaのNCCLを使用して集合を実装します。 CollectiveCommunication.AUTOは、選択をランタイムに委ねます。集合的な実装の最良の選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。次の方法で指定できます。

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

マルチGPUトレーニングと比較した場合のマルチワーカートレーニングの主な違いの1つは、マルチワーカーのセットアップです。 TF_CONFIG環境変数は、 TF_CONFIGの標準的な方法で、クラスターの一部である各ワーカーにクラスター構成を指定します。 TF_CONFIGの設定の詳細。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategyは同期トレーニングも行います。変数はミラーリングされず、代わりにCPUに配置され、操作はすべてのローカルGPU間で複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。

CentralStorageStrategyインスタンスを作成するには:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

これにより、表示可能なすべてのGPUとCPUを使用するCentralStorageStrategyインスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategyは、複数のマシンでのパラメーターサーバートレーニングをサポートしています。この設定では、一部のマシンはワーカーとして指定され、一部はパラメーターサーバーとして指定されています。モデルの各変数は、1つのパラメーターサーバーに配置されます。計算は、すべてのワーカーのすべてのGPUにわたって複製されます。

コードに関しては、他の戦略と似ています。

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

マルチワーカートレーニングの場合、 TF_CONFIGはクラスター内のパラメーターサーバーとワーカーの構成を指定する必要があります。詳細については、 以下のTF_CONFIGを参照してください

その他の戦略

上記の戦略に加えて、 tf.distribute APIを使用する場合のプロトタイピングとデバッグに役立つ2つの戦略があります。

デフォルトの戦略

デフォルトの戦略は、明示的な配信戦略が範囲内にない場合に存在する配信戦略です。これはtf.distribute.Strategyインターフェースを実装しますが、パススルーであり、実際の配布は提供しません。たとえば、 strategy.run(fn)は単にfn呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作します。あなたはそれを「ノーオペレーション」戦略と考えることができます。

デフォルトの戦略はシングルトンであり、それ以上インスタンスを作成することはできません。明示的な戦略のスコープ外でtf.distribute.get_strategy()使用して取得できます(明示的な戦略のスコープ内で現在の戦略を取得するために使用できるのと同じAPI)。

default_strategy = tf.distribute.get_strategy()

この戦略には2つの主な目的があります。

  • これにより、無条件で配布対応ライブラリコードを記述できます。たとえば、オプティマイザでは、 tf.distribute.get_strategy()を実行して、勾配を減らすための戦略を使用できます。これは、常に削減APIを呼び出すことができる戦略オブジェクトを返します。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • ライブラリコードと同様に、エンドユーザーのプログラムを記述して、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず作業を行うことができます。これを示すサンプルコードスニペット:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategyは、すべての変数と計算を1つの指定されたデバイスに配置する戦略です。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

この戦略は、いくつかの点でデフォルトの戦略とは異なります。デフォルトの戦略では、分散戦略なしでTensorFlowを実行する場合と比較すると、変数配置ロジックは変更されません。ただし、 OneDeviceStrategyを使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.runを介してOneDeviceStrategy.run関数も、指定されたデバイスに配置されます。

この戦略を通じて分散された入力は、指定されたデバイスにプリフェッチされます。デフォルトの戦略では、入力分布はありません。

デフォルトの戦略と同様に、この戦略は、実際に複数のデバイス/マシンに分散する他の戦略に切り替える前にコードをテストするためにも使用できます。これにより、デフォルトの戦略よりもいくらか分散戦略のTPUStrategyますが、 MirroredStrategyTPUStrategyなどを使用する場合ほど完全には機能しません。戦略がないかのように動作するコードが必要な場合は、デフォルトの戦略を使用してください。

これまで、利用可能なさまざまな戦略とは何か、およびそれらをインスタンス化する方法について説明しました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法について説明します。このガイドでは短いコードスニペットを示し、エンドツーエンドで実行できる完全なチュートリアルにリンクします。

tf.distribute.Strategyでのtf.keras.Model.fit使用

tf.distribute.Strategyを、 tf.keras API仕様の実装であるtf.distribute.Strategyに統合しました。 tf.kerasは、モデルを構築およびトレーニングするための高レベルAPIです。 tf.kerasバックエンドに統合することで、 model.fitを使用してmodel.fitトレーニングフレームワークで記述されたトレーニングをシームレスに配布できます。

コードで変更する必要があるものは次のとおりです。

  1. 適切なtf.distribute.Strategyインスタンスを作成します。
  2. Kerasモデル、オプティマイザー、およびメトリックの作成をstrategy.scope内に移動します。

順次、機能、サブクラスのすべてのタイプのKerasモデルをサポートしています。

これは、1つの高密度レイヤーを持つ非常に単純なKerasモデルでこれを行うためのコードスニペットです。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

この例では、 MirroredStrategyを使用したため、これを複数のGPUが搭載されたマシンで実行できます。 strategy.scope()は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内でモデル/オプティマイザー/メトリックを作成すると、通常の変数の代わりに分散変数を作成できます。これがセットアップされると、通常のようにモデルを適合させることができます。 MirroredStrategyは、利用可能なGPUでのモデルのトレーニングの複製、勾配の集計などを行います。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 3.4059
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 1.5054
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 0.9349

0.9348920583724976

ここでは、 tf.data.Datasetを使用してトレーニングと評価の入力を提供しました。 numpy配列を使用することもできます:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.6654
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.2941

<tensorflow.python.keras.callbacks.History at 0x7f8dd86cbcc0>

どちらの場合(データセットまたはnumpy)でも、指定された入力の各バッチは、複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategyを使用する場合、サイズ10の各バッチは2つのGPU間で分割され、各ステップで各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックはより速くトレーニングします。通常、アクセラレータを追加するときにバッチサイズを増やして、追加のコンピューティング能力を効果的に使用する必要があります。また、モデルに応じて、学習率を再調整する必要があります。 strategy.num_replicas_in_syncを使用して、レプリカの数を取得できます。

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在何がサポートされていますか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API 支えられる支えられる実験的サポート実験的サポートサポート予定2.3

例とチュートリアル

上記のKerasとの統合を示すチュートリアルと例のリストを次に示します。

  1. MirroredStrategy MNISTをトレーニングするためのチュートリアル
  2. MultiWorkerMirroredStrategyを使用してMultiWorkerMirroredStrategyをトレーニングするチュートリアル
  3. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  4. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlow Model Garden リポジトリ

カスタムトレーニングループでのtf.distribute.Strategy使用

あなたが見てきたように、使用してtf.distribute.Strategy Kerasのでmodel.fitあなたのコードの唯一の数行を変更する必要があります。もう少し努力して、カスタムトレーニングループでtf.distribute.Strategyを使用することもできます。

EstimatorやKerasよりも柔軟でトレーニングループを制御する必要がある場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、各ラウンドで異なる数のジェネレータステップまたはディスクリミネータステップを実行することができます。同様に、高レベルのフレームワークは強化学習のトレーニングにはあまり適していません。

カスタムトレーニングループをサポートするために、 tf.distribute.Strategyクラスを通じてメソッドのコアセットを提供してtf.distribute.Strategyます。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるはずです。

ここでは、以前と同じKerasモデルを使用した簡単なトレーニング例のこの使用例を示す短いスニペットを示します。

まず、戦略のスコープ内にモデルとオプティマイザを作成します。これにより、モデルとオプティマイザで作成されたすべての変数がミラー変数になります。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_datasetを呼び出して、戦略に基づいてデータセットを配布します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

次に、トレーニングの1つのステップを定義します。 tf.GradientTapeを使用して勾配を計算し、オプティマイザーを使用してこれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを分散するには、関数train_stepを入れて、前に作成したdist_datasetから取得したデータセット入力とともにtf.distrbute.Strategy.runに渡します。

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードで注意すべきその他の事項:

  1. tf.nn.compute_average_lossを使用して損失を計算しました。 tf.nn.compute_average_lossは、例ごとの損失を合計し、その合計をglobal_batch_sizeで割ります。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で勾配が集計されるため、これは重要です。
  2. tf.distribute.Strategy.reduce APIを使用して、 tf.distribute.Strategy.reduceによって返された結果を集計しtf.distribute.Strategy.runtf.distribute.Strategy.runは、戦略の各ローカルレプリカから結果を返します。この結果を使用する方法は複数あります。それらをreduceて、集約値を取得できます。 tf.distribute.Strategy.experimental_local_resultstf.distribute.Strategy.experimental_local_resultsして、ローカルレプリカごとに1つずつ、結果に含まれる値のリストを取得することもできます。
  3. apply_gradientsが配布戦略スコープ内で呼び出されると、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。

最後に、トレーニングステップを定義しdist_datasetdist_datasetを反復処理して、トレーニングをループで実行できます。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.3262986, shape=(), dtype=float32)
tf.Tensor(0.32475147, shape=(), dtype=float32)
tf.Tensor(0.3232167, shape=(), dtype=float32)
tf.Tensor(0.32169423, shape=(), dtype=float32)
tf.Tensor(0.32018384, shape=(), dtype=float32)
tf.Tensor(0.3186855, shape=(), dtype=float32)
tf.Tensor(0.317199, shape=(), dtype=float32)
tf.Tensor(0.31572425, shape=(), dtype=float32)
tf.Tensor(0.31426117, shape=(), dtype=float32)
tf.Tensor(0.31280956, shape=(), dtype=float32)
tf.Tensor(0.3113694, shape=(), dtype=float32)
tf.Tensor(0.30994043, shape=(), dtype=float32)
tf.Tensor(0.30852267, shape=(), dtype=float32)
tf.Tensor(0.30711594, shape=(), dtype=float32)
tf.Tensor(0.30572012, shape=(), dtype=float32)
tf.Tensor(0.30433518, shape=(), dtype=float32)
tf.Tensor(0.3029609, shape=(), dtype=float32)
tf.Tensor(0.30159724, shape=(), dtype=float32)
tf.Tensor(0.30024406, shape=(), dtype=float32)
tf.Tensor(0.29890123, shape=(), dtype=float32)

上記の例では、 dist_datasetを反復処理して、 dist_datasetへの入力を提供しました。また、numpy入力をサポートするtf.distribute.Strategy.make_experimental_numpy_datasetも提供しています。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_dataset呼び出す前にデータセットを作成できます。

データを反復処理するもう1つの方法は、明示的に反復子を使用することです。これは、データセット全体を反復するのではなく、特定の数のステップを実行する場合に実行できます。反復上記今最初のイテレータを作成するように変更され、その後、明示的に呼んでnextの入力データを取得することに。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.2975687, shape=(), dtype=float32)
tf.Tensor(0.2962464, shape=(), dtype=float32)
tf.Tensor(0.29493415, shape=(), dtype=float32)
tf.Tensor(0.29363185, shape=(), dtype=float32)
tf.Tensor(0.2923394, shape=(), dtype=float32)
tf.Tensor(0.29105672, shape=(), dtype=float32)
tf.Tensor(0.28978375, shape=(), dtype=float32)
tf.Tensor(0.28852034, shape=(), dtype=float32)
tf.Tensor(0.2872664, shape=(), dtype=float32)
tf.Tensor(0.28602186, shape=(), dtype=float32)

これは、 tf.distribute.Strategy APIを使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。現在、これらのAPIの改善を進めています。このユースケースでは、コードを適合させるためにより多くの作業が必要になるため、将来的には別の詳細なガイドを公開する予定です。

現在何がサポートされていますか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
カスタムトレーニングループ支えられる支えられる実験的サポート実験的サポートサポート予定2.3

例とチュートリアル

カスタムトレーニングループで配布戦略を使用するいくつかの例を次に示します。

  1. MirroredStrategyを使用してMNISTをトレーニングするチュートリアル
  2. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  3. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlow Model Garden リポジトリ

Estimatorでのtf.distribute.Strategy使用(制限付きサポート)

tf.estimatorは、最初は非同期パラメータサーバーアプローチをサポートしていた分散トレーニングTensorFlow APIです。 tf.distribute.Strategyと同様に、 tf.distribute.Strategytf.Estimator統合しtf.Estimator 。トレーニングにEstimatorを使用している場合は、コードをほとんど変更することなく、分散トレーニングに簡単に変更できます。これにより、Estimatorユーザーは、TPUを使用するだけでなく、複数のGPUと複数のワーカーで同期分散トレーニングを実行できるようになりました。ただし、Estimatorでのこのサポートは制限されています。詳細については、以下現在サポートされている機能」セクションを参照してください。

Estimatorでのtf.distribute.Strategyの使用法は、 tf.distribute.Strategyとは少し異なります。 strategy.scopeを使用する代わりに、EstimatorのRunConfig戦略オブジェクトを渡します。

以下は、 LinearRegressorれたEstimator LinearRegressorMirroredStrategyを使用してこれを示すコードのスニペットです。

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpbnyibny6
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpbnyibny6', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f8dd83bc518>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

ここでは既成のEstimatorを使用していますが、同じコードがカスタムEstimatorでも機能します。 train_distributeはトレーニングの配布方法を決定し、 eval_distributeは評価の配布方法を決定します。これは、トレーニングと評価の両方に同じ戦略を使用するKerasとの別の違いです。

これで、入力関数を使用してこのEstimatorをトレーニングおよび評価できます。

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd8317ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpbnyibny6/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f8dd83c6730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-09-10T01:28:07Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpbnyibny6/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.24097s
INFO:tensorflow:Finished evaluation at 2020-09-10-01:28:07
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpbnyibny6/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

EstimatorとKerasの間でここで強調するもう1つの違いは、入力処理です。 Kerasでは、データセットの各バッチが複数のレプリカ間で自動的に分割されると述べました。ただし、Estimatorでは、バッチの自動分割も、異なるワーカー間でのデータの自動シャーディングも行いません。ワーカーとデバイス間でデータを分散する方法を完全に制御できます。データの分散方法を指定するには、 input_fnを指定する必要があります。

input_fnはワーカーごとに1回呼び出されるため、ワーカーごとに1つのデータセットが提供されます。次に、そのデータセットの1つのバッチがそのワーカーの1つのレプリカに供給され、1つのワーカーのN個のレプリカに対してN個のバッチが消費されます。つまり、 input_fnによって返されるデータセットは、サイズPER_REPLICA_BATCH_SIZEバッチを提供する必要があります。また、ステップのグローバルバッチサイズは、 PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_syncとして取得できます。

マルチワーカーのトレーニングを行う場合、データをワーカー間で分割するか、それぞれにランダムシードを使用してシャッフルする必要があります。これを行う方法の例は、Estimatorを使用したマルチワーカートレーニングで確認できます。

同様に、マルチワーカーとパラメーターサーバーの戦略も使用できます。コードは同じままですが、 tf.estimator.train_and_evaluateを使用して、クラスターで実行されている各バイナリにTF_CONFIG環境変数を設定する必要があります。

現在何がサポートされていますか?

TPUStrategyを除くすべての戦略を使用したEstimatorでのトレーニングのサポートは限られていTPUStrategy 。基本的なトレーニングと評価は機能するはずですが、足場などの多くの高度な機能はまだ機能しません。この統合にはいくつかのバグがある可能性もあります。現時点では、このサポートを積極的に改善する予定はなく、代わりにKerasとカスタムトレーニングループのサポートに重点を置いています。可能であれば、代わりにこれらのAPIでtf.distributeを使用することをおtf.distributeします。

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Estimator API 限定サポートサポートされていません限定サポート限定サポート限定サポート

例とチュートリアル

Estimatorを使用したさまざまな戦略のエンドツーエンドの使用法を示すいくつかの例を次に示します。

  1. 見積もりとマルチワーカー養成使用して、複数の労働者とMNISTを訓練するMultiWorkerMirroredStrategy
  2. Kubernetesテンプレートを使用したテンソルフロー/エコシステムでのマルチワーカートレーニングのエンドツーエンドの例 。この例は、 tf.keras.estimator.model_to_estimatorモデルから開始し、 tf.keras.estimator.model_to_estimator APIを使用してEstimatorに変換します。
  3. 公式ResNet50のいずれかを使用して訓練することができるモデル、 MirroredStrategyまたはMultiWorkerMirroredStrategy

その他の話題

このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。

TF_CONFIG環境変数の設定

マルチワーカートレーニングの場合、前述のように、クラスターで実行されている各バイナリに対してTF_CONFIG環境変数を設定する必要があります。 TF_CONFIG環境変数は、クラスターを構成するタスク、そのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。トレーニングタスク用にTF_CONFIGを設定するKubernetesテンプレートをtensorflow / ecosystemリポジトリにTF_CONFIGします。

TF_CONFIGには、クラスターとタスクという2つのコンポーネントがあります。 clusterは、トレーニングクラスターに関する情報を提供します。これは、ワーカーなどのさまざまなタイプのジョブで構成される辞書です。マルチワーカートレーニングでは、通常、通常のワーカーの作業に加えて、チェックポイントの保存やTensorBoardの要約ファイルの書き込みなど、もう少し責任を負うワーカーが1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、通常、インデックス0のワーカーがチーフワーカーとして指定されます(実際、これがtf.distribute.Strategyの実装方法です)。一方、タスクは現在のタスクの情報を提供します。最初のコンポーネントクラスタはすべてのワーカーで同じであり、2番目のコンポーネントタスクは各ワーカーで異なり、そのワーカーのタイプとインデックスを指定します。

TF_CONFIG一例はTF_CONFIGです。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

このTF_CONFIGは、クラスター内に3つのワーカーと2つのpsタスク、およびそれらのホストとポートがあることを指定します。 「タスク」の部分は、クラスター内の現在のタスクの役割、ワーカー1(2番目のワーカー)を指定します。クラスタでの有効な役割は、「チーフ」、「ワーカー」、「ps」、および「エバリュエーター」です。 tf.distribute.experimental.ParameterServerStrategyを使用する場合を除き、「ps」ジョブはありません。

次は何ですか?

tf.distribute.Strategyは積極的に開発中です。ぜひお試しいただき、 GitHubの問題を使用しフィードバックを提供してください。