훈련 루프와 함께 tf.distribute.Strategy 사용하기

TensorFlow.org에서 보기 구글 코랩(Colab)에서 실행하기 깃허브(GitHub) 소스 보기

이 예제는 사용자 정의 훈련 루프(custom training loops)와 함께 tf.distribute.Strategy를 사용하는 법을 보여드립니다. 우리는 간단한 CNN 모델을 패션 MNIST 데이터셋에 대해 훈련을 할 것입니다. 패션 MNIST 데이터셋은 60000개의 28 x 28 크기의 훈련 이미지들과 10000개의 28 x 28 크기의 테스트 이미지들을 포함하고 있습니다.

이 예제는 유연성을 높이고, 훈련을 더 잘 제어할 수 있도록 사용자 정의 훈련 루프를 사용합니다. 또한, 사용자 훈련 루프를 사용하는 것은 모델과 훈련 루프를 디버깅하기 쉬워집니다.

from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow
import tensorflow as tf

# 헬퍼 라이브러리들
import numpy as np
import os

print(tf.__version__)
2.0.0-rc2

패션 MNIST 데이터셋 다운로드

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()


# 하나의 차원을 배열에 추가 -> 새로운 shape == (28, 28, 1)
# 이렇게 하는 이유는 우리의 모델에서 첫 번째 층이 합성곱 층이고
# 합성곱 층은 4D 입력을 요구하기 때문입니다.
# (batch_size, height, width, channels).
# batch_size 차원은 나중에 추가할 것입니다.

train_images = train_images[..., None]
test_images = test_images[..., None]

# 이미지를 [0, 1] 범위로 변경하기.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

변수와 그래프를 분산하는 전략 만들기

tf.distribute.MirroredStrategy 전략이 어떻게 동작할까요? * 모든 변수와 모델 그래프는 장치(replicas, 다른 문서에서는 replica가 분산 훈련에서 장치 등에 복제된 모델을 의미하는 경우가 있으나 이 문서에서는 장치 자체를 의미합니다)에 복제됩니다. * 입력은 장치에 고르게 분배되어 들어갑니다. * 각 장치는 주어지는 입력에 대해서 손실(loss)과 그래디언트를 계산합니다. * 그래디언트들을 전부 더함으로써 모든 장치들 간에 그래디언트들이 동기화됩니다. * 동기화된 후에, 동일한 업데이트가 각 장치에 있는 변수의 복사본(copies)에 동일하게 적용됩니다.

노트: 하나의 범위를 지정해서 모든 코드를 집어넣을 수 있습니다. 자, 같이 살펴보시죠!

# 만약 장치들의 목록이 `tf.distribute.MirroredStrategy` 생성자 안에 명시되어 있지 않다면,
# 자동으로 장치를 인식할 것입니다.
strategy = tf.distribute.MirroredStrategy()
print ('장치의 수: {}'.format(strategy.num_replicas_in_sync))
장치의 수: 1

입력 파이프라인 설정하기

그래프와 변수를 플랫폼과 무관한 SavedModel 형식으로 내보냅니다. 모델을 내보냈다면, 모델을 불러올 때 범위(scope)를 지정해도 되고 하지 않아도 됩니다.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

분산 데이터셋들을 strategy.scope 내에 생성합니다.

with strategy.scope():

  train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
  train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
  
  test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 
  test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

모델 만들기

tf.keras.Sequential을 사용해서 모델을 생성합니다. Model Subclassing API로도 모델 생성을 할 수 있습니다.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

  return model
# 체크포인트들을 저장하기 위해서 체크포인트 디렉토리를 생성합니다.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

손실 함수 정의하기

일반적으로, GPU/CPU 비율이 1인 단일 장치에서 손실은 입력 배치(batch)의 샘플 개수로 나누어집니다.

그렇다면, tf.distribute.Strategy를 사용할 때, 손실은 어떻게 계산되어야 할까요?

  • 예를들면, 4개의 GPU가 있고 입력 배치 크기가 64라고 하죠. 입력 배치 하나가 여러 개의 장치(4개의 GPU)에 분배됩니다. 각 장치는 크기가 16인 입력을 받습니다.

  • 각 장치에 있는 모델은 해당 입력에 대해 정방향 계산(forward pass)을 수행하고 손실을 계산합니다. 손실을 장치에 할당된 입력 샘플의 수(BATCH_SIZE_PER_REPLICA = 16)로 나누는 것이 아니라 GLOBAL_BATCH_SIZE(64)로 나누어야 합니다.

