TensorFlowを使用した分散トレーニング

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

TensorFlow.orgで表示GoogleColabで実行GitHubでソースを表示 ノートブックをダウンロード

概要

tf.distribute.Strategyは、トレーニングを複数のGPU、複数のマシン、またはTPUに分散するためのTensorFlowAPIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。

tf.distribute.Strategyは、次の主要な目標を念頭に置いて設計されています。

  • 使いやすく、研究者、機械学習エンジニアなどを含む複数のユーザーセグメントをサポートします。
  • 箱から出してすぐに優れたパフォーマンスを提供します。
  • 戦略を簡単に切り替えることができます。

Keras Model.fitなどの高レベルAPIでtf.distribute.Strategyを使用してトレーニングを配布したり、カスタムトレーニングループ(および一般的にはTensorFlowを使用した計算)を配布したりできます。

TensorFlow 2.xでは、プログラムを熱心に実行することも、 tf.functionを使用してグラフで実行することもできます。 tf.distribute.Strategyは、これら両方の実行モードをサポートすることを目的としていますが、 tf.functionで最適に機能します。熱心なモードはデバッグ目的でのみ推奨され、 tf.distribute.TPUStrategyではサポートされていません。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。

TensorFlowの基盤となるコンポーネントが戦略対応になるように変更されているため、コードをほとんど変更せずにtf.distribute.Strategyを使用できます。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。

このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について学習します。パフォーマンスの問題をデバッグする方法については、 Optimize TensorFlowGPUパフォーマンスガイドをご覧ください。

TensorFlowを設定する

import tensorflow as tf

戦略の種類

tf.distribute.Strategyは、さまざまな軸に沿った多くのユースケースをカバーすることを目的としています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。

  • 同期トレーニングと非同期トレーニング:これらは、データ並列処理を使用してトレーニングを分散する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期で更新します。通常、同期トレーニングは、パラメーターサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
  • ハードウェアプラットフォーム:トレーニングを1台のマシン上の複数のGPU、ネットワーク内の複数のマシン(それぞれ、0個以上のGPU)、またはクラウドTPUに拡張することができます。

これらのユースケースをサポートするために、TensorFlowにはMirroredStrategyTPUStrategyMultiWorkerMirroredStrategyParameterServerStrategyCentralStorageStrategy 、およびその他の利用可能な戦略があります。次のセクションでは、TensorFlowのどのシナリオでこれらのどれがサポートされているかを説明します。ここに簡単な概要があります:

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras Model.fitサポートされていますサポートされていますサポートされています実験的サポート実験的サポート
カスタムトレーニングループサポートされていますサポートされていますサポートされています実験的サポート実験的サポート
Estimator API限定サポートサポートされていません限定サポート限定サポート限定サポート

MirroredStrategy

tf.distribute.MirroredStrategyは、1台のマシン上の複数のGPUでの同期分散トレーニングをサポートします。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカにミラーリングされます。これらの変数が一緒になって、 MirroredVariableと呼ばれる単一の概念変数を形成します。これらの変数は、同一の更新を適用することにより、相互に同期されます。

効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは、非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、All-Reduceの実装としてNVIDIA Collective Communication Library( NCCL )を使用します。他のいくつかのオプションから選択するか、独自に作成することができます。

MirroredStrategyを作成する最も簡単な方法は次のとおりです。

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

これにより、 MirroredStrategyインスタンスが作成され、TensorFlowに表示されるすべてのGPUとNCCLがクロスデバイス通信として使用されます。

マシンで一部のGPUのみを使用する場合は、次のように実行できます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

クロスデバイス通信をオーバーライドする場合は、 tf.distribute.CrossDeviceOpsのインスタンスを指定して、 cross_device_ops引数を使用してオーバーライドできます。現在、 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDeviceは、デフォルトであるtf.distribute.NcclAllReduce以外の2つのオプションです。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategyを使用すると、TensorFlowトレーニングをTensor Processing Unit(TPU)で実行できます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。これらは、 Google ColabTPU Research Cloud 、およびCloudTPUで利用できます。

分散トレーニングアーキテクチャに関しては、 TPUStrategyは同じMirroredStrategyであり、同期分散トレーニングを実装します。 TPUは、 TPUStrategyで使用される複数のTPUコアにわたって、効率的なall-reduceおよびその他の集合的な操作の独自の実装を提供します。

