質問があります? TensorFlowフォーラム訪問フォーラムでコミュニティとつながる

TensorFlowを使用した分散トレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロードする

概要概要

tf.distribute.Strategyは、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するためのTensorFlowAPIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。

tf.distribute.Strategyは、次の主要な目標を念頭に置いて設計されています。

  • 使いやすく、研究者、MLエンジニアなどを含む複数のユーザーセグメントをサポートします。
  • 箱から出してすぐに優れたパフォーマンスを提供します。
  • 戦略を簡単に切り替えることができます。

tf.distribute.Strategyのような高レベルAPIで使用することができKeras 、またカスタムトレーニングループ(及び、TensorFlowを使用して、一般的に、任意の計算)を配布するために使用することができます。

TensorFlow 2.xでは、プログラムを熱心に実行することも、 tf.functionを使用してグラフで実行することもできます。 tf.distribute.Strategyは、これら両方の実行モードをサポートすることを目的としていますが、 tf.function最適にtf.functionます。イーガーモードはデバッグ目的でのみ推奨され、 TPUStrategyではサポートされていません。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。

tf.distribute.Strategyの基盤となるコンポーネントが戦略対応になるように変更されているため、コードをほとんど変更せずにtf.distribute.Strategyを使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。

このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について学習します。パフォーマンスの問題をデバッグする方法については、 Optimize TensorFlowGPUパフォーマンスガイドを参照してください。

# Import TensorFlow
import tensorflow as tf

戦略の種類

tf.distribute.Strategyは、さまざまな軸に沿った多くのユースケースをカバーすることをtf.distribute.Strategyとしています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。

  • 同期トレーニングと非同期トレーニング:これらは、データの並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期に更新します。通常、同期トレーニングは、パラメータサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
  • ハードウェアプラットフォーム: 1台のマシン上の複数のGPU、ネットワーク内の複数のマシン(それぞれ0個以上のGPU)、またはクラウドTPUにトレーニングを拡張することをお勧めします。

これらのユースケースをサポートするために、6つの戦略が利用可能です。次のセクションでは、TensorFlowのどのシナリオでこれらのどれがサポートされているかを説明します。概要は次のとおりです。

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras APIサポートされていますサポートされていますサポートされています実験的サポートサポートされている計画されたポスト2.4
カスタムトレーニングループサポートされていますサポートされていますサポートされています実験的サポート実験的サポート
Estimator API限定サポートサポートされていません限定サポート限定サポート限定サポート

MirroredStrategy

tf.distribute.MirroredStrategyは、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートします。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカにわたってミラーリングされます。これらの変数が一緒になって、 MirroredVariableと呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することにより、相互に同期されます。

効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは、非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、 All - Reduceの実装としてNVIDIA Collective Communication Library( NCCL )を使用します。他のいくつかのオプションから選択するか、独自に作成することができます。

MirroredStrategyを作成する最も簡単な方法は次のとおりです。

mirrored_strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

これにより、 MirroredStrategyインスタンスが作成され、TensorFlowに表示されるすべてのGPUとNCCLがクロスデバイス通信として使用されます。

マシンで一部のGPUのみを使用する場合は、次のように実行できます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

クロスデバイス通信をオーバーライドする場合は、 tf.distribute.CrossDeviceOpsインスタンスを指定して、 cross_device_ops引数を使用してcross_device_opsできます。現在、 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDeviceは、デフォルトであるtf.distribute.NcclAllReduce以外の2つのオプションです。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。彼らは上で利用可能なGoogleのコラボTensorFlow研究クラウド、およびクラウドTPU

分散トレーニングアーキテクチャに関しては、 TPUStrategyは同じMirroredStrategyあり、同期分散トレーニングを実装します。 TPUは、 TPUStrategy使用される複数のTPUコアにわたって、効率的なall-reduceおよびその他の集合的な操作の独自の実装を提供します。

TPUStrategyをインスタンス化する方法は次のTPUStrategyです。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolverインスタンスは、TPUの検索に役立ちます。 Colabでは、引数を指定する必要はありません。

