![]() | ![]() | ![]() | ![]() |
概要
tf.distribute.Strategy
は、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するためのTensorFlowAPIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。
tf.distribute.Strategy
は、次の主要な目標を念頭に置いて設計されています。
- 使いやすく、研究者、機械学習エンジニアなどを含む複数のユーザーセグメントをサポートします。
- 箱から出してすぐに優れたパフォーマンスを提供します。
- 戦略を簡単に切り替えることができます。
Keras Model.fit
などの高レベルAPIでtf.distribute.Strategy
を使用してトレーニングを配布したり、カスタムトレーニングループ(および一般的にはTensorFlowを使用した計算)を配布したりできます。
TensorFlow 2.xでは、プログラムを熱心に実行することも、 tf.function
を使用してグラフで実行することもできます。 tf.distribute.Strategy
は、これら両方の実行モードをサポートすることを目的としていますが、 tf.function
で最適に機能します。熱心なモードはデバッグ目的でのみ推奨され、 tf.distribute.TPUStrategy
ではサポートされていません。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。
TensorFlowの基盤となるコンポーネントが戦略対応になるように変更されているため、コードをほとんど変更せずにtf.distribute.Strategy
を使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。
このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について学習します。パフォーマンスの問題をデバッグする方法については、 Optimize TensorFlowGPUパフォーマンスガイドをご覧ください。
TensorFlowを設定する
import tensorflow as tf
戦略の種類
tf.distribute.Strategy
は、さまざまな軸に沿った多くのユースケースをカバーすることを目的としています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。
- 同期トレーニングと非同期トレーニング:これらは、データ並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期で更新します。通常、同期トレーニングは、パラメーターサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
- ハードウェアプラットフォーム:トレーニングを1台のマシン上の複数のGPU、ネットワーク内の複数のマシン(それぞれ、0個以上のGPU)、またはクラウドTPUに拡張することができます。
これらのユースケースをサポートするために、TensorFlowにはMirroredStrategy
、 TPUStrategy
、 MultiWorkerMirroredStrategy
、 ParameterServerStrategy
、 CentralStorageStrategy
、およびその他の利用可能な戦略があります。次のセクションでは、TensorFlowのどのシナリオでこれらのどれがサポートされているかを説明します。ここに簡単な概要があります:
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy |
---|---|---|---|---|---|
Keras Model.fit | サポートされています | サポートされています | サポートされています | 実験的サポート | 実験的サポート |
カスタムトレーニングループ | サポートされています | サポートされています | サポートされています | 実験的サポート | 実験的サポート |
Estimator API | 限定サポート | サポートされていません | 限定サポート | 限定サポート | 限定サポート |
MirroredStrategy
tf.distribute.MirroredStrategy
は、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートします。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカにミラーリングされます。これらの変数が一緒になって、 MirroredVariable
と呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することにより、相互に同期されます。
効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは、非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、All-Reduceの実装としてNVIDIA Collective Communication Library( NCCL )を使用します。他のいくつかのオプションから選択するか、独自に作成することができます。
MirroredStrategy
を作成する最も簡単な方法は次のとおりです。
mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
これにより、 MirroredStrategy
インスタンスが作成され、TensorFlowに表示されるすべてのGPUとNCCLがクロスデバイス通信として使用されます。
マシンで一部のGPUのみを使用する場合は、次のように実行できます。
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0 INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
クロスデバイス通信をオーバーライドする場合は、 tf.distribute.CrossDeviceOps
のインスタンスを指定して、 cross_device_ops
引数を使用してオーバーライドできます。現在、 tf.distribute.HierarchicalCopyAllReduce
とtf.distribute.ReductionToOneDevice
は、デフォルトであるtf.distribute.NcclAllReduce
以外の2つのオプションです。
mirrored_strategy = tf.distribute.MirroredStrategy(
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
TPUStrategy
tf.distribute.TPUStrategy
を使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。これらは、 Google Colab 、 TPU Research Cloud 、およびCloudTPUで利用できます。
分散トレーニングアーキテクチャに関しては、 TPUStrategy
は同じMirroredStrategy
であり、同期分散トレーニングを実装します。 TPUは、 TPUStrategy
で使用される複数のTPUコアにわたって、効率的なall-reduceおよびその他の集合的な操作の独自の実装を提供します。
TPUStrategy
をインスタンス化する方法は次のとおりです。
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
TPUClusterResolver
インスタンスは、TPUの検索に役立ちます。 Colabでは、引数を指定する必要はありません。
これをクラウドTPUに使用する場合:
-
tpu
引数でTPUリソースの名前を指定する必要があります。 - プログラムの開始時に、TPUシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 TPUシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。
MultiWorkerMirroredStrategy
tf.distribute.MultiWorkerMirroredStrategy
は、 MirroredStrategy
と非常によく似ています。複数のワーカーに同期分散トレーニングを実装し、それぞれに複数のGPUが含まれる可能性があります。 tf.distribute.MirroredStrategy
と同様に、すべてのワーカーの各デバイスでモデル内のすべての変数のコピーを作成します。
MultiWorkerMirroredStrategy
を作成する最も簡単な方法は次のとおりです。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
には、クロスデバイス通信用の2つの実装があります。 CommunicationImplementation.RING
はRPCベースであり、CPUとGPUの両方をサポートします。 CommunicationImplementation.NCCL
はNCCLを使用し、GPUで最先端のパフォーマンスを提供しますが、CPUをサポートしていません。 CollectiveCommunication.AUTO
は、選択をTensorflowに延期します。次の方法で指定できます。
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL
マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための重要な違いの1つは、マルチワーカーのセットアップです。 'TF_CONFIG'
環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定するTensorFlowの標準的な方法です。詳細については、このドキュメントの「 TF_CONFIGの設定」セクションを参照してください。
MultiWorkerMirroredStrategy
の詳細については、次のチュートリアルを検討してください。
ParameterServerStrategy
パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。詳細については、パラメータサーバーのトレーニングチュートリアルをご覧ください。
TensorFlow 2では、パラメーターサーバートレーニングは、 tf.distribute.experimental.coordinator.ClusterCoordinator
クラスを介して中央コーディネーターベースのアーキテクチャを使用します。
この実装では、 worker
およびparameter server
タスクは、コーディネーターからのタスクをリッスンするtf.distribute.Server
を実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。
コーディネーターで実行されているプログラミングでは、 ParameterServerStrategy
オブジェクトを使用してトレーニングステップを定義し、 ClusterCoordinator
を使用してトレーニングステップをリモートワーカーにディスパッチします。それらを作成する最も簡単な方法は次のとおりです。
strategy = tf.distribute.experimental.ParameterServerStrategy(
tf.distribute.cluster_resolver.TFConfigClusterResolver(),
variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
strategy)
ParameterServerStrategy
の詳細については、KerasModel.fitを使用したパラメーターサーバーのトレーニングとカスタムトレーニングループのチュートリアルをご覧ください。
TensorFlow 1では、 ParameterServerStrategy
は、 tf.compat.v1.distribute.experimental.ParameterServerStrategy
シンボルを介したEstimatorでのみ使用できます。
CentralStorageStrategy
tf.distribute.experimental.CentralStorageStrategy
は、同期トレーニングも行います。変数はミラーリングされません。代わりに、変数はCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。
次の方法でCentralStorageStrategy
のインスタンスを作成します。
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
これにより、表示されているすべてのGPUとCPUを使用するCentralStorageStrategy
インスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。
その他の戦略
上記の戦略に加えて、 tf.distribute
を使用する際のプロトタイピングとデバッグに役立つ可能性のある他の2つの戦略があります。
デフォルト戦略
デフォルト戦略は、明示的な配布戦略が範囲内にない場合に存在する配布戦略です。これはtf.distribute.Strategy
インターフェースを実装しますが、パススルーであり、実際の配布を提供しません。たとえば、 Strategy.run(fn)
は単にfn
を呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。
デフォルト戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、明示的な戦略のスコープ外でtf.distribute.get_strategy
を使用して取得できます(明示的な戦略のスコープ内で現在の戦略を取得するために使用できるのと同じAPI)。
default_strategy = tf.distribute.get_strategy()
この戦略は2つの主な目的を果たします。
- これにより、配布対応のライブラリコードを無条件に記述できます。たとえば、
tf.optimizer
では、tf.distribute.get_strategy
を使用して、その戦略を使用して勾配を減らすことができます。これにより、Strategy.reduce
を呼び出すことができる戦略オブジェクトが常に返されます。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None) # reduce some values
1.0プレースホルダー18
- ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを示すサンプルコードスニペットは次のとおりです。
if tf.config.list_physical_devices('GPU'):
strategy = tf.distribute.MirroredStrategy()
else: # Use the Default Strategy
strategy = tf.distribute.get_strategy()
with strategy.scope():
# Do something interesting
print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) MirroredVariable:{ 0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0> }
OneDeviceStrategy
tf.distribute.OneDeviceStrategy
は、すべての変数と計算を単一の指定されたデバイスに配置するための戦略です。
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
この戦略は、いくつかの点でデフォルト戦略とは異なります。デフォルト戦略では、分散戦略なしでTensorFlowを実行する場合と比較した場合、変数配置ロジックは変更されません。ただし、 OneDeviceStrategy
を使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.run
を介して呼び出された関数も、指定されたデバイスに配置されます。
この戦略を通じて分散された入力は、指定されたデバイスにプリフェッチされます。デフォルト戦略では、入力分布はありません。
デフォルト戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これにより、デフォルト戦略よりもいくらか多くの配布戦略機構が実行されますが、たとえば、 MirroredStrategy
やTPUStrategy
を完全に使用できるわけではありません。戦略がないかのように動作するコードが必要な場合は、デフォルト戦略を使用します。
これまで、さまざまな戦略と、それらをインスタンス化する方法について学習しました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。
KerasModel.fitでtf.distribute.Strategyを使用する
tf.distribute.Strategy
は、TensorFlowによるKerasAPI仕様の実装であるtf.keras
に統合されています。 tf.keras
は、モデルを構築およびトレーニングするための高レベルAPIです。 tf.keras
バックエンドに統合することで、 Model.fitを使用してKerasトレーニングフレームワークで記述されたトレーニングをシームレスに配布できます。
コードで変更する必要があるものは次のとおりです。
- 適切な
tf.distribute.Strategy
のインスタンスを作成します。 - Kerasモデル、オプティマイザー、メトリクスの作成を
strategy.scope
内に移動します。
TensorFlow分散戦略は、すべてのタイプのKerasモデル(シーケンシャル、ファンクショナル、サブクラス)をサポートします。
これを行うためのコードスニペットは、1つのDense
レイヤーを持つ非常に単純なKerasモデルです。
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
この例ではMirroredStrategy
を使用しているため、複数のGPUを搭載したマシンでこれを実行できます。 strategy.scope()
は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内にモデル/オプティマイザー/メトリクスを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategy
は、利用可能なGPUでのモデルのトレーニングの複製、グラデーションの集計などを処理します。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). 2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). 10/10 [==============================] - 3s 2ms/step - loss: 2.2552 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). Epoch 2/2 10/10 [==============================] - 0s 2ms/step - loss: 0.9968 2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } 10/10 [==============================] - 1s 2ms/step - loss: 0.6190 0.6190494298934937
ここで、 tf.data.Dataset
はトレーニングと評価の入力を提供します。 NumPy配列を使用することもできます。
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2 2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9" op: "FlatMapDataset" input: "PrefetchDataset/_8" attr { key: "Targuments" value { list { } } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_slice_batch_indices_997" } } } attr { key: "output_shapes" value { list { shape { dim { size: 10 } } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`. 10/10 [==============================] - 1s 2ms/step - loss: 0.4406 Epoch 2/2 10/10 [==============================] - 0s 2ms/step - loss: 0.1947 <keras.callbacks.History at 0x7fb81813d2d0>プレースホルダー27
どちらの場合も( Dataset
またはNumPyを使用)、指定された入力の各バッチは複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategy
を使用している場合、サイズ10の各バッチは2つのGPUに分割され、各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を有効に活用するために、アクセラレータを追加するときにバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。 strategy.num_replicas_in_sync
を使用して、レプリカの数を取得できます。
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
現在サポートされているものは何ですか?
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | ParameterServerStrategy | CentralStorageStrategy |
---|---|---|---|---|---|
Keras Model.fit | サポートされています | サポートされています | サポートされています | 実験的サポート | 実験的サポート |
例とチュートリアル
上記のModel.fit
とのエンドツーエンドの統合を説明するチュートリアルと例のリストを次に示します。
- チュートリアル:
Model.fit
とMirroredStrategy
を使用したトレーニング。 - チュートリアル:
Model.fit
とMultiWorkerMirroredStrategy
を使用したトレーニング。 - ガイド:
Model.fit
およびTPUStrategy
の使用例が含まれています。 - チュートリアル:
Model.fit
およびParameterServerStrategy
を使用したパラメーターサーバーのトレーニング。 - チュートリアル:
Model.fit
とTPUStrategy
を使用してGLUEベンチマークから多くのタスクのBERTを微調整します。 - さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ。
カスタムトレーニングループでtf.distribute.Strategyを使用する
上で示したように、Keras Model.fit
でtf.distribute.Strategy
を使用するには、コードの数行を変更するだけで済みます。もう少し努力すれば、カスタムトレーニングループでtf.distribute.Strategy
を使用することもできます。
EstimatorやKerasよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。
tf.distribute.Strategy
クラスは、カスタムトレーニングループをサポートするためのメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるようになります。
以下は、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットです。
まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
optimizer = tf.keras.optimizers.SGD()
次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_dataset
を呼び出して、戦略に基づいてデータセットを配布します。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } }
次に、トレーニングの1つのステップを定義します。 tf.GradientTape
を使用して勾配を計算し、オプティマイザーを使用してそれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを配布するには、関数train_step
に入れて、前に作成したdist_dataset
から取得したデータセット入力とともにtf.distribute.Strategy.run
に渡します。
loss_object = tf.keras.losses.BinaryCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
def train_step(inputs):
features, labels = inputs
with tf.GradientTape() as tape:
predictions = model(features, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
@tf.function
def distributed_train_step(dist_inputs):
per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
上記のコードで注意すべき他のいくつかのこと:
-
tf.nn.compute_average_loss
を使用して損失を計算しました。tf.nn.compute_average_loss
は、例ごとの損失を合計し、その合計をglobal_batch_size
で除算します。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で集約されるため、これは重要です。 - また、
tf.distribute.Strategy.reduce
APIを使用して、tf.distribute.Strategy.run
によって返された結果を集計しました。tf.distribute.Strategy.run
は、ストラテジー内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。それらをreduce
て、集計値を取得できます。tf.distribute.Strategy.experimental_local_results
を実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。 - 配布戦略スコープ内で
apply_gradients
を呼び出すと、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。
最後に、トレーニングステップを定義したら、 dist_dataset
を反復処理して、トレーニングをループで実行できます。
for dist_inputs in dist_dataset:
print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32) tf.Tensor(0.18628375, shape=(), dtype=float32) tf.Tensor(0.18570684, shape=(), dtype=float32) tf.Tensor(0.18513316, shape=(), dtype=float32) tf.Tensor(0.1845627, shape=(), dtype=float32) tf.Tensor(0.18399543, shape=(), dtype=float32) tf.Tensor(0.18343134, shape=(), dtype=float32) tf.Tensor(0.18287037, shape=(), dtype=float32) tf.Tensor(0.18231256, shape=(), dtype=float32) tf.Tensor(0.18175781, shape=(), dtype=float32) tf.Tensor(0.18120615, shape=(), dtype=float32) tf.Tensor(0.18065754, shape=(), dtype=float32) tf.Tensor(0.18011193, shape=(), dtype=float32) tf.Tensor(0.17956935, shape=(), dtype=float32) tf.Tensor(0.17902976, shape=(), dtype=float32) tf.Tensor(0.17849308, shape=(), dtype=float32) tf.Tensor(0.17795937, shape=(), dtype=float32) tf.Tensor(0.17742859, shape=(), dtype=float32) tf.Tensor(0.17690066, shape=(), dtype=float32) tf.Tensor(0.17637561, shape=(), dtype=float32)
上記の例では、 dist_dataset
を繰り返し処理して、トレーニングに入力を提供しました。 NumPy入力をサポートするためのtf.distribute.Strategy.make_experimental_numpy_dataset
も提供されます。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_dataset
を呼び出す前にデータセットを作成できます。
データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。上記の反復は、最初にイテレータを作成し、 next
それを明示的に呼び出して入力データを取得するように変更されます。
iterator = iter(dist_dataset)
for _ in range(10):
print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32) tf.Tensor(0.17533402, shape=(), dtype=float32) tf.Tensor(0.17481743, shape=(), dtype=float32) tf.Tensor(0.17430364, shape=(), dtype=float32) tf.Tensor(0.17379259, shape=(), dtype=float32) tf.Tensor(0.17328428, shape=(), dtype=float32) tf.Tensor(0.17277871, shape=(), dtype=float32) tf.Tensor(0.17227581, shape=(), dtype=float32) tf.Tensor(0.17177561, shape=(), dtype=float32) tf.Tensor(0.17127804, shape=(), dtype=float32)プレースホルダー36
これは、 tf.distribute.Strategy
を使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。
現在サポートされているものは何ですか?
トレーニングAPI | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | ParameterServerStrategy | CentralStorageStrategy |
---|---|---|---|---|---|
カスタムトレーニングループ | サポートされています | サポートされています | サポートされています | 実験的サポート | 実験的サポート |
例とチュートリアル
カスタムトレーニングループで配布戦略を使用する例を次に示します。
- チュートリアル:カスタムトレーニングループと
MirroredStrategy
を使用したトレーニング。 - チュートリアル:カスタムトレーニングループと
MultiWorkerMirroredStrategy
を使用したトレーニング。 - ガイド:
TPUStrategy
を使用したカスタムトレーニングループの例が含まれています。 - チュートリアル:カスタムトレーニングループと
ParameterServerStrategy
を使用したパラメーターサーバートレーニング。 - さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ。
その他のトピック
このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。
TF_CONFIG環境変数の設定
マルチワーカートレーニングの場合、前述のように、クラスターで実行されているバイナリごとに'TF_CONFIG'
環境変数を設定する必要があります。 'TF_CONFIG'
環境変数は、クラスターを構成するタスク、それらのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。 tensorflow/ecosystem
リポジトリは、トレーニングタスク用に'TF_CONFIG'
を設定するKubernetesテンプレートを提供します。
'TF_CONFIG'
には、クラスターとタスクの2つのコンポーネントがあります。
- クラスターは、トレーニングクラスターに関する情報を提供します。これは、ワーカーなどのさまざまなタイプのジョブで構成されるdictです。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardのサマリーファイルの作成など、もう少し責任を負うワーカーが1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、インデックス
0
のワーカーがチーフワーカーとして任命されるのが通例です(実際、これがtf.distribute.Strategy
の実装方法です)。 - 一方、タスクは現在のタスクに関する情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。
'TF_CONFIG'
一例は次のとおりです。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"]
},
"task": {"type": "worker", "index": 1}
})
この'TF_CONFIG'
は、ホストとポートとともに、 "cluster"
に3つのワーカーと2つの"ps"
タスクがあることを指定します。 "task"
の部分は、 "cluster"
の現在のタスクの役割を指定します—ワーカー1
(2番目のワーカー)。クラスタ内の有効なロールは、 "chief"
、 "worker"
、 "ps"
、および"evaluator"
です。 tf.distribute.experimental.ParameterServerStrategy
を使用する場合を除いて、 "ps"
ジョブはありません。
次は何ですか?
tf.distribute.Strategy
は積極的に開発中です。試してみて、 GitHubの問題を使用してフィードバックを提供してください。