![]() | ![]() | ![]() | ![]() |
概要概要
tf.distribute.Strategy
は、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するためのTensorFlowAPIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。
tf.distribute.Strategy
は、次の主要な目標を念頭に置いて設計されています。
- 使いやすく、研究者、MLエンジニアなどを含む複数のユーザーセグメントをサポートします。
- 箱から出してすぐに優れたパフォーマンスを提供します。
- 戦略を簡単に切り替えることができます。
tf.distribute.Strategy
のような高レベルAPIで使用することができKeras 、またカスタムトレーニングループ(及び、TensorFlowを使用して、一般的に、任意の計算)を配布するために使用することができます。
TensorFlow 2.xでは、プログラムを熱心に実行することも、 tf.function
を使用してグラフで実行することもできます。 tf.distribute.Strategy
は、これら両方の実行モードをサポートすることを目的としていますが、 tf.function
最適にtf.function
ます。イーガーモードはデバッグ目的でのみ推奨され、 TPUStrategy
ではサポートされていません。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。
tf.distribute.Strategy
の基盤となるコンポーネントが戦略対応になるように変更されたため、コードをほとんど変更せずにtf.distribute.Strategy
を使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。
このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について説明します。パフォーマンスの問題をデバッグする方法については、 Optimize TensorFlowGPUパフォーマンスガイドを参照してください。
# Import TensorFlow
import tensorflow as tf
戦略の種類
tf.distribute.Strategy
は、さまざまな軸に沿った多くのユースケースをカバーすることをtf.distribute.Strategy
としています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。
- 同期トレーニングと非同期トレーニング:これらは、データの並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期に更新します。通常、同期トレーニングは、パラメータサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
- ハードウェアプラットフォーム: 1台のマシン上の複数のGPU、ネットワーク内の複数のマシン(それぞれ0個以上のGPU)、またはクラウドTPUにトレーニングを拡張することをお勧めします。
これらのユースケースをサポートするために、6つの戦略が利用可能です。次のセクションでは、TFのどのシナリオでこれらのどれがサポートされているかを説明します。概要は次のとおりです。
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy |
---|---|---|---|---|---|
Keras API | サポートされています | サポートされています | 実験的サポート | 実験的サポート | サポートされている計画されたポスト2.3 |
カスタムトレーニングループ | サポートされています | サポートされています | 実験的サポート | 実験的サポート | サポートされている計画されたポスト2.3 |
Estimator API | 限定サポート | サポートされていません | 限定サポート | 限定サポート | 限定サポート |
MirroredStrategy
tf.distribute.MirroredStrategy
は、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートします。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカ間でミラーリングされます。これらの変数が一緒になって、 MirroredVariable
と呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することにより、相互に同期されます。
効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、all-reduce実装としてNVIDIANCCLを使用します。他のいくつかのオプションから選択するか、独自に作成することができます。
MirroredStrategy
を作成する最も簡単な方法は次のとおりです。
mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
これにより、TensorFlowに表示されるすべてのGPUを使用するMirroredStrategy
インスタンスが作成され、NCCLがクロスデバイス通信として使用されます。
マシンで一部のGPUのみを使用する場合は、次のように実行できます。
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0 INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
クロスデバイス通信をオーバーライドする場合は、 tf.distribute.CrossDeviceOps
インスタンスを指定して、 cross_device_ops
引数を使用してcross_device_ops
できます。現在、 tf.distribute.HierarchicalCopyAllReduce
とtf.distribute.ReductionToOneDevice
は、デフォルトのtf.distribute.NcclAllReduce
以外の2つのオプションです。
mirrored_strategy = tf.distribute.MirroredStrategy(
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
TPUStrategy
tf.distribute.TPUStrategy
使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。彼らは、Googleのコラボ、上で利用可能なTensorFlow研究クラウドとクラウドTPU 。
分散トレーニングアーキテクチャに関しては、 TPUStrategy
は同じMirroredStrategy
あり、同期分散トレーニングを実装します。 TPUは、 TPUStrategy
使用される複数のTPUコアにわたって、効率的なall-reduceおよびその他の集合操作の独自の実装を提供します。
TPUStrategy
をインスタンス化する方法は次のTPUStrategy
です。
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
TPUClusterResolver
インスタンスは、TPUの検索に役立ちます。 Colabでは、引数を指定する必要はありません。
これをクラウドTPUに使用する場合:
-
tpu
引数にTPUリソースの名前を指定する必要があります。 - プログラムの開始時に、tpuシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 tpuシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。
MultiWorkerMirroredStrategy
tf.distribute.MultiWorkerMirroredStrategy
は、 MirroredStrategy
と非常によく似ています。それぞれが潜在的に複数のGPUを備えた複数のワーカーにまたがる同期分散トレーニングを実装します。 tf.distribute.MirroredStrategy
と同様に、すべてのワーカーの各デバイスでモデル内のすべての変数のコピーを作成します。
MultiWorkerMirroredStrategy
を作成する最も簡単な方法はMultiWorkerMirroredStrategy
です。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',) INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
は、クロスデバイス通信用の2つの実装があります。 CommunicationImplementation.RING
はRPCベースであり、CPUとGPUの両方をサポートします。 CommunicationImplementation.NCCL
はNvidiaのNCCLを使用し、GPUで最先端のパフォーマンスを提供しますが、CPUをサポートしていません。 CollectiveCommunication.AUTO
は、選択をTensorflowに延期します。次の方法で指定できます。
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',) INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL
マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための主な違いの1つは、マルチワーカーのセットアップです。 TF_CONFIG
環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定するTensorFlowの標準的な方法です。 TF_CONFIGの設定の詳細をご覧ください。
ParameterServerStrategy
パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。詳細については、パラメータサーバーのトレーニングチュートリアルを参照してください。
TensorFlow 2パラメーターサーバーのトレーニングでは、 tf.distribute.experimental.coordinator.ClusterCoordinator
クラスを介して中央コーディネーターベースのアーキテクチャを使用します。
この実装では、 worker
タスクとparameter server
タスクは、コーディネーターからのタスクをリッスンするtf.distribute.Server
を実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。
コーディネーターで実行されているプログラミングでは、 ParameterServerStrategy
オブジェクトを使用してトレーニングステップを定義し、 ClusterCoordinator
を使用してトレーニングステップをリモートワーカーにディスパッチします。それらを作成する最も簡単な方法は次のとおりです。
strategy = tf.distribute.experimental.ParameterServerStrategy(
tf.distribute.cluster_resolver.TFConfigClusterResolver(),
variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
strategy)
あなたが使用している場合TF_CONFIG環境変数を設定する必要があります注意してくださいTFConfigClusterResolver
。それはに似てTF_CONFIGにMultiWorkerMirroredStrategy
が、追加の注意事項があります。
TF 1では、 ParameterServerStrategy
は、 tf.compat.v1.distribute.experimental.ParameterServerStrategy
シンボルを介したtf.compat.v1.distribute.experimental.ParameterServerStrategy
のみ使用できます。
CentralStorageStrategy
tf.distribute.experimental.CentralStorageStrategy
は同期トレーニングも行います。変数はミラーリングされず、代わりにCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。
次の方法でCentralStorageStrategy
インスタンスを作成します。
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
これにより、表示されているすべてのGPUとCPUを使用するCentralStorageStrategy
インスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。
その他の戦略
上記の戦略に加えて、 tf.distribute
使用する際のプロトタイピングとデバッグに役立つ可能性のある他の2つの戦略があります。
デフォルト戦略
デフォルトの戦略は、明示的な配布戦略がスコープ内にない場合に存在する配布戦略です。これはtf.distribute.Strategy
インターフェースを実装しますが、パススルーであり、実際の配布は提供しません。たとえば、 strategy.run(fn)
は単にfn
呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。
デフォルトの戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、明示的なストラテジーのスコープ外でtf.distribute.get_strategy()
使用して取得できます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用できるのと同じAPI)。
default_strategy = tf.distribute.get_strategy()
この戦略は2つの主な目的を果たします。
- これにより、ディストリビューション対応のライブラリコードを無条件に記述できます。たとえば、
tf.optimizer
では、tf.optimizer
tf.distribute.get_strategy()
を使用して、その戦略を使用して勾配を減らすことができます。これにより、tf.optimizer
を呼び出すことができる戦略オブジェクトが常に返されます。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None) # reduce some values
1.0
- ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを説明するサンプルコードスニペット:
if tf.config.list_physical_devices('gpu'):
strategy = tf.distribute.MirroredStrategy()
else: # use default strategy
strategy = tf.distribute.get_strategy()
with strategy.scope():
# do something interesting
print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
OneDeviceStrategy
tf.distribute.OneDeviceStrategy
は、すべての変数と計算を単一の指定されたデバイスに配置するための戦略です。
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
この戦略は、いくつかの点でデフォルトの戦略とは異なります。デフォルトの戦略では、分散戦略なしでTensorFlowを実行する場合と比較した場合、変数配置ロジックは変更されません。ただし、 OneDeviceStrategy
を使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.run
を介してOneDeviceStrategy.run
関数も、指定されたデバイスに配置されます。
この戦略を通じて配布された入力は、指定されたデバイスにプリフェッチされます。デフォルトの戦略では、入力の分散はありません。
デフォルトの戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これにより、デフォルトの戦略よりもいくらか多くの配布戦略機構が実行されますが、 MirroredStrategy
やTPUStrategy
などを使用する場合ほどではありません。戦略がないかのように動作するコードが必要な場合は、デフォルトの戦略を使用してください。
これまで、利用可能なさまざまな戦略と、それらをインスタンス化する方法を見てきました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。
tf.distribute.Strategy
でtf.keras.Model.fit
を使用する
tf.distribute.Strategy
は、 tf.keras
によるKerasAPI仕様の実装であるtf.kerasに統合されています。 tf.keras
は、モデルを構築およびトレーニングするための高レベルAPIです。 tf.keras
バックエンドに統合することで、 model.fit
を使用してmodel.fit
トレーニングフレームワークで記述されたトレーニングをシームレスに配布できるようにmodel.fit
。
コードで変更する必要があるものは次のとおりです。
- 適切な
tf.distribute.Strategy
インスタンスを作成します。 - Kerasモデル、オプティマイザー、メトリックの作成を
strategy.scope
内に移動します。
シーケンシャル、機能、サブクラスのすべてのタイプのKerasモデルをサポートします。
これを行うためのコードスニペットは、1つの密なレイヤーを持つ非常に単純なKerasモデルです。
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
この例ではMirroredStrategy
使用しているため、複数のGPUを搭載したマシンでこれを実行できます。 strategy.scope()
は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内にモデル/オプティマイザー/メトリックを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategy
は、利用可能なGPUでのモデルのトレーニングの複製、グラデーションの集計などを処理します。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). 10/10 [==============================] - 3s 2ms/step - loss: 0.9810 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). Epoch 2/2 10/10 [==============================] - 0s 2ms/step - loss: 0.4336 10/10 [==============================] - 1s 2ms/step - loss: 0.2299 0.22988776862621307
ここで、tf.data.Dataset
はトレーニングと評価の入力を提供します。 numpy配列を使用することもできます。
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2 10/10 [==============================] - 1s 2ms/step - loss: 0.1636 Epoch 2/2 10/10 [==============================] - 0s 2ms/step - loss: 0.0723 <tensorflow.python.keras.callbacks.History at 0x7f89057a2470>
どちらの場合(データセットまたはnumpy)でも、指定された入力の各バッチは、複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategy
を使用する場合、サイズ10の各バッチは2つのGPUに分割され、各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を効果的に利用するために、アクセラレータを追加するときにバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。 strategy.num_replicas_in_sync
を使用して、レプリカの数を取得できます。
# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
現在サポートされているものは何ですか?
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | ParameterServerStrategy | CentralStorageStrategy |
---|---|---|---|---|---|
Keras API | サポートされています | サポートされています | 実験的サポート | 実験的サポート | 実験的サポート |
例とチュートリアル
上記のKerasとのエンドツーエンドの統合を説明するチュートリアルと例のリストを次に示します。
-
MirroredStrategy
を使用してMNISTをトレーニングするためのチュートリアル。 - MultiWorkerMirroredStrategyを使用してMNISTを
MultiWorkerMirroredStrategy
するためのチュートリアル。 - TPUStrategyを使用した
TPUStrategy
トレーニングに関するガイド。 -
ParameterServerStrategy
したTensorFlow2でのパラメーターサーバートレーニングのチュートリアル。 - さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ。
カスタムトレーニングループでのtf.distribute.Strategy
使用
あなたが見てきたように、使用してtf.distribute.Strategy
Kerasのでmodel.fit
あなたのコードの唯一の数行を変更する必要があります。もう少し努力すれば、カスタムトレーニングループでtf.distribute.Strategy
を使用することもできます。
EstimatorやKerasで可能なよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。
tf.distribute.Strategy
クラスは、カスタムトレーニングループをサポートするためのメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるはずです。
ここでは、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットを示します。
まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
optimizer = tf.keras.optimizers.SGD()
次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_dataset
を呼び出して、戦略に基づいてデータセットを配布します。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
次に、トレーニングの1つのステップを定義します。tf.GradientTape
を使用して勾配を計算し、オプティマイザーを使用してそれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを配布するには、関数train_step
入れて、前に作成したdist_dataset
から取得したデータセット入力とともにtf.distrbute.Strategy.run
に渡します。
loss_object = tf.keras.losses.BinaryCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
def train_step(inputs):
features, labels = inputs
with tf.GradientTape() as tape:
predictions = model(features, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
@tf.function
def distributed_train_step(dist_inputs):
per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
上記のコードで注意すべき他のいくつかのこと:
-
tf.nn.compute_average_loss
を使用して損失を計算しました。tf.nn.compute_average_loss
は、例ごとの損失を合計し、その合計をglobal_batch_sizeで除算します。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で集計されるため、これは重要です。 -
tf.distribute.Strategy.reduce
APIを使用して、tf.distribute.Strategy.reduce
によって返された結果を集計しtf.distribute.Strategy.run
。tf.distribute.Strategy.run
は、ストラテジー内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。それらをreduce
て、集計値を取得できます。tf.distribute.Strategy.experimental_local_results
を実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。 -
apply_gradients
が配布戦略スコープ内で呼び出されると、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。
最後に、トレーニングステップを定義しdist_dataset
、 dist_dataset
を反復処理して、トレーニングをループで実行できます。
for dist_inputs in dist_dataset:
print(distributed_train_step(dist_inputs))
tf.Tensor(0.88850546, shape=(), dtype=float32) tf.Tensor(0.8815902, shape=(), dtype=float32) tf.Tensor(0.87474185, shape=(), dtype=float32) tf.Tensor(0.8679599, shape=(), dtype=float32) tf.Tensor(0.86124384, shape=(), dtype=float32) tf.Tensor(0.8545932, shape=(), dtype=float32) tf.Tensor(0.8480073, shape=(), dtype=float32) tf.Tensor(0.8414857, shape=(), dtype=float32) tf.Tensor(0.8350279, shape=(), dtype=float32) tf.Tensor(0.8286335, shape=(), dtype=float32) tf.Tensor(0.8223018, shape=(), dtype=float32) tf.Tensor(0.81603223, shape=(), dtype=float32) tf.Tensor(0.8098244, shape=(), dtype=float32) tf.Tensor(0.8036777, shape=(), dtype=float32) tf.Tensor(0.7975916, shape=(), dtype=float32) tf.Tensor(0.79156566, shape=(), dtype=float32) tf.Tensor(0.78559923, shape=(), dtype=float32) tf.Tensor(0.77969193, shape=(), dtype=float32) tf.Tensor(0.773843, shape=(), dtype=float32) tf.Tensor(0.7680521, shape=(), dtype=float32)
上記の例では、 dist_dataset
を繰り返し処理して、トレーニングに入力を提供しました。また、numpy入力をサポートするためにtf.distribute.Strategy.make_experimental_numpy_dataset
も提供しています。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_dataset
呼び出す前にデータセットを作成できます。
データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。反復上記今最初のイテレータを作成するように変更され、その後、明示的に呼んでnext
の入力データを取得することに。
iterator = iter(dist_dataset)
for _ in range(10):
print(distributed_train_step(next(iterator)))
tf.Tensor(0.7623187, shape=(), dtype=float32) tf.Tensor(0.7566423, shape=(), dtype=float32) tf.Tensor(0.7510221, shape=(), dtype=float32) tf.Tensor(0.7454578, shape=(), dtype=float32) tf.Tensor(0.739949, shape=(), dtype=float32) tf.Tensor(0.7344949, shape=(), dtype=float32) tf.Tensor(0.72909516, shape=(), dtype=float32) tf.Tensor(0.7237492, shape=(), dtype=float32) tf.Tensor(0.7184567, shape=(), dtype=float32) tf.Tensor(0.7132167, shape=(), dtype=float32)
これは、 tf.distribute.Strategy
使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。
現在サポートされているものは何ですか?
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | ParameterServerStrategy | CentralStorageStrategy |
---|---|---|---|---|---|
カスタムトレーニングループ | サポートされています | サポートされています | 実験的サポート | 実験的サポート | 実験的サポート |
例とチュートリアル
カスタムトレーニングループで配布戦略を使用する例を次に示します。
-
MirroredStrategy
を使用してMNISTをトレーニングするためのチュートリアル。 - TPUStrategyを使用した
TPUStrategy
トレーニングに関するガイド。 - さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ。
その他のトピック
このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。
TF_CONFIG環境変数の設定
マルチワーカートレーニングの場合、前述のように、クラスターで実行されているバイナリごとにTF_CONFIG
環境変数を設定する必要があります。 TF_CONFIG
環境変数は、クラスターを構成するタスク、それらのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。 tensorflow / ecosystemリポジトリは、トレーニングタスクのTF_CONFIG
を設定するKubernetesテンプレートを提供します。
TF_CONFIGには、クラスターとタスクの2つのコンポーネントがあります。 clusterは、workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うワーカーが通常1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、インデックス0のワーカーがチーフワーカーとして任命されるのが通例です(実際、これがtf.distribute.Strategyの実装方法です)。一方、タスクは現在のタスクの情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。
TF_CONFIG
一例はTF_CONFIG
です。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"]
},
"task": {"type": "worker", "index": 1}
})
このTF_CONFIG
は、ホストとポートとともに、クラスター内に3つのワーカーと2つのpsタスクがあることを指定します。 「タスク」の部分は、クラスター内の現在のタスクの役割であるワーカー1(2番目のワーカー)を指定します。クラスター内の有効な役割は、「チーフ」、「ワーカー」、「ps」、および「エバリュエーター」です。 tf.distribute.experimental.ParameterServerStrategy
を使用する場合を除いて、「ps」ジョブはありません。
次は何ですか?
tf.distribute.Strategy
は積極的に開発中です。それを試して、 GitHubの問題を使用してフィードバックを提供してください。