これをクラウドTPUに使用する場合:

  • tpu引数にTPUリソースの名前を指定する必要があります。
  • プログラムの開始時に、tpuシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 tpuシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategyは、 MirroredStrategyと非常によく似ています。複数のワーカーに同期分散トレーニングを実装し、それぞれに複数のGPUが含まれる可能性があります。 tf.distribute.MirroredStrategyと同様に、すべてのワーカーの各デバイスでモデル内のすべての変数のコピーを作成します。

MultiWorkerMirroredStrategyを作成する最も簡単な方法はMultiWorkerMirroredStrategyです。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategyは、クロスデバイス通信用の2つの実装があります。 CommunicationImplementation.RINGRPCベースであり、CPUとGPUの両方をサポートします。 CommunicationImplementation.NCCLはNCCLを使用し、GPUで最先端のパフォーマンスを提供しますが、CPUをサポートしていません。 CollectiveCommunication.AUTOは、選択をTensorflowに延期します。次の方法で指定できます。

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための主な違いの1つは、マルチワーカーのセットアップです。 TF_CONFIG環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定するTensorFlowの標準的な方法です。 TF_CONFIG設定の詳細をご覧ください。

ParameterServerStrategy

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。詳細については、パラメータサーバーのトレーニングチュートリアルをご覧ください。

TensorFlow 2では、パラメーターサーバートレーニングは、 tf.distribute.experimental.coordinator.ClusterCoordinatorクラスを介して中央コーディネーターベースのアーキテクチャを使用します。

この実装では、 workerおよびparameter serverタスクは、コーディネーターからのタスクをリッスンするtf.distribute.Serverを実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。

コーディネーターで実行されているプログラミングでは、 ParameterServerStrategyオブジェクトを使用してトレーニングステップを定義し、 ClusterCoordinatorを使用してトレーニングステップをリモートワーカーにディスパッチします。それらを作成する最も簡単な方法は次のとおりです。

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

TensorFlow 1では、 ParameterServerStrategyは、 tf.compat.v1.distribute.experimental.ParameterServerStrategyシンボルを介したEstimatorでのみ使用できます。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategyは、同期トレーニングも行います。変数はミラーリングされませんが、代わりにCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。

次の方法でCentralStorageStrategyインスタンスを作成します。

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

これにより、表示されているすべてのGPUとCPUを使用するCentralStorageStrategyインスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。

その他の戦略

上記の戦略に加えて、 tf.distribute使用する際のプロトタイピングとデバッグに役立つ可能性のある他の2つの戦略があります。

デフォルトの戦略

デフォルト戦略は、明示的な配布戦略がスコープ内にない場合に存在する配布戦略です。これはtf.distribute.Strategyインターフェースを実装しますが、パススルーであり、実際の配布は提供しません。たとえば、 strategy.run(fn)は単にfn呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。

デフォルトの戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、明示的なストラテジーのスコープ外でtf.distribute.get_strategy使用して取得できます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用できるのと同じAPI)。

default_strategy = tf.distribute.get_strategy()

この戦略は2つの主な目的を果たします。

  • これにより、配布対応のライブラリコードを無条件に記述できます。例えば、中tf.optimizer S使用することができますtf.distribute.get_strategy戦略を低減させるための勾配を、それは常にあなたが削減APIを呼び出すことができた上で戦略オブジェクトを返すことと使用を。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを示すサンプルコードスニペットは次のとおりです。
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategyは、すべての変数と計算を単一の指定されたデバイスに配置するための戦略です。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

この戦略は、いくつかの点でデフォルト戦略とは異なります。デフォルトストラテジーでは、分散ストラテジーなしでTensorFlowを実行する場合と比較して、変数配置ロジックは変更されません。ただし、 OneDeviceStrategyを使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.runを介してOneDeviceStrategy.run関数も、指定されたデバイスに配置されます。

この戦略を通じて配布された入力は、指定されたデバイスにプリフェッチされます。デフォルト戦略では、入力分布はありません。

デフォルト戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これにより、デフォルトの戦略よりもいくらか多くの配布戦略機構が実行されますが、 MirroredStrategyTPUStrategyなどを使用するほどではありません。戦略がないかのように動作するコードが必要な場合は、デフォルトの戦略を使用してください。

これまで、利用可能なさまざまな戦略と、それらをインスタンス化する方法を見てきました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。