TPUStrategyをインスタンス化する方法は次のとおりです。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolverインスタンスは、TPUの検索に役立ちます。 Colabでは、引数を指定する必要はありません。

これをクラウドTPUに使用する場合:

  • tpu引数でTPUリソースの名前を指定する必要があります。
  • プログラムの開始時に、TPUシステムを明示的に初期化する必要があります。これは、TPUを計算に使用する前に必要です。 TPUシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategyは、 MirroredStrategyと非常によく似ています。複数のワーカーに同期分散トレーニングを実装し、それぞれに複数のGPUが含まれる可能性があります。 tf.distribute.MirroredStrategyと同様に、すべてのワーカーの各デバイスでモデル内のすべての変数のコピーを作成します。

MultiWorkerMirroredStrategyを作成する最も簡単な方法は次のとおりです。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategyには、クロスデバイス通信用の2つの実装があります。 CommunicationImplementation.RINGRPCベースであり、CPUとGPUの両方をサポートします。 CommunicationImplementation.NCCLはNCCLを使用し、GPUで最先端のパフォーマンスを提供しますが、CPUをサポートしていません。 CollectiveCommunication.AUTOは、選択をTensorflowに延期します。次の方法で指定できます。

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための重要な違いの1つは、マルチワーカーのセットアップです。 'TF_CONFIG'環境変数は、クラスターの一部である各ワーカーにクラスター構成を指定するTensorFlowの標準的な方法です。詳細については、このドキュメントの「 TF_CONFIGの設定」セクションを参照してください。

MultiWorkerMirroredStrategyの詳細については、次のチュートリアルを検討してください。

ParameterServerStrategy

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。詳細については、パラメータサーバーのトレーニングチュートリアルをご覧ください。

TensorFlow 2では、パラメーターサーバートレーニングは、 tf.distribute.experimental.coordinator.ClusterCoordinatorクラスを介して中央コーディネーターベースのアーキテクチャを使用します。

この実装では、 workerおよびparameter serverタスクは、コーディネーターからのタスクをリッスンするtf.distribute.Serverを実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。

コーディネーターで実行されているプログラミングでは、 ParameterServerStrategyオブジェクトを使用してトレーニングステップを定義し、 ClusterCoordinatorを使用してトレーニングステップをリモートワーカーにディスパッチします。それらを作成する最も簡単な方法は次のとおりです。

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

ParameterServerStrategyの詳細については、KerasModel.fitを使用したパラメーターサーバーのトレーニングとカスタムトレーニングループのチュートリアルをご覧ください。

TensorFlow 1では、 ParameterServerStrategyは、 tf.compat.v1.distribute.experimental.ParameterServerStrategyシンボルを介したEstimatorでのみ使用できます。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategyは、同期トレーニングも行います。変数はミラーリングされません。代わりに、変数はCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。

次の方法でCentralStorageStrategyのインスタンスを作成します。

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

これにより、表示されているすべてのGPUとCPUを使用するCentralStorageStrategyインスタンスが作成されます。レプリカの変数への更新は、変数に適用される前に集約されます。

その他の戦略

上記の戦略に加えて、 tf.distributeを使用する際のプロトタイピングとデバッグに役立つ可能性のある他の2つの戦略があります。

デフォルト戦略

デフォルト戦略は、明示的な配布戦略が範囲内にない場合に存在する配布戦略です。これはtf.distribute.Strategyインターフェースを実装しますが、パススルーであり、実際の配布を提供しません。たとえば、 Strategy.run(fn)は単にfnを呼び出します。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。

デフォルト戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、明示的な戦略のスコープ外でtf.distribute.get_strategyを使用して取得できます(明示的な戦略のスコープ内で現在の戦略を取得するために使用できるのと同じAPI)。

default_strategy = tf.distribute.get_strategy()

この戦略は2つの主な目的を果たします。

  • これにより、配布対応のライブラリコードを無条件に記述できます。たとえば、 tf.optimizerでは、 tf.distribute.get_strategyを使用して、その戦略を使用して勾配を減らすことができます。これにより、 Strategy.reduceを呼び出すことができる戦略オブジェクトが常に返されます。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
プレースホルダー18
  • ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを示すサンプルコードスニペットは次のとおりです。
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategyは、すべての変数と計算を単一の指定されたデバイスに配置するための戦略です。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

