TensorFlowを使用した分散トレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示 ノートブックをダウンロード

概要

tf.distribute.Strategy複数のGPU間のトレーニング、複数のマシン、またはのTPUを配布するTensorFlow APIです。このAPIを使用すると、最小限のコード変更で既存のモデルとトレーニングコードを配布できます。

tf.distribute.Strategy念頭に置いて、これらの重要な目標で設計されています:

  • 使いやすく、研究者、機械学習エンジニアなどを含む複数のユーザーセグメントをサポートします。
  • 箱から出してすぐに優れたパフォーマンスを提供します。
  • 戦略を簡単に切り替えることができます。

tf.distribute.Strategyのような高レベルAPIで使用することができKeras 、またカスタムトレーニングループ(及び、TensorFlowを使用して、一般的に、任意の計算)を配布するために使用することができます。

TensorFlow 2.xでは、あなたは熱心にあなたのプログラムを実行する、または使用してグラフのできるtf.functiontf.distribute.Strategy 、実行のこれらのモードの両方をサポートしていきますが、に最適ですtf.function 。イーガーモードのみデバッグ目的のために推奨されており、サポートされていませんtf.distribute.TPUStrategy 。このガイドの焦点はトレーニングですが、このAPIは、さまざまなプラットフォームで評価と予測を配布するためにも使用できます。

あなたは使用することができますtf.distribute.Strategy TensorFlowの基礎となるコンポーネントは戦略-自覚するように変更されているので、あなたのコードに非常にいくつかの変更で。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。

このガイドでは、さまざまなタイプの戦略と、さまざまな状況でそれらを使用する方法について学習します。パフォーマンスの問題をデバッグする方法については、ご覧の最適化TensorFlow GPUのパフォーマンスのガイドを。

# Import TensorFlow
import tensorflow as tf

戦略の種類

tf.distribute.Strategy異なる軸に沿ってユースケースの数をカバーすることを意図しています。これらの組み合わせの一部は現在サポートされており、その他は将来追加される予定です。これらの軸のいくつかは次のとおりです。

  • 非同期トレーニング対同期:これらは、データ並列とトレーニングを配布する2つの一般的な方法です。同期トレーニングでは、すべてのワーカーが入力データのさまざまなスライスを同期してトレーニングし、各ステップで勾配を集計します。非同期トレーニングでは、すべてのワーカーが独立して入力データをトレーニングし、変数を非同期に更新します。通常、同期トレーニングは、パラメータサーバーアーキテクチャを介したall-reduceおよびasyncを介してサポートされます。
  • ハードウェアプラットフォーム:あなたが(0以上のGPUそれぞれに)ネットワーク内、またはクラウドのTPUに一台のマシン、または複数のマシン上で複数のGPU上であなたのトレーニングを拡張することもできます。

これらのユースケースをサポートするために、6つの戦略が利用可能です。次のセクションでは、TensorFlowのどのシナリオでこれらのどれがサポートされているかを説明します。概要は次のとおりです。

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras APIサポートされていますサポートされていますサポートされています実験的サポート実験的サポート
カスタムトレーニングループサポートされていますサポートされていますサポートされています実験的サポート実験的サポート
Estimator API限定サポートサポートされていません限定サポート限定サポート限定サポート

MirroredStrategy

tf.distribute.MirroredStrategy一台のマシン上で複数のGPU上での同期分散型トレーニングをサポートしています。 GPUデバイスごとに1つのレプリカを作成します。モデル内の各変数は、すべてのレプリカ間でミラーリングされます。一緒に、これらの変数が呼ばれる単一の概念の変数を形成MirroredVariable 。これらの変数は、同一の更新を適用することにより、相互に同期されます。

効率的なall-reduceアルゴリズムを使用して、デバイス間で変数の更新を通信します。 All-reduceは、テンソルを合計することですべてのデバイスに集約し、各デバイスで使用できるようにします。これは非常に効率的で、同期のオーバーヘッドを大幅に削減できる融合アルゴリズムです。デバイス間で利用可能な通信のタイプに応じて、利用可能な多くのall-reduceアルゴリズムと実装があります。デフォルトでは、NVIDIA集合通信ライブラリ(使用NCCLをすべて軽減実装として)。他のいくつかのオプションから選択するか、独自に作成することができます。

ここで作成するための最も簡単な方法ですMirroredStrategy

mirrored_strategy = tf.distribute.MirroredStrategy()
2021-08-21 01:24:44.677825: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.686081: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.687041: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.689423: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-21 01:24:44.690022: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.690987: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.691896: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.284404: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.285446: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.286341: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.287150: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14648 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