왜 이렇게 할까요?

  • 위와 같이 계산하는 이유는 그래디언트들이 각 장치에서 계산된 다음, 모든 장치를 동기화하기 위해 이 그래디언트 값들을 전부 더하기 때문입니다.

이 것을 텐서플로에서는 어떻게 할까요?

  • 만약 이 예제처럼 사용자 정의 훈련 루프를 작성한다면, 다음과 같이 샘플당 손실을 더하고 GLOBAL_BATCH_SIZE로 나누어야 합니다. scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) 또는 tf.nn.compute_average_loss 함수를 사용할 수도 있습니다. 이 함수는 샘플당 손실과 선택적으로 샘플 가중치, GLOBAL_BATCH_SIZE를 매개변수 값으로 받고 스케일이 조정된 손실을 반환합니다.

  • 만약 규제 손실을 사용하는 모델이라면, 장치 개수로 손실 값을 스케일 조정해야 합니다. 이는 tf.nn_scale_regularization_loss 함수를 사용하여 처리할 수 있습니다.

  • tf.reduce_mean을 사용하는 것은 추천하지 않습니다. 이렇게 하는 것은 손실을 실제 장치당 배치 크기로 나눕니다. 이 실제 장치당 배치 크기는 아마 각 단계(step)마다 크기가 다를 수 있습니다.

  • 이런 축소(reduction)와 스케일 조정은 케라스의 model.compilemodel.fit에서 자동적으로 수행됩니다.

  • 만약 tf.keras.losses 클래스(아래의 예제에서처럼)를 사용한다면, reduction 매개변수를 명시적으로 NONE 또는 SUM 중 하나로 표시해야 합니다. AUTO가 허용되지 않는 것은 사용자가 분산 모드에서 어떻게 축소할지 명시적으로 지정하는 것이 바람직하기 때문입니다. 현재 SUM_OVER_BATCH_SIZE가 장치당 배치 크기로만 나누고 장치 개수로 나누는 것은 사용자에게 위임하기 때문입니다. 그래서 이렇게 하는 것 대신에 사용자가 명시적으로 축소를 수행하도록 만드는 것이 좋습니다.

with strategy.scope():
  # reduction을 `none`으로 설정합니다. 그래서 우리는 축소를 나중에 하고,
  # GLOBAL_BATCH_SIZE로 나눌 수 있습니다.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      reduction=tf.keras.losses.Reduction.NONE)
  # 또는 loss_fn = tf.keras.losses.sparse_categorical_crossentropy를 사용해도 됩니다.
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

손실과 정확도를 기록하기 위한 지표 정의하기

이 지표(metrics)는 테스트 손실과 훈련 정확도, 테스트 정확도를 기록합니다. .result()를 사용해서 누적된 통계값들을 언제나 볼 수 있습니다.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

훈련 루프

# 모델과 옵티마이저는 `strategy.scope`에서 만들어져야 합니다.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
with strategy.scope():
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss 

  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)
