このページは Cloud Translation API によって翻訳されました。
Switch to English

TensorFlowを使用した分散トレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロード

概要概要

tf.distribute.Strategyは、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するためのTensorFlowAPIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。

tf.distribute.Strategyは、次の主要な目標を念頭に置いて設計されています。

  • 使いやすく、研究者、MLエンジニアなどを含む複数のユーザーセグメントをサポートします。
  • 箱から出してすぐに優れたパフォーマンスを提供します。
  • 戦略を簡単に切り替えることができます。

tf.distribute.Strategyのような高レベルAPIで使用することができKeras 、またカスタムトレーニングループ(及び、TensorFlowを使用して、一般的に、任意の計算)を配布するために使用することができます。

TensorFlow 2.xでは、プログラムを熱心に実行することも、 tf.functionを使用してグラフで実行することもできます。 tf.distribute.Strategyは、これら両方の実行モードをサポートすることを目的としていますが、 tf.function最適にtf.functionます。イーガーモードはデバッグ目的でのみ推奨され、 TPUStrategyではサポートされていません。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。

tf.distribute.Strategyの基盤となるコンポーネントが戦略対応になるように変更されたため、コードをほとんど変更せずにtf.distribute.Strategyを使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。

このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について説明します。パフォーマンスの問題をデバッグする方法については、 Optimize TensorFlowGPUパフォーマンスガイドを参照してください。

# Import TensorFlow
import tensorflow as tf

戦略の種類

tf.distribute.Strategyは、さまざまな軸に沿った多くのユースケースをカバーすることをtf.distribute.Strategyとしています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。

  • 同期トレーニングと非同期トレーニング:これらは、データの並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期に更新します。通常、同期トレーニングは、パラメータサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
  • ハードウェアプラットフォーム:トレーニングを1台のマシン上の複数のGPU、ネットワーク内の複数のマシン(それぞれ0個以上のGPU)、またはクラウドTPUに拡張することができます。

これらのユースケースをサポートするために、6つの戦略が利用可能です。次のセクションでは、TFのどのシナリオでこれらのどれがサポートされているかを説明します。概要は次のとおりです。

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras APIサポートされていますサポートされています実験的サポート実験的サポートサポートされている計画されたポスト2.3
カスタムトレーニングループサポートされていますサポートされています実験的サポート実験的サポートサポートされている計画されたポスト2.3
Estimator API限定サポートサポートされていません限定サポート限定サポート限定サポート

MirroredStrategy

tf.distribute.MirroredStrategyは、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートします。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカにわたってミラーリングされます。これらの変数が一緒になって、 MirroredVariableと呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することによって互いに同期されます。

効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、all-reduce実装としてNVIDIANCCLを使用します。他のいくつかのオプションから選択するか、独自に作成することができます。

MirroredStrategyを作成する最も簡単な方法は次のとおりです。

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

これにより、TensorFlowに表示されるすべてのGPUを使用するMirroredStrategyインスタンスが作成され、NCCLがクロスデバイス通信として使用されます。

マシンで一部のGPUのみを使用する場合は、次のように実行できます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

クロスデバイス通信をオーバーライドする場合は、 tf.distribute.CrossDeviceOpsインスタンスを指定して、 cross_device_ops引数を使用してcross_device_opsできます。現在、 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDeviceは、デフォルトのtf.distribute.NcclAllReduce以外の2つのオプションです。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。彼らは、Googleのコラボ、上で利用可能なTensorFlow研究クラウドクラウドTPU

分散トレーニングアーキテクチャに関しては、 TPUStrategyは同じMirroredStrategyあり、同期分散トレーニングを実装します。 TPUは、 TPUStrategy使用される複数のTPUコアにわたって、効率的なall-reduceおよびその他の集合操作の独自の実装を提供します。

TPUStrategyをインスタンス化する方法は次のTPUStrategyです。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolverインスタンスは、TPUの検索に役立ちます。 Colabでは、引数を指定する必要はありません。

これをクラウドTPUに使用する場合:

  • tpu引数にTPUリソースの名前を指定する必要があります。
  • プログラムの開始時に、tpuシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 tpuシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategyは、 MirroredStrategyと非常によく似ています。複数のワーカーに同期分散トレーニングを実装し、それぞれに複数のGPUが含まれる可能性があります。 MirroredStrategyと同様に、すべてのワーカーの各デバイスでモデル内のすべての変数のコピーを作成します。