この戦略は、いくつかの点でデフォルト戦略とは異なります。デフォルト戦略では、分散戦略なしでTensorFlowを実行する場合と比較した場合、変数配置ロジックは変更されません。ただし、 OneDeviceStrategyを使用する場合、そのスコープで作成されたすべての変数は、指定されたデバイスに明示的に配置されます。さらに、 OneDeviceStrategy.runを介して呼び出された関数も、指定されたデバイスに配置されます。

この戦略を通じて分散された入力は、指定されたデバイスにプリフェッチされます。デフォルト戦略では、入力分布はありません。

デフォルト戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これにより、デフォルト戦略よりもいくらか多くの配布戦略機構が実行されますが、たとえば、 MirroredStrategyTPUStrategyを完全に使用できるわけではありません。戦略がないかのように動作するコードが必要な場合は、デフォルト戦略を使用します。

これまで、さまざまな戦略と、それらをインスタンス化する方法について学習しました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。

KerasModel.fitでtf.distribute.Strategyを使用する

tf.distribute.Strategyは、TensorFlowによるKerasAPI仕様の実装であるtf.kerasに統合されています。 tf.kerasは、モデルを構築およびトレーニングするための高レベルAPIです。 tf.kerasバックエンドに統合することで、 Model.fitを使用してKerasトレーニングフレームワークで記述されたトレーニングをシームレスに配布できます。

コードで変更する必要があるものは次のとおりです。

  1. 適切なtf.distribute.Strategyのインスタンスを作成します。
  2. Kerasモデル、オプティマイザー、メトリクスの作成をstrategy.scope内に移動します。

TensorFlow分散戦略は、すべてのタイプのKerasモデル(シーケンシャルファンクショナルサブクラス)をサポートします。

これを行うためのコードスニペットは、1つのDenseレイヤーを持つ非常に単純なKerasモデルです。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

この例ではMirroredStrategyを使用しているため、複数のGPUを搭載したマシンでこれを実行できます。 strategy.scope()は、トレーニングの配布に使用する戦略をKerasに示します。このスコープ内にモデル/オプティマイザー/メトリクスを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategyは、利用可能なGPUでのモデルのトレーニングの複製、グラデーションの集計などを処理します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

ここで、 tf.data.Datasetはトレーニングと評価の入力を提供します。 NumPy配列を使用することもできます。

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>
プレースホルダー27

どちらの場合も( DatasetまたはNumPyを使用)、指定された入力の各バッチは複数のレプリカ間で均等に分割されます。たとえば、2つのGPUでMirroredStrategyを使用している場合、サイズ10の各バッチは2つのGPUに分割され、各ステップで5つの入力例を受け取ります。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を有効に活用するために、アクセラレータを追加するときにバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。 strategy.num_replicas_in_syncを使用して、レプリカの数を取得できます。

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras Model.fitサポートされていますサポートされていますサポートされています実験的サポート実験的サポート

例とチュートリアル

上記のModel.fitとのエンドツーエンドの統合を説明するチュートリアルと例のリストを次に示します。

  1. チュートリアルModel.fitMirroredStrategyを使用したトレーニング。
  2. チュートリアルModel.fitMultiWorkerMirroredStrategyを使用したトレーニング。
  3. ガイドModel.fitおよびTPUStrategyの使用例が含まれています。
  4. チュートリアルModel.fitおよびParameterServerStrategyを使用したパラメーターサーバーのトレーニング。
  5. チュートリアルModel.fitTPUStrategyを使用してGLUEベンチマークから多くのタスクのBERTを微調整します。
  6. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

カスタムトレーニングループでtf.distribute.Strategyを使用する

上で示したように、Keras Model.fittf.distribute.Strategyを使用するには、コードの数行を変更するだけで済みます。もう少し努力すれば、カスタムトレーニングループでtf.distribute.Strategyを使用することもできます。

EstimatorやKerasよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。

tf.distribute.Strategyクラスは、カスタムトレーニングループをサポートするためのメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるようになります。

以下は、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットです。

まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、 tf.distribute.Strategy.experimental_distribute_datasetを呼び出して、戦略に基づいてデータセットを配布します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