with strategy.scope():
  # `experimental_run_v2`는 주어진 계산을 복사하고,
  # 분산된 입력으로 계산을 수행합니다.
  
  @tf.function
  def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.experimental_run_v2(train_step,
                                                      args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                           axis=None)
 
  @tf.function
  def distributed_test_step(dataset_inputs):
    return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))

  for epoch in range(EPOCHS):
    # 훈련 루프
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
      total_loss += distributed_train_step(x)
      num_batches += 1
    train_loss = total_loss / num_batches

    # 테스트 루프
    for x in test_dist_dataset:
      distributed_test_step(x)

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("에포크 {}, 손실: {}, 정확도: {}, 테스트 손실: {}, "
                "테스트 정확도: {}")
    print (template.format(epoch+1, train_loss,
                           train_accuracy.result()*100, test_loss.result(),
                           test_accuracy.result()*100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
에포크 1, 손실: 0.5001617670059204, 정확도: 81.9800033569336, 테스트 손실: 0.3974190056324005, 테스트 정확도: 85.37999725341797
에포크 2, 손실: 0.33394601941108704, 정확도: 87.94833374023438, 테스트 손실: 0.32548752427101135, 테스트 정확도: 88.3499984741211
에포크 3, 손실: 0.2879915237426758, 정확도: 89.60499572753906, 테스트 손실: 0.30214235186576843, 테스트 정확도: 89.05000305175781
에포크 4, 손실: 0.25683072209358215, 정확도: 90.65833282470703, 테스트 손실: 0.27727892994880676, 테스트 정확도: 90.02000427246094
에포크 5, 손실: 0.23288790881633759, 정확도: 91.44499969482422, 테스트 손실: 0.2653945982456207, 테스트 정확도: 90.30000305175781
에포크 6, 손실: 0.21560721099376678, 정확도: 92.03166961669922, 테스트 손실: 0.28444525599479675, 테스트 정확도: 89.67000579833984
에포크 7, 손실: 0.19633154571056366, 정확도: 92.67166900634766, 테스트 손실: 0.25517672300338745, 테스트 정확도: 90.70999908447266
에포크 8, 손실: 0.18195079267024994, 정확도: 93.22166442871094, 테스트 손실: 0.252077579498291, 테스트 정확도: 90.81999969482422
에포크 9, 손실: 0.16769452393054962, 정확도: 93.711669921875, 테스트 손실: 0.2730155885219574, 테스트 정확도: 90.44999694824219
에포크 10, 손실: 0.1520342379808426, 정확도: 94.34166717529297, 테스트 손실: 0.25690993666648865, 테스트 정확도: 90.88999938964844

위의 예제에서 주목해야 하는 부분 * 이 예제는 train_dist_datasettest_dist_datasetfor x in ... 구조를 통해서 반복합니다. * 스케일이 조정된 손실은 distributed_train_step의 반환값입니다. tf.distribute.Strategy.reduce 호출을 사용해서 장치들 간의 스케일이 조정된 손실 값을 전부 합칩니다. 그리고 나서 tf.distribute.Strategy.reduce 반환 값을 더하는 식으로 배치 간의 손실을 모읍니다. * tf.keras.Metricstf.distribute.Strategy.experimental_run_v2에 의해서 실행되는 train_steptest_step 함수 안에서 업데이트 되어야 합니다. * tf.distribute.Strategy.experimental_run_v2는 그 전략안에 포함된 각 지역 복제 모델로부터 결과값을 반환해 줍니다. 그리고 이 결과를 사용하는 몇 가지 방법들이 있습니다. tf.distribute.Strategy.reduce를 이용하여 값들을 합칠 수 있습니다. tf.distribute.Strategy.experimental_local_results를 사용해서 결과값(지역 복제 모델 당 하나의 결과값)에 들어있는 값들의 리스트를 얻을 수도 있습니다.

최신 체크포인트를 불러와서 테스트하기

tf.distribute.Strategy를 사용해서 체크포인트가 만들어진 모델은 전략 사용 여부에 상관없이 불러올 수 있습니다.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: {}'.format(
    eval_accuracy.result()*100))
전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: 90.44999694824219

데이터셋에 대해 반복작업을 하는 다른 방법들

반복자(iterator)를 사용하기

만약 주어진 스텝의 수에 따라서 반복하기 원하면서 전체 데이터셋을 보는 것을 원치 않는다면, iter를 호출하여 반복자를 만들 수 있습니다. 그 다음 명시적으로 next를 호출합니다. 또한, tf.funtion 내부 또는 외부에서 데이터셋을 반복하도록 설정 할 수 있습니다. 다음은 반복자를 사용하여 tf.function 외부에서 데이터셋을 반복하는 코드 예제입니다.

with strategy.scope():
  for _ in range(EPOCHS):
    total_loss = 0.0
    num_batches = 0
    train_iter = iter(train_dist_dataset)

    for _ in range(10):
      total_loss += distributed_train_step(next(train_iter))
      num_batches += 1
    average_train_loss = total_loss / num_batches

    template = ("에포크 {}, 손실: {}, 정확도: {}")
    print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
    train_accuracy.reset_states()
에포크 10, 손실: 0.1385430097579956, 정확도: 95.15625
에포크 10, 손실: 0.17082223296165466, 정확도: 94.0625
에포크 10, 손실: 0.13082504272460938, 정확도: 94.0625
에포크 10, 손실: 0.14208829402923584, 정확도: 94.6875
에포크 10, 손실: 0.16222235560417175, 정확도: 93.90625
에포크 10, 손실: 0.10496318340301514, 정확도: 96.5625
에포크 10, 손실: 0.14079435169696808, 정확도: 95.3125
에포크 10, 손실: 0.1420704424381256, 정확도: 94.6875
에포크 10, 손실: 0.11773526668548584, 정확도: 95.625
에포크 10, 손실: 0.12115661799907684, 정확도: 95.0

tf.function 내부에서 반복하기