これは、作成されMirroredStrategy TensorFlowに表示されている全てのGPUを使用するインスタンスを、そしてNCCL-ASクロスデバイス通信。

マシンで一部のGPUのみを使用する場合は、次のように実行できます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

あなたは、クロスデバイス通信を無効にしたい場合は、使用して行うことができcross_device_opsのインスタンス供給することにより、引数をtf.distribute.CrossDeviceOps 。現在、 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDeviceより他の2つのオプションですtf.distribute.NcclAllReduceデフォルトです。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategyあなたがテンソル処理ユニット(TPU類)のごTensorFlowのトレーニングを実行することができます。 TPUは、機械学習のワークロードを劇的に加速するように設計されたGoogleの特殊なASICです。彼らは上で利用可能なGoogleのコラボTPU研究クラウド、およびクラウドTPU

分散訓練アーキテクチャの観点から、 TPUStrategy同じであるMirroredStrategy -it実装同期分散訓練。 TPUはに使用される複数のTPUコアにわたって全減らす効率的かつ他の集合動作の独自の実装を提供TPUStrategy

ここでは、インスタンス化する方法をあるTPUStrategy

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolverインスタンスは、のTPUを見つけることができます。 Colabでは、引数を指定する必要はありません。

これをクラウドTPUに使用する場合:

  • あなたには、あなたのTPUリソースの名前を指定する必要がありtpu引数。
  • あなたは、プログラムの開始時に明示的にTPUシステムを初期化する必要があります。これは、TPUを計算に使用する前に必要です。 tpuシステムを初期化すると、TPUメモリも消去されるため、状態が失われないように、最初にこの手順を完了することが重要です。

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy非常によく似ているMirroredStrategy 。それぞれが潜在的に複数のGPUを備えた複数のワーカーにまたがる同期分散トレーニングを実装します。同様にtf.distribute.MirroredStrategy 、それはすべての労働者全体で、各デバイスのモデル内のすべての変数のコピーを作成します。

ここで作成するための最も簡単な方法ですMultiWorkerMirroredStrategy

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategyクロスデバイス通信のための2つの実装を持っています。 CommunicationImplementation.RINGあるRPCをベースとCPUとGPUの両方をサポートしています。 CommunicationImplementation.NCCL NCCLを使用し、GPU上での最先端のパフォーマンスを提供しますが、それがCPUをサポートしていません。 CollectiveCommunication.AUTO Tensorflowに選択を延期します。次の方法で指定できます。

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

マルチGPUトレーニングと比較して、マルチワーカートレーニングを開始するための重要な違いの1つは、マルチワーカーのセットアップです。 TF_CONFIG環境変数は、クラスタの一部である各ワーカーにクラスタ構成を指定するTensorFlowにおける標準的な方法です。詳細情報の設定TF_CONFIG

ParameterServerStrategy

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。チェックアウトパラメータサーバーのトレーニングの詳細についてのチュートリアルを。

TensorFlow 2では、パラメータサーバトレーニングを介して中央コーディネータベースのアーキテクチャを使用tf.distribute.experimental.coordinator.ClusterCoordinatorクラス。

この実装では、 workerparameter serverタスクが実行tf.distribute.Serverコーディネーターからのタスクのために聞くのを。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。

コーディネーター上で実行中のプログラムでは、あなたが使用するParameterServerStrategyトレーニングステップを定義し、使用するオブジェクトをClusterCoordinatorリモートワーカーへの派遣研修ステップに。それらを作成する最も簡単な方法は次のとおりです。

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

TensorFlow 1において、 ParameterServerStrategyのみ推定経由で利用可能であるtf.compat.v1.distribute.experimental.ParameterServerStrategyシンボル。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy同様に、同期の訓練を行います。変数はミラーリングされず、代わりにCPUに配置され、操作はすべてのローカルGPUに複製されます。 GPUが1つしかない場合、すべての変数と操作はそのGPUに配置されます。

インスタンスを作成CentralStorageStrategyによって:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

これが作成されますCentralStorageStrategyすべての可視のGPUとCPUを使用するインスタンスを。レプリカの変数への更新は、変数に適用される前に集約されます。

その他の戦略

上記の戦略に加えて、使用している場合プロトタイピングとデバッグのために役に立つかもしれない他の二つの戦略があるtf.distribute APIが。

デフォルトの戦略

