![]() | ![]() | ![]() | ![]() |
このチュートリアルでは、カスタムトレーニングループでtf.distribute.Strategy
を使用する方法を示します。ファッションMNISTデータセットで単純なCNNモデルをトレーニングします。ファッションMNISTデータセットには、サイズ28 x28の60000の列車画像とサイズ28x28の10000のテスト画像が含まれています。
カスタムトレーニングループを使用してモデルをトレーニングしています。これにより、トレーニングの柔軟性と制御が向上します。さらに、モデルとトレーニングループのデバッグが簡単になります。
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.3.0
ファッションMNISTデータセットをダウンロードする
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz 32768/29515 [=================================] - 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz 26427392/26421880 [==============================] - 1s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz 8192/5148 [===============================================] - 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz 4423680/4422102 [==============================] - 1s 0us/step
変数とグラフを分散するための戦略を作成します
tf.distribute.MirroredStrategy
戦略はどのように機能しますか?
- すべての変数とモデルグラフがレプリカに複製されます。
- 入力はレプリカ全体に均等に分散されます。
- 各レプリカは、受け取った入力の損失と勾配を計算します。
- グラデーションは、それらを合計することにより、すべてのレプリカ間で同期されます。
- 同期後、各レプリカの変数のコピーに対して同じ更新が行われます。
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
入力パイプラインを設定する
グラフと変数をプラットフォームに依存しないSavedModel形式にエクスポートします。モデルを保存したら、スコープの有無にかかわらずモデルをロードできます。
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
データセットを作成して配布します。
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
モデルを作成する
tf.keras.Sequential
を使用してモデルを作成します。モデルサブクラス化APIを使用してこれを行うこともできます。
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
損失関数を定義する
通常、1つのGPU / CPUを備えた単一のマシンでは、損失は入力のバッチ内の例の数で除算されます。
では、 tf.distribute.Strategy
を使用する場合、損失はどのように計算するtf.distribute.Strategy
ますか?
たとえば、4つのGPUと64のバッチサイズがあるとします。入力の1つのバッチがレプリカ(4つのGPU)に分散され、各レプリカはサイズ16の入力を取得します。
各レプリカのモデルは、それぞれの入力を使用してフォワードパスを実行し、損失を計算します。ここで、損失をそれぞれの入力の例の数(BATCH_SIZE_PER_REPLICA = 16)で除算する代わりに、損失をGLOBAL_BATCH_SIZE(64)で除算する必要があります。
なぜこれをするのですか?
- これを行う必要があるのは、各レプリカで勾配が計算された後、それらを合計することによってレプリカ間で同期されるためです。
TensorFlowでこれを行う方法は?
このチュートリアルのようにカスタムトレーニングループを作成している場合は、例ごとの損失を合計し、その合計をGLOBAL_BATCH_SIZEで除算する必要があります
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
またはtf.nn.compute_average_loss
を使用できますtf.nn.compute_average_loss
は、例ごとの損失、オプションのサンプルの重み、およびGLOBAL_BATCH_SIZEを引数として取り、スケーリングされた損失を返します。モデルで正規化損失を使用している場合は、レプリカの数で損失値をスケーリングする必要があります。これは、
tf.nn.scale_regularization_loss
関数を使用してtf.nn.scale_regularization_loss
ます。tf.reduce_mean
使用はお勧めしません。そうすることで、損失をレプリカごとの実際のバッチサイズで除算します。これはステップごとに異なる場合があります。この縮小とスケーリングは、
model.compile
とmodel.fit
自動的に行われます。tf.keras.losses
クラスを使用する場合(以下の例のように)、損失削減をNONE
またはSUM
いずれかに明示的に指定する必要があります。AUTO
とSUM_OVER_BATCH_SIZE
一緒に使用する場合は許可されませんtf.distribute.Strategy
。AUTO
は、分散の場合に正しいことを確認するために、ユーザーがどの削減を希望するかを明示的に検討する必要があるため、許可されていません。SUM_OVER_BATCH_SIZE
は、現在、レプリカのバッチサイズごとにのみ除算され、レプリカの数による除算はユーザーにSUM_OVER_BATCH_SIZE
いるため、許可されていません。これは見逃しやすい可能性があります。その代わりに、ユーザーに明示的に削減を行うように依頼します。labels
が多次元の場合は、各サンプルの要素数全体でper_example_loss
を平均します。形状例えば、もしpredictions
された(batch_size, H, W, n_classes)
とlabels
ある(batch_size, H, W)
あなたは更新する必要がありますper_example_loss
:ようper_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
with strategy.scope():
# Set reduction to `none` so we can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
損失と精度を追跡するためのメトリックを定義します
これらのメトリックは、テストの損失とトレーニングおよびテストの精度を追跡します。 .result()
を使用して、いつでも累積統計を取得できます。
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
トレーニングループ
# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Iterator.get_next_as_optional()` instead. INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). Epoch 1, Loss: 0.50295090675354, Accuracy: 82.1116714477539, Test Loss: 0.3852590322494507, Test Accuracy: 86.5999984741211 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). Epoch 2, Loss: 0.32958829402923584, Accuracy: 88.20333862304688, Test Loss: 0.3391425311565399, Test Accuracy: 87.6500015258789 Epoch 3, Loss: 0.2872008979320526, Accuracy: 89.57167053222656, Test Loss: 0.2974696457386017, Test Accuracy: 89.31000518798828 Epoch 4, Loss: 0.255713552236557, Accuracy: 90.58499908447266, Test Loss: 0.2988712787628174, Test Accuracy: 89.31999969482422 Epoch 5, Loss: 0.23122134804725647, Accuracy: 91.41667175292969, Test Loss: 0.27742496132850647, Test Accuracy: 89.99000549316406 Epoch 6, Loss: 0.212575763463974, Accuracy: 92.17333221435547, Test Loss: 0.2573488652706146, Test Accuracy: 90.75 Epoch 7, Loss: 0.1963273137807846, Accuracy: 92.77166748046875, Test Loss: 0.2587501108646393, Test Accuracy: 90.66000366210938 Epoch 8, Loss: 0.1779220998287201, Accuracy: 93.46666717529297, Test Loss: 0.267805814743042, Test Accuracy: 90.55999755859375 Epoch 9, Loss: 0.16410504281520844, Accuracy: 93.91333770751953, Test Loss: 0.25632956624031067, Test Accuracy: 91.00999450683594 Epoch 10, Loss: 0.14829590916633606, Accuracy: 94.47833251953125, Test Loss: 0.25820475816726685, Test Accuracy: 91.00999450683594
上記の例で注意すべき点:
-
for x in ...
コンストラクトを使用して、train_dist_dataset
とtest_dist_dataset
を繰り返し処理していfor x in ...
。 - スケーリングされた損失は、
distributed_train_step
戻り値です。この値は、使用してレプリカを横切って集約されtf.distribute.Strategy.reduce
の戻り値合計することにより通話をした後、バッチを横切るtf.distribute.Strategy.reduce
呼び出しを。 -
tf.keras.Metrics
内部で更新する必要がありtrain_step
とtest_step
によって実行されますtf.distribute.Strategy.run
。 *tf.distribute.Strategy.run
は、戦略内の各ローカルレプリカから結果を返します。この結果を利用するには、複数の方法があります。tf.distribute.Strategy.reduce
をtf.distribute.Strategy.reduce
して、集計値を取得できます。tf.distribute.Strategy.experimental_local_results
を実行して、結果に含まれる値のリストをローカルレプリカごとに1つずつ取得することもできます。
最新のチェックポイントを復元してテストする
tf.distribute.Strategy
チェックポイントされたモデルは、ストラテジーの有無にかかわらず復元できます。
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.00999450683594
データセットを反復処理する別の方法
イテレータの使用
あなたがデータセット全体を通じて一定数の段階のを反復処理していないしたい場合には、使用してイテレータを作成することができますiter
コールと明示的に呼び出しnext
イテレータ上を。 tf.functionの内部と外部の両方でデータセットを反復処理することを選択できます。これは、イテレータを使用したtf.functionの外部でのデータセットの反復を示す小さなスニペットです。
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 10, Loss: 0.12157603353261948, Accuracy: 95.0 Epoch 10, Loss: 0.1367541253566742, Accuracy: 94.6875 Epoch 10, Loss: 0.14902949333190918, Accuracy: 93.90625 Epoch 10, Loss: 0.12149540334939957, Accuracy: 95.625 Epoch 10, Loss: 0.13160167634487152, Accuracy: 94.6875 Epoch 10, Loss: 0.13297739624977112, Accuracy: 95.3125 Epoch 10, Loss: 0.16038034856319427, Accuracy: 94.53125 Epoch 10, Loss: 0.1035340279340744, Accuracy: 96.40625 Epoch 10, Loss: 0.11846740543842316, Accuracy: 95.625 Epoch 10, Loss: 0.09006750583648682, Accuracy: 96.71875
tf.function内での反復
for x in ...
構文を使用するか、上記のように反復子を作成することにより、 train_dist_dataset
内の入力train_dist_dataset
全体を反復処理することもできます。以下の例は、トレーニングの1つのエポックをtrain_dist_dataset
し、関数内でtrain_dist_datasetを反復処理する方法をtrain_dist_dataset
ています。
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 1, Loss: 0.13680464029312134, Accuracy: 94.90499877929688 Epoch 2, Loss: 0.12503673136234283, Accuracy: 95.33499908447266 Epoch 3, Loss: 0.11472766101360321, Accuracy: 95.71333312988281 Epoch 4, Loss: 0.10419528931379318, Accuracy: 96.13500213623047 Epoch 5, Loss: 0.09566374123096466, Accuracy: 96.44833374023438 Epoch 6, Loss: 0.08704081922769547, Accuracy: 96.82499694824219 Epoch 7, Loss: 0.08157625794410706, Accuracy: 96.96333312988281 Epoch 8, Loss: 0.07562965154647827, Accuracy: 97.11000061035156 Epoch 9, Loss: 0.0676642507314682, Accuracy: 97.47999572753906 Epoch 10, Loss: 0.06430575996637344, Accuracy: 97.58333587646484
レプリカ全体のトレーニング損失の追跡
損失スケーリングの計算が実行されるため、 tf.metrics.Mean
を使用して異なるレプリカ間でトレーニング損失を追跡することはお勧めしません。
たとえば、次の特性を持つトレーニングジョブを実行する場合:
- 2つのレプリカ
- 各レプリカで2つのサンプルが処理されます
- 結果の損失値:各レプリカの[2、3]および[4、5]
- グローバルバッチサイズ= 4
損失スケーリングでは、損失値を加算し、グローバルバッチサイズで除算することにより、各レプリカの損失のサンプルごとの値を計算します。この場合: (2 + 3) / 4 = 1.25
および(4 + 5) / 4 = 2.25
。
tf.metrics.Mean
を使用して2つのレプリカ間の損失を追跡すると、結果は異なります。この例では、 total
3.50、 count
2になります。これにより、メトリックでresult()
が呼び出されると、 total
/ count
= 1.75にresult()
ます。 tf.keras.Metrics
で計算された損失は、同期しているレプリカの数に等しい追加の係数でスケーリングされます。
ガイドと例
カスタムトレーニングループで配布戦略を使用する例を次に示します。
- 分散トレーニングガイド
MirroredStrategy
を使用したDenseNetの例。-
MirroredStrategy
とTPUStrategy
を使用してトレーニングされたBERTの例。この例は、チェックポイントからロードし、分散トレーニングなどで定期的なチェックポイントを生成する方法を理解するのに特に役立ちます。 -
keras_use_ctl
フラグを使用して有効にできるMirroredStrategy
を使用してトレーニングされたNCFの例。 -
MirroredStrategy
を使用してトレーニングされたNMTの例。
流通戦略ガイドに記載されているその他の例。
次のステップ
- モデルで新しい
tf.distribute.Strategy
試してください。 - TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールの詳細については、ガイドの「パフォーマンス」セクションにアクセスしてください。