tf.distribute.Strategytf.keras.Model.fitを使用する

tf.distribute.Strategyは、TensorFlowによるKerasAPI仕様の実装であるtf.kerasに統合されていますtf.kerasは、モデルを構築およびトレーニングするための高レベルAPIです。 tf.kerasバックエンドに統合することで、 Model.fitを使用してKerasトレーニングフレームワークで記述されたトレーニングをシームレスに配布できます。

コードで変更する必要があるものは次のとおりです。

  1. 適切なtf.distribute.Strategyインスタンスを作成します。
  2. Kerasモデル、オプティマイザー、メトリックの作成をstrategy.scope内に移動します。

TensorFlow配布戦略は、シーケンシャル、ファンクショナル、サブクラスのすべてのタイプのKerasモデルをサポートします。

これは、1つの密なレイヤーを持つ非常に単純なKerasモデルに対してこれを行うためのコードスニペットです。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

この例ではMirroredStrategy使用しているため、複数のGPUを搭載したマシンでこれを実行できます。 strategy.scope()は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内にモデル/オプティマイザー/メトリックを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategyは、利用可能なGPUでのモデルのトレーニングの複製、グラデーションの集計などを処理します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 0.3648
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1613
10/10 [==============================] - 1s 2ms/step - loss: 0.1001
0.1001412644982338

ここで、tf.data.Datasetはトレーニングと評価の入力を提供します。 NumPy配列を使用することもできます。

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 1s 2ms/step - loss: 0.0713
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0315
<tensorflow.python.keras.callbacks.History at 0x7f18ac0bdcd0>

どちらの場合も( DatasetまたはNumpyを使用)、指定された入力の各バッチは複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategyを使用している場合、サイズ10の各バッチは2つのGPUに分割され、各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を効果的に利用するために、アクセラレータを追加するにつれてバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。レプリカの数を取得するには、 strategy.num_replicas_in_syncを使用できます。

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras APIサポートされていますサポートされています実験的サポート実験的サポート実験的サポート

例とチュートリアル

上記のKerasとのエンドツーエンドの統合を説明するチュートリアルと例のリストを次に示します。

  1. MirroredStrategyを使用してMNISTをトレーニングするためのチュートリアル
  2. MultiWorkerMirroredStrategyを使用してMNISTをMultiWorkerMirroredStrategyするためのチュートリアル
  3. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  4. ParameterServerStrategyしたTensorFlow2でのパラメーターサーバートレーニングのチュートリアル
  5. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

カスタムトレーニングループでのtf.distribute.Strategy使用

あなたが見てきたように、使用してtf.distribute.Strategy Kerasのでmodel.fitあなたのコードの唯一の数行を変更する必要があります。もう少し努力すれば、カスタムトレーニングループでtf.distribute.Strategyを使用することもできます。

EstimatorやKerasで可能なよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。

tf.distribute.Strategyクラスは、カスタムトレーニングループをサポートするためのメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるはずです。

ここでは、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットが表示されます。

まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_datasetを呼び出して、戦略に基づいてデータセットを配布します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

次に、トレーニングの1つのステップを定義します。tf.GradientTapeを使用して勾配を計算し、オプティマイザーを使用してそれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを配布するには、関数train_step入れて、前に作成したdist_datasetから取得したデータセット入力とともにtf.distrbute.Strategy.runに渡します。

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードで注意すべき他のいくつかのこと:

  1. tf.nn.compute_average_lossを使用して損失を計算しました。 tf.nn.compute_average_lossは、例ごとの損失を合計し、その合計をglobal_batch_sizeで除算します。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で集計されるため、これは重要です。
  2. また、 tf.distribute.Strategy.reduce APIを使用して、 tf.distribute.Strategy.reduceによって返された結果を集計しtf.distribute.Strategy.runtf.distribute.Strategy.runは、ストラテジー内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。それらをreduceて、集計値を取得できます。 tf.distribute.Strategy.experimental_local_resultsを実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。
  3. 配布戦略スコープ内でapply_gradientsを呼び出すと、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。