デフォルト戦略は、明示的な配布戦略がスコープ内にない場合に存在する配布戦略です。これは実装tf.distribute.Strategyインタフェースをなくパススルーであり、実際の分布を提供しません。例えば、 strategy.run(fn)単に呼び出しますfn 。この戦略を使用して記述されたコードは、戦略なしで記述されたコードとまったく同じように動作する必要があります。あなたはそれを「ノーオペレーション」戦略と考えることができます。

デフォルトの戦略はシングルトンであり、それ以上のインスタンスを作成することはできません。これは、使用して得ることができるtf.distribute.get_strategy明示的な戦略の範囲外の(明示的な戦略の範囲内の現在の戦略を取得するために使用することができるのと同じAPIを)。

default_strategy = tf.distribute.get_strategy()

この戦略は2つの主な目的を果たします。

  • これにより、配布対応のライブラリコードを無条件に記述できます。例えば、中tf.optimizerあなたが使用することができますよtf.distribute.get_strategy還元するためにその戦略を使用して勾配を、それは常にあなたが呼び出すことができますどの戦略オブジェクトを返しますStrategy.reduce APIを。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • ライブラリコードと同様に、条件付きロジックを必要とせずに、配布戦略の有無にかかわらず動作するエンドユーザーのプログラムを作成するために使用できます。これを示すサンプルコードスニペットは次のとおりです。
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy指定された単一のデバイス上のすべての変数と計算を配置するための戦略です。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

この戦略は、いくつかの点でデフォルト戦略とは異なります。デフォルトストラテジーでは、分散ストラテジーなしでTensorFlowを実行する場合と比較して、変数配置ロジックは変更されません。使用している場合でもOneDeviceStrategy 、その範囲で作成されたすべての変数を明示的に指定されたデバイス上に置かれています。また、経由で呼び出される関数OneDeviceStrategy.runまた、指定されたデバイス上に配置されます。

この戦略を通じて配布された入力は、指定されたデバイスにプリフェッチされます。デフォルト戦略では、入力分布はありません。

デフォルト戦略と同様に、この戦略は、実際に複数のデバイス/マシンに配布される他の戦略に切り替える前に、コードをテストするためにも使用できます。これは、多少の戦略ではなく、例えば、使用の全範囲にデフォルトよりも流通戦略の機械を行使しますMirroredStrategyまたはTPUStrategy 。ストラテジーがないかのように動作するコードが必要な場合は、デフォルトストラテジーを使用してください。

これまで、利用可能なさまざまな戦略と、それらをインスタンス化する方法を見てきました。次のいくつかのセクションでは、それらを使用してトレーニングを配布するさまざまな方法を示します。

使用tf.distribute.Strategytf.keras.Model.fit

tf.distribute.Strategyに統合されtf.kerasのTensorFlowの実装である、 KerasのAPI仕様tf.keras 、ビルドや電車のモデルへの高レベルAPIです。統合することによってtf.keras 、あなたが使用してKerasトレーニングフレームワークで書かれたあなたのトレーニング配布することのシームレスなバックエンドModel.fit

コードで変更する必要があるものは次のとおりです。

  1. 適切なのインスタンスを作成tf.distribute.Strategy
  2. 内部Kerasモデル、オプティマイザとメトリックの作成に移動strategy.scope

TensorFlow配布戦略は、シーケンシャル、ファンクショナル、サブクラスのすべてのタイプのKerasモデルをサポートします。

これは、1つの密なレイヤーを持つ非常に単純なKerasモデルに対してこれを行うためのコードスニペットです。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

この例では使用していますMirroredStrategy使用すると、複数のGPUを搭載したマシン上でこれを実行することができますので、。 strategy.scope()のトレーニングを配布するために使用するための戦略Kerasに示します。このスコープ内にモデル/オプティマイザー/メトリックを作成すると、通常の変数の代わりに分散変数を作成できます。これを設定すると、通常どおりにモデルを適合させることができます。 MirroredStrategy 、可能なGPU上でのモデルのトレーニングを複製勾配を集約し、より多くの世話をします。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-08-21 01:24:46.237677: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-08-21 01:24:46.271153: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 0.0086
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0038
2021-08-21 01:24:49.147347: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.0024
0.002372059039771557

ここtf.data.Datasetトレーニングとevalの入力を提供します。 NumPy配列を使用することもできます。

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0017
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 7.4622e-04
2021-08-21 01:24:50.486957: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
<keras.callbacks.History at 0x7f12401ede10>