CollectiveOpsを、変数の同期を維持するために使用されるマルチワーカーの全削減通信方式として使用します。コレクティブオペレーションは、ハードウェア、ネットワークトポロジ、テンソルサイズに応じて、TensorFlowランタイムでall-reduceアルゴリズムを自動的に選択できるTensorFlowグラフの単一のオペレーションです。

また、追加のパフォーマンス最適化を実装します。たとえば、小さなテンソルでの複数のすべての縮小を、大きなテンソルでのより少ないすべての縮小に変換する静的最適化が含まれています。さらに、プラグインアーキテクチャを持つように設計されているため、将来的には、ハードウェアに合わせて調整されたアルゴリズムをプラグインできるようになります。集合操作は、ブロードキャストやオールギャザーなどの他の集合操作も実装することに注意してください。

MultiWorkerMirroredStrategyを作成する最も簡単な方法はMultiWorkerMirroredStrategyです。

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy現在、集合操作の2つの異なる実装から選択できます。 CollectiveCommunication.RINGは、通信レイヤーとしてgRPCを使用してリングベースのコレクティブを実装します。 CollectiveCommunication.NCCLは、 NvidiaのNCCLを使用してコレクティブを実装します。 CollectiveCommunication.AUTOは、選択をランタイムに延期します。集合的な実装の最適な選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。次の方法で指定できます。

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための主な違いの1つは、マルチワーカーのセットアップです。 TF_CONFIG環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定するためのTensorFlowの標準的な方法です。 TF_CONFIGの設定の詳細をご覧ください。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategyは、同期トレーニングも行います。変数はミラーリングされず、代わりにCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。

次の方法でCentralStorageStrategyインスタンスを作成します。

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

これにより、表示されているすべてのGPUとCPUを使用するCentralStorageStrategyインスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。

ParameterServerStrategy

TF1では、 ParameterServerStrategyは、 tf.compat.v1.distribute.experimental.ParameterServerStrategyシンボルを介したtf.compat.v1.distribute.experimental.ParameterServerStrategyのみ使用可能であり、TF2での使用法はtf.compat.v1.distribute.experimental.ParameterServerStrategyリリースで開発されています。この戦略では、一部のマシンはワーカーとして指定され、一部はパラメーターサーバーとして指定されます。モデルの各変数は、1つのパラメーターサーバーに配置されます。計算はすべてのワーカーに複製されます。

その他の戦略

上記の戦略に加えて、 tf.distribute使用する際のプロトタイピングとデバッグに役立つ可能性のある他の2つの戦略があります。

デフォルトの戦略

デフォルトの戦略は、明示的な配布戦略がスコープ内にない場合に存在する配布戦略です。これはtf.distribute.Strategyインターフェースを実装しますが、パススルーであり、実際の配布は提供しません。たとえば、 strategy.run(fn)は単にfn呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。

デフォルトの戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、明示的なストラテジーのスコープ外でtf.distribute.get_strategy()使用して取得できます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用できるのと同じAPI)。

default_strategy = tf.distribute.get_strategy()

この戦略は2つの主な目的を果たします。

  • これにより、ディストリビューション対応のライブラリコードを無条件に記述できます。たとえば、 tf.optimizerでは、 tf.optimizer tf.distribute.get_strategy()を使用して、その戦略を使用して勾配を減らすことができます。これにより、 tf.optimizerを呼び出すことができる戦略オブジェクトが常に返されます。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを説明するサンプルコードスニペット:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategyは、すべての変数と計算を単一の指定されたデバイスに配置するための戦略です。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

この戦略は、いくつかの点でデフォルトの戦略とは異なります。デフォルトの戦略では、分散戦略なしでTensorFlowを実行する場合と比較した場合、変数配置ロジックは変更されません。ただし、 OneDeviceStrategyを使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.runを介してOneDeviceStrategy.run関数も、指定されたデバイスに配置されます。

この戦略を通じて配布された入力は、指定されたデバイスにプリフェッチされます。デフォルトの戦略では、入力の分散はありません。