전체 입력 train_dist_dataset에 대해서 tf.function 내부에서 for x in ... 생성자를 사용함으로써 반복을 하거나, 위에서 사용했던 것처럼 반복자를 사용함으로써 반복을 할 수 있습니다. 아래의 예제에서는 tf.function로 한 훈련의 에포크를 감싸고 그 함수에서 train_dist_dataset를 반복하는 것을 보여 줍니다.

with strategy.scope():
  @tf.function
  def distributed_train_epoch(dataset):
    total_loss = 0.0
    num_batches = 0
    for x in dataset:
      per_replica_losses = strategy.experimental_run_v2(train_step,
                                                        args=(x,))
      total_loss += strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
      num_batches += 1
    return total_loss / tf.cast(num_batches, dtype=tf.float32)

  for epoch in range(EPOCHS):
    train_loss = distributed_train_epoch(train_dist_dataset)

    template = ("Epoch {}, Loss: {}, Accuracy: {}")
    print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

    train_accuracy.reset_states()
Epoch 1, Loss: 0.1437469869852066, Accuracy: 94.63833618164062
Epoch 2, Loss: 0.12903869152069092, Accuracy: 95.1066665649414
Epoch 3, Loss: 0.11952802538871765, Accuracy: 95.6300048828125
Epoch 4, Loss: 0.10788594186306, Accuracy: 95.94499969482422
Epoch 5, Loss: 0.10107263922691345, Accuracy: 96.24666595458984
Epoch 6, Loss: 0.09155285358428955, Accuracy: 96.63999938964844
Epoch 7, Loss: 0.08459325134754181, Accuracy: 96.82166290283203
Epoch 8, Loss: 0.07766937464475632, Accuracy: 97.1449966430664
Epoch 9, Loss: 0.07159435003995895, Accuracy: 97.3499984741211
Epoch 10, Loss: 0.06595112383365631, Accuracy: 97.50833129882812

장치 간의 훈련 손실 기록하기

노트: 일반적인 규칙으로, tf.keras.Metrics를 사용하여 샘플당 손실 값을 기록하고 장치 내부에서 값이 합쳐지는 것을 피해야 합니다.

tf.metrics.Mean을 사용하여 여러 장치의 훈련 손실을 기록하는 것을 추천하지 않습니다. 왜냐하면 손실의 스케일을 조정하는 계산이 수행되기 때문입니다.

예를 들어, 다음과 같은 조건의 훈련을 수행한다고 합시다. * 두개의 장치 * 두개의 샘플들이 각 장치에 의해 처리됩니다. * 손실 값을 산출합니다: 각각의 장치에 대해 [2, 3]과 [4, 5] * Global batch size = 4

손실의 스케일 조정을 하면, 손실 값을 더하고 GLOBAL_BATCH_SIZE로 나누어 각 장치에 대한 샘플당 손실값을 계산할 수 있습니다. 이 경우에는 (2 + 3) / 4 = 1.24와 (4 + 5) / 4 = 2.25입니다.

만약 tf.metrics.Mean을 사용해서 두 개의 장치에 대해 손실값을 계산한다면, 결과값이 다릅니다. 이 예제에서는, 측정 지표의 result()가 메서드가 호출될 때 total이 3.50이고 count가 2입니다. 결과값은 total/count가 1.75가 됩니다. tf.keras.Metrics를 이용해서 계산한 손실값이 추가적인 요인에 의해서 크기조정되며, 이 추가적인 요인은 동기화되는 장치의 개수입니다.

예제와 튜토리얼

사용자 정의 훈련루프를 포함한 분산 전략을 사용하는 몇 가지 예제가 있습니다.

  1. MirroredStrategy를 사용해서 MNIST를 훈련하는 Tutorial
  2. MirroredStrategy를 사용하는 DenseNet 예제.
  3. MirroredStrategyTPUStrategy를 사용해서 훈련하는 BERT 예제. 이 예제는 분산 훈련 중에 어떻게 체크포인트로부터 불러오는지와 어떻게 주기적으로 체크포인트들을 생성해 내는지를 이해하기에 정말 좋습니다.
  4. keras_use_ctl flag를 사용해서 활성화 할 수 있는 MirroredStrategy를 이용해서 훈련되는 NCF 예제
  5. MirroredStrategy을 사용해서 훈련되는 NMT 예제.

더 많은 예제는 여기에 있습니다. Distribution strategy guide

다음단계

새로운 tf.distribute.Strategy API를 모델에 적용해 보세요.