両方の場合で、 Datasetまたは所与の入力のnumpyの、各バッチは複数のレプリカ間で均等に分割されます。使用している場合、例えば、 MirroredStrategy 2つのGPUで、サイズ10の各バッチは、各工程における各受信5入力例で、2つのGPUの間で分割されます。 GPUを追加すると、各エポックのトレーニングが速くなります。通常、追加の計算能力を効果的に利用するために、アクセラレータを追加するにつれてバッチサイズを増やす必要があります。モデルによっては、学習率を再調整する必要もあります。あなたは使用することができますstrategy.num_replicas_in_syncレプリカの数を取得します。

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras APIサポートされていますサポートされていますサポートされています実験的サポート実験的サポート

例とチュートリアル

上記のKerasとのエンドツーエンドの統合を説明するチュートリアルと例のリストを次に示します。

  1. チュートリアルでMNISTを訓練するMirroredStrategy
  2. チュートリアル使っMNISTを訓練するMultiWorkerMirroredStrategy
  3. ガイド使用MNISTの訓練にTPUStrategy
  4. チュートリアルとTensorFlow 2におけるパラメータのサーバーの訓練のためのParameterServerStrategy
  5. TensorFlowモデルガーデンは、リポジトリの最先端モデルのコレクションを含む、さまざまな戦略を使用して実装しました。

使用tf.distribute.Strategyカスタムトレーニングループで

あなたが見てきたように、使用してtf.distribute.Strategy Kerasのでmodel.fitあなたのコードの唯一の数行を変更する必要があります。もう少し努力し、あなたも使用することができますtf.distribute.Strategyカスタムトレーニングループと。

EstimatorやKerasで可能なよりも柔軟性とトレーニングループの制御が必要な場合は、カスタムトレーニングループを作成できます。たとえば、GANを使用する場合、ラウンドごとに異なる数のジェネレーターまたはディスクリミネーターのステップを実行することができます。同様に、高レベルのフレームワークは強化学習トレーニングにはあまり適していません。

tf.distribute.Strategyクラスは、サポートカスタムトレーニングループにメソッドのコアセットを提供します。これらを使用すると、最初にコードのマイナーな再構築が必要になる場合がありますが、それが完了すると、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えることができるはずです。

ここでは、以前と同じKerasモデルを使用した簡単なトレーニング例のこのユースケースを示す簡単なスニペットが表示されます。

まず、戦略の範囲内でモデルとオプティマイザーを作成します。これにより、モデルとオプティマイザーで作成された変数がミラーリングされた変数になります。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、呼び出しtf.distribute.Strategy.experimental_distribute_dataset戦略に基づいてデータセットを配布します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-08-21 01:24:50.715370: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

次に、トレーニングの1つのステップを定義します。使用tf.GradientTapeモデルの変数を更新するために、これらのグラデーションを適用するグラデーションやオプティマイザを計算します。この訓練ステップを配布するには、関数に入れtrain_stepとに渡しtf.distrbute.Strategy.runあなたから得たデータセットの入力と一緒にdist_dataset前に作成しました:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードで注意すべき他のいくつかのこと:

  1. あなたは使用tf.nn.compute_average_loss損失を計算します。 tf.nn.compute_average_loss例あたりの損失を合計し、global_batch_sizeで合計を割ります。勾配は各レプリカで計算され、後にした後、彼らはそれらを合計することによって、レプリカ全体で集約されているので、これは重要です。
  2. また、使用tf.distribute.Strategy.reduceで返される結果集計するAPIをtf.distribute.Strategy.runtf.distribute.Strategy.run戦略の各ローカルレプリカからの結果を返し、この結果を消費する複数の方法があります。あなたはできるreduce集計値を得るためにそれらを。あなたも行うことができますtf.distribute.Strategy.experimental_local_results値のリストは、結果、ローカルレプリカごとに1つに含ま取得します。
  3. お問い合わせの際apply_gradients流通戦略の範囲内で、その動作が変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配のすべてのレプリカの合計を実行します。

あなたが訓練ステップを定義した後、最後に、あなたはを反復することができdist_datasetとループの訓練を実行します。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(1.3084135, shape=(), dtype=float32)
tf.Tensor(1.2977839, shape=(), dtype=float32)
tf.Tensor(1.2872384, shape=(), dtype=float32)
tf.Tensor(1.2767767, shape=(), dtype=float32)
tf.Tensor(1.2663989, shape=(), dtype=float32)
tf.Tensor(1.256105, shape=(), dtype=float32)
tf.Tensor(1.2458944, shape=(), dtype=float32)
tf.Tensor(1.2357674, shape=(), dtype=float32)
tf.Tensor(1.2257235, shape=(), dtype=float32)
tf.Tensor(1.2157627, shape=(), dtype=float32)
tf.Tensor(1.2058848, shape=(), dtype=float32)
tf.Tensor(1.1960893, shape=(), dtype=float32)
tf.Tensor(1.1863762, shape=(), dtype=float32)
tf.Tensor(1.1767453, shape=(), dtype=float32)
tf.Tensor(1.1671963, shape=(), dtype=float32)
tf.Tensor(1.1577287, shape=(), dtype=float32)
tf.Tensor(1.1483426, shape=(), dtype=float32)
tf.Tensor(1.1390375, shape=(), dtype=float32)
tf.Tensor(1.1298131, shape=(), dtype=float32)
tf.Tensor(1.1206692, shape=(), dtype=float32)