デフォルトの戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これにより、デフォルトの戦略よりもいくらか多くの配布戦略機構が実行されますが、 MirroredStrategyTPUStrategyなどを使用するほどではありません。戦略がないかのように動作するコードが必要な場合は、デフォルトの戦略を使用してください。

これまで、利用可能なさまざまな戦略と、それらをインスタンス化する方法を見てきました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。

tf.distribute.Strategytf.keras.Model.fitを使用する

tf.distribute.Strategyは、 tf.kerasによるKerasAPI仕様の実装であるtf.kerasに統合されていますtf.kerasは、モデルを構築およびトレーニングするための高レベルAPIです。 tf.kerasバックエンドに統合することで、 model.fitを使用してmodel.fitトレーニングフレームワークで記述されたトレーニングをシームレスに配布できるようにmodel.fit

コードで変更する必要があるものは次のとおりです。

  1. 適切なtf.distribute.Strategyインスタンスを作成します。
  2. Kerasモデル、オプティマイザー、メトリックの作成をstrategy.scope内に移動します。

シーケンシャル、機能、サブクラスのすべてのタイプのKerasモデルをサポートします。

これを行うためのコードスニペットは、1つの密なレイヤーを持つ非常に単純なKerasモデルです。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

この例ではMirroredStrategy使用しているため、複数のGPUを搭載したマシンでこれを実行できます。 strategy.scope()は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内にモデル/オプティマイザー/メトリックを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategyは、利用可能なGPUでのモデルのトレーニングの複製、勾配の集約などを処理します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 2.0391
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9013
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 0.5597

0.5597171783447266

ここで、 tf.data.Datasetはトレーニングと評価の入力を提供します。 numpy配列を使用することもできます。

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.3984
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1761

<tensorflow.python.keras.callbacks.History at 0x7f674c585a90>

どちらの場合(データセットまたはnumpy)でも、指定された入力の各バッチは、複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategyを使用する場合、サイズ10の各バッチは2つのGPUに分割され、各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を効果的に利用するために、アクセラレータを追加するときにバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。レプリカの数を取得するには、 strategy.num_replicas_in_syncを使用できます。

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras APIサポートされていますサポートされています実験的サポート実験的サポート2.3以降に計画されたサポート

例とチュートリアル

上記の統合をKerasでエンドツーエンドで説明するチュートリアルと例のリストを次に示します。

  1. MirroredStrategyを使用してMNISTをトレーニングするためのチュートリアル
  2. MultiWorkerMirroredStrategyを使用してMNISTをMultiWorkerMirroredStrategyするためのチュートリアル
  3. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  4. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

カスタムトレーニングループでのtf.distribute.Strategy使用

あなたが見てきたように、使用してtf.distribute.Strategy Kerasのでmodel.fitあなたのコードの唯一の数行を変更する必要があります。もう少し努力すれば、カスタムトレーニングループでtf.distribute.Strategyを使用することもできます。

EstimatorやKerasで可能なよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。

tf.distribute.Strategyクラスは、カスタムトレーニングループをサポートするためのメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるはずです。

ここでは、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットを示します。

まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_datasetを呼び出して、戦略に基づいてデータセットを配布します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

次に、トレーニングの1つのステップを定義します。 tf.GradientTapeを使用して勾配を計算し、オプティマイザーを使用してそれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを配布するには、関数train_step入れて、前に作成したdist_datasetから取得したデータセット入力とともにtf.distrbute.Strategy.runに渡します。

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードで注意すべき他のいくつかのこと:

  1. tf.nn.compute_average_lossを使用して損失を計算しました。 tf.nn.compute_average_lossは、例ごとの損失を合計し、その合計をglobal_batch_sizeで除算します。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で集計されるため、これは重要です。
  2. tf.distribute.Strategy.reduce APIを使用して、 tf.distribute.Strategy.reduceによって返された結果を集計しtf.distribute.Strategy.runtf.distribute.Strategy.runは、ストラテジー内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。それらをreduceて、集計値を取得できます。 tf.distribute.Strategy.experimental_local_resultsを実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。
  3. apply_gradientsが配布戦略スコープ内で呼び出されると、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。