次に、トレーニングの1つのステップを定義します。 tf.GradientTapeを使用して勾配を計算し、オプティマイザーを使用してそれらの勾配を適用してモデルの変数を更新します。このトレーニングステップを配布するには、関数train_stepに入れて、前に作成したdist_datasetから取得したデータセット入力とともにtf.distribute.Strategy.runに渡します。

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードで注意すべき他のいくつかのこと:

  1. tf.nn.compute_average_lossを使用して損失を計算しました。 tf.nn.compute_average_lossは、例ごとの損失を合計し、その合計をglobal_batch_sizeで除算します。後で各レプリカで勾配が計算された後、それらを合計することによってレプリカ全体で集約されるため、これは重要です。
  2. また、 tf.distribute.Strategy.reduce APIを使用して、 tf.distribute.Strategy.runによって返された結果を集計しました。 tf.distribute.Strategy.runは、ストラテジー内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。それらをreduceて、集計値を取得できます。 tf.distribute.Strategy.experimental_local_resultsを実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。
  3. 配布戦略スコープ内でapply_gradientsを呼び出すと、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。

最後に、トレーニングステップを定義したら、 dist_datasetを反復処理して、トレーニングをループで実行できます。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

上記の例では、 dist_datasetを繰り返し処理して、トレーニングに入力を提供しました。 NumPy入力をサポートするためのtf.distribute.Strategy.make_experimental_numpy_datasetも提供されます。このAPIを使用して、 tf.distribute.Strategy.experimental_distribute_datasetを呼び出す前にデータセットを作成できます。

データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。上記の反復は、最初にイテレータを作成し、 nextそれを明示的に呼び出して入力データを取得するように変更されます。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)
プレースホルダー36

これは、 tf.distribute.Strategyを使用してカスタムトレーニングループを配布する最も単純なケースをカバーしています。

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
カスタムトレーニングループサポートされていますサポートされていますサポートされています実験的サポート実験的サポート

例とチュートリアル

カスタムトレーニングループで配布戦略を使用する例を次に示します。

  1. チュートリアル:カスタムトレーニングループとMirroredStrategyを使用したトレーニング。
  2. チュートリアル:カスタムトレーニングループとMultiWorkerMirroredStrategyを使用したトレーニング。
  3. ガイドTPUStrategyを使用したカスタムトレーニングループの例が含まれています。
  4. チュートリアル:カスタムトレーニングループとParameterServerStrategyを使用したパラメーターサーバートレーニング。
  5. さまざまな戦略を使用して実装された最先端のモデルのコレクションを含むTensorFlowModelGardenリポジトリ

その他のトピック

このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。

TF_CONFIG環境変数の設定

マルチワーカートレーニングの場合、前述のように、クラスターで実行されているバイナリごとに'TF_CONFIG'環境変数を設定する必要があります。 'TF_CONFIG'環境変数は、クラスターを構成するタスク、それらのアドレス、およびクラスター内の各タスクの役割を指定するJSON文字列です。 tensorflow/ecosystemリポジトリは、トレーニングタスク用に'TF_CONFIG'を設定するKubernetesテンプレートを提供します。

'TF_CONFIG'には、クラスターとタスクの2つのコンポーネントがあります。

  • クラスターは、トレーニングクラスターに関する情報を提供します。これは、ワーカーなどのさまざまなタイプのジョブで構成されるdictです。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardのサマリーファイルの作成など、もう少し責任を負うワーカーが1人います。このようなワーカーは「チーフ」ワーカーと呼ばれ、インデックス0のワーカーがチーフワーカーとして任命されるのが通例です(実際、これがtf.distribute.Strategyの実装方法です)。
  • 一方、タスクは現在のタスクに関する情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。

'TF_CONFIG'一例は次のとおりです。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

この'TF_CONFIG'は、ホストとポートとともに、 "cluster"に3つのワーカーと2つの"ps"タスクがあることを指定します。 "task"の部分は、 "cluster"の現在のタスクの役割を指定します—ワーカー1 (2番目のワーカー)。クラスタ内の有効なロールは、 "chief""worker""ps" 、および"evaluator"です。 tf.distribute.experimental.ParameterServerStrategyを使用する場合を除いて、 "ps"ジョブはありません。

次は何ですか?

tf.distribute.Strategyは積極的に開発中です。試してみて、 GitHubの問題を使用してフィードバックを提供してください。