上記の例では、繰り返し処理dist_datasetあなたのトレーニングに入力を提供することを。あなたも備えているtf.distribute.Strategy.make_experimental_numpy_dataset numpyの入力をサポートします。あなたは、呼び出す前に、データセットを作成するには、このAPIを使用することができますtf.distribute.Strategy.experimental_distribute_dataset

データを反復処理するもう1つの方法は、反復子を明示的に使用することです。データセット全体を反復処理するのではなく、特定のステップ数で実行する場合に、これを実行することをお勧めします。反復上記今最初のイテレータを作成するように変更され、その後、明示的に呼んでnextの入力データを取得することに。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(1.1116053, shape=(), dtype=float32)
tf.Tensor(1.1026212, shape=(), dtype=float32)
tf.Tensor(1.0937165, shape=(), dtype=float32)
tf.Tensor(1.0848908, shape=(), dtype=float32)
tf.Tensor(1.0761441, shape=(), dtype=float32)
tf.Tensor(1.0674756, shape=(), dtype=float32)
tf.Tensor(1.0588851, shape=(), dtype=float32)
tf.Tensor(1.0503721, shape=(), dtype=float32)
tf.Tensor(1.0419363, shape=(), dtype=float32)
tf.Tensor(1.0335773, shape=(), dtype=float32)

これは、使用しての最も単純なケースカバーtf.distribute.Strategyカスタムトレーニングループを配布するためにAPIを。

現在サポートされているものは何ですか?

トレーニングAPI MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
カスタムトレーニングループサポートされていますサポートされていますサポートされています実験的サポート実験的サポート

例とチュートリアル

カスタムトレーニングループで配布戦略を使用する例を次に示します。

  1. チュートリアル使っMNISTを訓練するMirroredStrategy
  2. ガイド使用MNISTの訓練にTPUStrategy
  3. TensorFlowモデルガーデンは、リポジトリの最先端モデルのコレクションを含む、さまざまな戦略を使用して実装しました。

その他のトピック

このセクションでは、複数のユースケースに関連するいくつかのトピックについて説明します。

TF_CONFIG環境変数の設定

前に述べたように、マルチ労働者の訓練のために、あなたが設定する必要がありTF_CONFIGクラスタで実行中の各バイナリのための環境変数を。 TF_CONFIG環境変数は、クラスタ、それらのアドレスと、クラスタ内の各タスクの役割を構成するどのようなタスクを指定JSON文字列です。 tensorflow/ecosystemレポはKubernetesテンプレート、アップセットを提供TF_CONFIGトレーニングタスクのために。

二つの成分がありTF_CONFIGクラスタとタスクは:。

  • クラスターは、トレーニングクラスターに関する情報を提供します。これは、ワーカーなどのさまざまなタイプのジョブで構成されるdictです。マルチワーカートレーニングでは、通常のワーカーが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うワーカーが通常1人います。このような労働者は、「チーフ」労働者と呼ばれ、インデックス0の労働者が(実際には、これはどのようにあるチーフ労働者として任命されていることが通例であるさtf.distribute.Strategy実装されています)。
  • 一方、タスクは現在のタスクの情報を提供します。最初のコンポーネントクラスターはすべてのワーカーで同じであり、2番目のコンポーネントタスクはワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。

一例TF_CONFIG次のとおりです。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

このTF_CONFIG 3人の労働者と2があることを指定し"ps"のタスクがで"cluster"彼らのホストとポートと一緒に。 "task"の部分は、中現在のタスクの役割を指定し"cluster" -worker 1 (第2作業員を)。クラスタ内の有効な役割は、 "chief""worker""ps" 、及び"evaluator" 。何があってはなりません"ps"使用している場合を除き、仕事tf.distribute.experimental.ParameterServerStrategy

次は何ですか?

tf.distribute.Strategy積極的に開発中です。それを試してみて、使用してフィードバックを提供GitHubの問題を