最後に、トレーニングステップを定義しdist_datasetdist_datasetを反復処理して、トレーニングをループで実行できます。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.80679005, shape=(), dtype=float32)
tf.Tensor(0.8006732, shape=(), dtype=float32)
tf.Tensor(0.7946168, shape=(), dtype=float32)
tf.Tensor(0.78862023, shape=(), dtype=float32)
tf.Tensor(0.782683, shape=(), dtype=float32)
tf.Tensor(0.77680445, shape=(), dtype=float32)
tf.Tensor(0.77098423, shape=(), dtype=float32)
tf.Tensor(0.7652217, shape=(), dtype=float32)
tf.Tensor(0.7595164, shape=(), dtype=float32)
tf.Tensor(0.75386775, shape=(), dtype=float32)
tf.Tensor(0.7482752, shape=(), dtype=float32)
tf.Tensor(0.74273825, shape=(), dtype=float32)
tf.Tensor(0.7372565, shape=(), dtype=float32)
tf.Tensor(0.7318293, shape=(), dtype=float32)
tf.Tensor(0.7264561, shape=(), dtype=float32)
tf.Tensor(0.7211364, shape=(), dtype=float32)
tf.Tensor(0.71586984, shape=(), dtype=float32)
tf.Tensor(0.7106557, shape=(), dtype=float32)
tf.Tensor(0.7054935, shape=(), dtype=float32)
tf.Tensor(0.7003829, shape=(), dtype=float32)

上記の例では、 dist_datasetを反復処理して、トレーニングに入力を提供しました。また、numpy入力をサポートするためにtf.distribute.Strategy.make_experimental_numpy_datasetも提供しています。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_dataset呼び出す前にデータセットを作成できます。

データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。反復上記今最初のイテレータを作成するように変更され、その後、明示的に呼んでnextの入力データを取得することに。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.69532317, shape=(), dtype=float32)
tf.Tensor(0.690314, shape=(), dtype=float32)
tf.Tensor(0.68535477, shape=(), dtype=float32)
tf.Tensor(0.680445, shape=(), dtype=float32)
tf.Tensor(0.67558426, shape=(), dtype=float32)
tf.Tensor(0.67077184, shape=(), dtype=float32)
tf.Tensor(0.66600746, shape=(), dtype=float32)
tf.Tensor(0.66129065, shape=(), dtype=float32)
tf.Tensor(0.6566208, shape=(), dtype=float32)
tf.Tensor(0.6519975, shape=(), dtype=float32)

これは、 tf.distribute.Strategy使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
カスタムトレーニングループサポートされていますサポートされています実験的サポート実験的サポート2.3以降に計画されたサポート

例とチュートリアル

カスタムトレーニングループで配布戦略を使用する例を次に示します。

  1. MirroredStrategyを使用してMNISTをトレーニングするためのチュートリアル
  2. TPUStrategyを使用したTPUStrategyトレーニングに関するガイド
  3. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

その他のトピック

このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。

TF_CONFIG環境変数の設定

マルチワーカートレーニングの場合、前述のように、クラスターで実行されているバイナリごとにTF_CONFIG環境変数を設定する必要があります。 TF_CONFIG環境変数は、クラスターを構成するタスク、それらのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。 tensorflow / ecosystemリポジトリは、トレーニングタスクのTF_CONFIGを設定するKubernetesテンプレートを提供します。

TF_CONFIGには、クラスターとタスクの2つのコンポーネントがあります。 clusterは、workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うワーカーが通常1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、インデックス0のワーカーがチーフワーカーとして任命されるのが通例です(実際、これがtf.distribute.Strategyの実装方法です)。一方、タスクは現在のタスクの情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。

TF_CONFIG一例はTF_CONFIGです。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

このTF_CONFIGは、ホストとポートとともに、クラスター内に3つのワーカーと2つのpsタスクがあることを指定します。 「タスク」の部分は、クラスター内の現在のタスクの役割であるワーカー1(2番目のワーカー)を指定します。クラスター内の有効な役割は、「チーフ」、「ワーカー」、「ps」、および「エバリュエーター」です。 tf.distribute.experimental.ParameterServerStrategyを使用する場合を除いて、「ps」ジョブはありません。

次は何ですか?

tf.distribute.Strategyは積極的に開発中です。それを試して、 GitHubの問題を使用しフィードバックを提供してください。