このページは Cloud Translation API によって翻訳されました。
Switch to English

tf.distribute.Strategyを使用したカスタムトレーニング

TensorFlow.orgで見る Google Colabで実行 GitHubでソースを表示する ノートブックをダウンロード

このチュートリアルでは、カスタムトレーニングループでtf.distribute.Strategyを使用する方法を示します。ファッションMNISTデータセットで簡単なCNNモデルをトレーニングします。ファッションMNISTデータセットには、サイズ28 x 28の60000トレイン画像とサイズ28 x 28の10000テスト画像が含まれています。

カスタムトレーニングループを使用してモデルをトレーニングしているのは、トレーニングをより柔軟に制御できるためです。さらに、モデルとトレーニングループのデバッグが容易になります。

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

ファッションMNISTデータセットをダウンロードする

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 1s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 1s 0us/step

変数とグラフを分散する戦略を作成する

tf.distribute.MirroredStrategy戦略はどのように機能しますか?

  • すべての変数とモデルグラフがレプリカに複製されます。
  • 入力はレプリカ全体に均等に分散されます。
  • 各レプリカは、受け取った入力の損失と勾配を計算します。
  • 勾配は、それらを合計することにより、すべてのレプリカ間で同期されます。
  • 同期後、各レプリカの変数のコピーに同じ更新が行われます。
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

入力パイプラインのセットアップ

グラフと変数をプラットフォームに依存しないSavedModel形式にエクスポートします。モデルが保存された後、スコープの有無にかかわらずそれをロードできます。

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

データセットを作成して配布します。

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

モデルを作成する

tf.keras.Sequentialを使用してモデルを作成します。これを行うには、モデルサブクラス化APIを使用することもできます。

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

損失関数を定義する

通常、1 GPU / CPUを搭載した単一のマシンでは、損失は入力バッチ内の例の数で除算されます。

それで、 tf.distribute.Strategyを使用するとき、損失はどのように計算されるべきですか?

  • たとえば、4つのGPUと64のバッチサイズがあるとします。1つの入力バッチがレプリカ(4つのGPU)に分散され、各レプリカはサイズ16の入力を取得します。

  • 各レプリカのモデルは、それぞれの入力でフォワードパスを実行し、損失を計算します。ここで、損失をそれぞれの入力の例の数(BATCH_SIZE_PER_REPLICA = 16)で除算する代わりに、損失をGLOBAL_BATCH_SIZE(64)で除算する必要があります。

なぜこれを行うのですか?

  • これは、勾配が各レプリカで計算された後、それらを合計することによってレプリカ間で同期されるため、実行する必要があります。

TensorFlowでこれを行う方法?

  • このチュートリアルのようにカスタムトレーニングループを作成している場合は、例ごとの損失を合計し、その合計をGLOBAL_BATCH_SIZEで除算する必要がありますscale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)またはtf.nn.compute_average_lossを使用できますtf.nn.compute_average_lossは、例ごとの損失、オプションのサンプルの重み、およびGLOBAL_BATCH_SIZEを引数として取り、スケーリングされた損失を返します。

  • モデルで正則化損失を使用している場合は、レプリカの数で損失値をスケーリングする必要があります。これを行うには、 tf.nn.scale_regularization_loss関数を使用します。

  • tf.reduce_mean使用は推奨されません。そうすることで、損失がレプリカごとの実際のバッチサイズで除算され、ステップごとに異なる場合があります。

  • この削減とスケーリングは、keras model.compilemodel.fit自動的に行われます

  • (以下の例のように) tf.keras.lossesクラスを使用する場合、損失削減はNONEまたはSUMいずれかに明示的に指定する必要があります。 AUTOSUM_OVER_BATCH_SIZE一緒に使用する場合は許可されませんtf.distribute.StrategyAUTOは許可されていません。分散の場合、ユーザーがどの削減が正しいかを確認するために、それを明確に検討する必要があるためです。 SUM_OVER_BATCH_SIZEは許可されていません。現在、レプリカのバッチサイズで除算され、レプリカの数による除算はユーザーにSUM_OVER_BATCH_SIZEいるため、 SUM_OVER_BATCH_SIZEなためです。そのため、代わりにユーザーに明示的に削減を依頼します。

  • labelsが多次元の場合は、各サンプルの要素数全体でper_example_lossを平均します。たとえば、 predictionsの形状が(batch_size, H, W, n_classes)labels(batch_size, H, W)場合、 per_example_lossように更新する必要があります: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

損失と精度を追跡するための指標を定義する

これらのメトリックは、テストの損失とトレーニングおよびテストの精度を追跡します。 .result()を使用して、いつでも累積統計を取得できます。

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

トレーニングループ

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789
Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828
Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422
Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406
Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75
Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938
Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375
Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594
Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594

上記の例で注意すべき点:

最新のチェックポイントを復元してテストする

tf.distribute.Strategyチェックポイントされたモデルは、戦略の有無にかかわらず復元できます。

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594

データセットを反復する別の方法

イテレータの使用

あなたがデータセット全体を通じて一定数の段階のを反復処理していないしたい場合には、使用してイテレータを作成することができますiterコールと明示的に呼び出しnextイテレータ上を。 tf.functionの内部と外部の両方でデータセットを反復処理することを選択できます。以下は、イテレータを使用してtf.functionの外部でデータセットを反復処理することを示す小さなスニペットです。

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0
Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875
Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625
Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625
Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875
Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125
Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125
Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625
Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625
Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875

tf.function内での反復

for x in ...コンストラクトを使用するか、上記のようにイテレーターを作成することにより、 train_dist_dataset内の入力train_dist_dataset全体を反復処理することもできます。以下の例は、トレーニングの1エポックをtrain_dist_datasetし、関数内でtrain_dist_datasetを反復処理する方法をtrain_dist_datasetています。

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688
Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266
Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281
Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047
Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438
Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219
Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281
Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156
Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484

レプリカ間でのトレーニング損失の追跡

tf.metrics.Meanを使用してさまざまなレプリカのトレーニングロスを追跡することお勧めしませ 。これは、ロススケーリング計算が実行されるためです。

たとえば、次の特性を持つトレーニングジョブを実行するとします。

  • 2つのレプリカ
  • 各レプリカで2つのサンプルが処理されます
  • 結果の損失値:各レプリカの[2、3]および[4、5]
  • グローバルバッチサイズ= 4

損失のスケーリングでは、損失値を追加し、グローバルバッチサイズで除算することにより、各レプリカの損失のサンプルごとの値を計算します。この場合、 (2 + 3) / 4 = 1.25および(4 + 5) / 4 = 2.25です。

tf.metrics.Meanを使用して2つのレプリカ間の損失を追跡する場合、結果は異なります。この例では、 totalが3.50、 countが2になり、メトリックでresult()が呼び出されると、 total / count = 1.75にresult()ます。 tf.keras.Metricsで計算された損失は、同期しているレプリカの数に等しい追加の係数によってスケーリングされます。

ガイドと例

カスタムトレーニングループで配布戦略を使用するいくつかの例を次に示します。

  1. 分散型トレーニングガイド
  2. MirroredStrategyを使用したDenseNetの例。
  3. MirroredStrategyTPUStrategyを使用してトレーニングされたBERTの例。この例は、分散トレーニングなどでチェックポイントからロードして定期的なチェックポイントを生成する方法を理解するのに特に役立ちます。
  4. keras_use_ctlフラグを使用して有効にできるMirroredStrategyを使用してトレーニングされたNCFの例。
  5. MirroredStrategyを使用してトレーニングされたNMTの例。

配布戦略ガイドにリストされているその他の例。

次のステップ