最後に、トレーニングステップを定義したら、 dist_datasetを反復処理して、トレーニングをループで実行できます。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.5716536, shape=(), dtype=float32)
tf.Tensor(0.56787133, shape=(), dtype=float32)
tf.Tensor(0.564126, shape=(), dtype=float32)
tf.Tensor(0.5604175, shape=(), dtype=float32)
tf.Tensor(0.5567452, shape=(), dtype=float32)
tf.Tensor(0.5531087, shape=(), dtype=float32)
tf.Tensor(0.54950774, shape=(), dtype=float32)
tf.Tensor(0.5459418, shape=(), dtype=float32)
tf.Tensor(0.5424106, shape=(), dtype=float32)
tf.Tensor(0.5389137, shape=(), dtype=float32)
tf.Tensor(0.53545076, shape=(), dtype=float32)
tf.Tensor(0.5320213, shape=(), dtype=float32)
tf.Tensor(0.528625, shape=(), dtype=float32)
tf.Tensor(0.5252616, shape=(), dtype=float32)
tf.Tensor(0.52193063, shape=(), dtype=float32)
tf.Tensor(0.5186317, shape=(), dtype=float32)
tf.Tensor(0.5153646, shape=(), dtype=float32)
tf.Tensor(0.51212865, shape=(), dtype=float32)
tf.Tensor(0.5089238, shape=(), dtype=float32)
tf.Tensor(0.5057497, shape=(), dtype=float32)

上記の例では、 dist_datasetを繰り返し処理して、トレーニングに入力を提供しました。 NumPy入力をサポートするためのtf.distribute.Strategy.make_experimental_numpy_datasetも提供されます。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_dataset呼び出す前にデータセットを作成できます。

データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。反復上記今最初のイテレータを作成するように変更され、その後、明示的に呼んでnextの入力データを取得することに。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.5026059, shape=(), dtype=float32)
tf.Tensor(0.49949214, shape=(), dtype=float32)
tf.Tensor(0.496408, shape=(), dtype=float32)
tf.Tensor(0.4933532, shape=(), dtype=float32)
tf.Tensor(0.49032742, shape=(), dtype=float32)
tf.Tensor(0.48733026, shape=(), dtype=float32)
tf.Tensor(0.48436147, shape=(), dtype=float32)
tf.Tensor(0.4814206, shape=(), dtype=float32)
tf.Tensor(0.4785076, shape=(), dtype=float32)
tf.Tensor(0.475622, shape=(), dtype=float32)

これは、 tf.distribute.Strategy使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
カスタムトレーニングループサポートされていますサポートされています実験的サポート実験的サポート実験的サポート

例とチュートリアル

カスタムトレーニングループで配布戦略を使用する例を次に示します。

  1. MirroredStrategyを使用してMNISTをトレーニングするためのチュートリアル
  2. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  3. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

その他のトピック

このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。

TF_CONFIG環境変数の設定

マルチワーカートレーニングの場合、前述のように、クラスターで実行されているバイナリごとにTF_CONFIG環境変数を設定する必要があります。 TF_CONFIG環境変数は、クラスターを構成するタスク、それらのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。 tensorflow/ecosystemリポジトリは、トレーニングタスク用にTF_CONFIGを設定するKubernetesテンプレートを提供します。

TF_CONFIG 、クラスターとタスクの2つのコンポーネントがあります。

  • クラスターは、トレーニングクラスターに関する情報を提供します。これは、ワーカーなどのさまざまなタイプのジョブで構成されるdictです。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うワーカーが通常1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、インデックス0のワーカーがチーフワーカーとして任命されるのが通例です(実際、これがtf.distribute.Strategy実装方法tf.distribute.Strategy )。
  • 一方、タスクは現在のタスクの情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。

TF_CONFIG一例はTF_CONFIGです。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

このTF_CONFIGは、ホストとポートとともに、 "cluster"に3つのワーカーと2つの"ps"タスクがあることを指定します。 "task"部分は、 "cluster"現在のタスクの役割、つまりワーカー1(2番目のワーカー)を指定します。クラスター内の有効な役割は、 "chief""worker""ps" 、および"evaluator"です。 tf.distribute.experimental.ParameterServerStrategyを使用する場合を除いて、 "ps"ジョブはありません。

次は何ですか?

tf.distribute.Strategyは積極的に開発中です。それを試して、 GitHubの問題を使用しフィードバックを提供してください。