케라스를 사용한 다중 워커(Multi-worker) 훈련

TensorFlow.org에서 보기 구글 코랩(Colab)에서 실행하기 깃허브(GitHub) 소스 보기

개요

이 튜토리얼에서는 tf.distribute.Strategy API를 사용하여 케라스 모델을 다중 워커로 분산 훈련하는 방법을 살펴보겠습니다. 다중 워커를 사용하여 훈련할 수 있도록 전략을 디자인했기 때문에, 단일 워커 훈련용으로 만들어진 케라스 모델도 코드를 조금만 바꾸면 다중 워커를 사용하여 훈련할 수 있습니다.

tf.distribute.Strategy API에 관심이 있으신 분들은 텐서플로로 분산 훈련하기 가이드에서 텐서플로가 제공하는 분산 훈련 전략들을 훑어보실 수 있습니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.

from __future__ import absolute_import, division, print_function, unicode_literals
try:
  # %tensorflow_version 기능은 코랩에서만 사용할 수 있습니다.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

데이터셋 준비하기

MNIST 데이터셋을 TensorFlow Datasets에서 받아옵시다. MNIST 데이터셋은 0-9 숫자를 손으로 쓴 28x28 픽셀 흑백 이미지입니다. 6만 개의 훈련 샘플과 만 개의 테스트 샘플이 들어있습니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

# MNIST 데이터를 (0, 255] 범위에서 (0., 1.] 범위로 조정
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255
  return image, label

datasets, info = tfds.load(name='mnist',
                           with_info=True,
                           as_supervised=True)

train_datasets_unbatched = datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
train_datasets = train_datasets_unbatched.batch(BATCH_SIZE)

케라스 모델 만들기

tf.keras.Sequential API를 사용하여 간단한 합성곱 신경망 케라스 모델을 만들고 컴파일하도록 하겠습니다. 우리 MNIST 데이터셋으로 훈련시킬 모델입니다.

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  model.compile(
      loss=tf.keras.losses.sparse_categorical_crossentropy,
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

먼저 단일 워커를 이용하여 적은 수의 에포크만큼만 훈련을 해보고 잘 동작하는지 확인해봅시다. 에포크가 넘어감에 따라 손실(loss)은 줄어들고 정확도는 1.0에 가까워져야 합니다.

single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(x=train_datasets, epochs=3)
Epoch 1/3
938/938 [==============================] - 12s 13ms/step - loss: 2.0621 - accuracy: 0.4405
Epoch 2/3
938/938 [==============================] - 3s 3ms/step - loss: 1.2361 - accuracy: 0.7200
Epoch 3/3
938/938 [==============================] - 3s 3ms/step - loss: 0.6834 - accuracy: 0.8369

<tensorflow.python.keras.callbacks.History at 0x7fdaf02af7f0>

다중 워커 구성

자 이제 다중 워커 훈련의 세계로 들어가 봅시다. 텐서플로에서 여러 장비를 사용할 때는 TF_CONFIG 환경 변수를 설정해야 합니다. 하나의 클러스터를 구성하는 각 장비에 클러스터 구성을 알려주고 각각 다른 역할을 부여하기 위해 TF_CONFIG를 사용합니다.

TF_CONFIGclustertask 두 개의 부분으로 구성됩니다. cluster에는 훈련 클러스터에 대한 정보를 지정합니다. worker 같은 여러 타입의 작업 이름을 키로 하는 파이썬 딕셔너리를 지정합니다. 다중 워커 훈련에서는 보통 일반적인 워커보다 조금 더 많은 일을 하는 특별한 워커가 하나 필요합니다. 이 워커는 체크포인트를 저장하거나, 서머리(summary)를 쓰는 일 등을 추가로 담당하게 됩니다. 보통 치프('chief') 워커라고 부르고, 관례적으로 index 번호가 0인 워커가 치프 워커가 됩니다(사실 tf.distribute.Strategy가 이렇게 구현되었습니다). 한편 task에는 현재 워커의 작업에 대한 정보를 지정합니다.

이 예에서는 작업(task) type"worker"로 지정하고, index0으로 지정하였습니다. 이 말은 이 장비가 첫 번째 워커이고, 따라서 치프 워커이며, 다른 워커보다 더 많은 일을 하게 된다는 뜻입니다. 물론 다른 장비들에도 TF_CONFIG 환경변수가 설정되어야 합니다. 다른 장비들에도 cluster에는 동일한 딕셔너리를 지정하겠지만, task에는 각 장비의 역할에 따라 다른 작업 type이나 index를 지정해야 합니다.

예시를 위하여, 이 튜토리얼에서는 두 개의 워커를 localhost에 띄우는 방법을 보여드리겠습니다. 실제로는 각 워커를 다른 장비에서 띄울텐데, 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG를 지정해야 합니다.

주의: 아래 코드를 코랩에서 실행하지 마십시오. 텐서플로 런타임이 주어진 IP와 포트로 gRPC 서버를 띄우려고 할 텐데, 아마도 실패할 것입니다.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

이 예제에서는 학습률을 바꾸지 않고 그대로 사용한 것에 주의하십시오. 실제로는 전역(global) 배치 크기에 따라 학습률을 조정해야 할 수 있습니다.

적절한 전략 고르기

텐서플로의 분산 전략은 크게 각 훈련 단계가 워커들이 가진 복제본들끼리 동기화되는 동기 훈련 방식과, 동기화가 엄격하게 이루어지지 않는 비동기 훈련 방식이 있습니다.

이 튜토리얼에서 다루는 MultiWorkerMirroredStrategy는 동기 다중 워커 훈련에서 추천하는 전략입니다. 모델을 훈련하려면 tf.distribute.experimental.MultiWorkerMirroredStrategy 인스턴스를 하나 만드십시오. MultiWorkerMirroredStrategy는 모델의 레이어에 있는 모든 변수의 복사본을 각 워커의 장치마다 만듭니다. 그리고 수집 작업을 위한 텐서플로 연산인 CollectiveOps를 사용하여 그래디언트를 모으고, 각 변수의 값을 동기화합니다. tf.distribute.Strategy 가이드에 이 전략에 대한 더 자세한 설명이 있습니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.

Warning:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.

INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategyCollectiveCommunication 매개변수로 선택할 수 있는 여러 가지 구현체를 제공합니다. RING(링)은 링 구조 기반의 수집 작업 구현체이고, 장비 간 통신을 위하여 gRPC를 사용합니다. NCCLNvidia의 NCCL로 수집 작업을 구현한 것입니다. AUTO를 지정하면, 런타임이 알아서 선택합니다. 어떤 수집 작업 구현체가 최적인지는 GPU의 종류와 수, 클러스터 내 네트워크 연결 등 여러 요소에 따라 달라집니다.

MultiWorkerMirroredStrategy로 모델 훈련하기

다중 워커 분산 훈련을 위하여 tf.distribute.Strategy API를 tf.keras와 함께 사용하려면, 딱 한 가지만 바꾸면 됩니다. 바로 모델 구성과 model.compile() 호출 코드를 strategy.scope() 안으로 넣는 것입니다. 분산 전략의 범위(scope)를 써서 변수를 어디에 어떻게 만들지 지정할 수 있습니다. MultiWorkerMirroredStrategy의 경우, 만들어지는 변수는 MirroredVariable이고, 각 워커에 복제본이 생깁니다.

NUM_WORKERS = 2
# 여기서 배치 크기는 워커의 수를 곱한 크기로 늘려야 합니다. `tf.data.Dataset.batch`에는
# 전역 배치 크기를 지정해야 하기 때문입니다. 전에는 64였지만, 이제 128이 됩니다.
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
train_datasets = train_datasets_unbatched.batch(GLOBAL_BATCH_SIZE)
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3)
Epoch 1/3
469/469 [==============================] - 9s 18ms/step - loss: 2.1940 - accuracy: 0.2534
Epoch 2/3
469/469 [==============================] - 1s 3ms/step - loss: 1.8323 - accuracy: 0.6556
Epoch 3/3
469/469 [==============================] - 1s 3ms/step - loss: 1.2882 - accuracy: 0.7726

<tensorflow.python.keras.callbacks.History at 0x7fdb6d0d9d30>

데이터셋 샤딩과 배치 크기

다중 워커 훈련에서는 수렴과 성능을 위하여 데이터를 여러 부분으로 샤딩(sharding)해야 합니다. 하지만, 위 코드 예에서는 데이터셋을 샤딩하지 않고 바로 model.fit()으로 보낸 것을 볼 수 있습니다. 이는 tf.distribute.Strategy API가 다중 워커 훈련에 맞게 자동으로 데이터셋을 샤딩해주기 때문입니다.

만약 훈련할 때 샤딩을 직접 하고 싶다면, tf.data.experimental.DistributeOptions API를 사용해서 자동 샤딩 기능을 끌 수 있습니다. 다음과 같이 말입니다.

options = tf.data.Options()
options.experimental_distribute.auto_shard = False
train_datasets_no_auto_shard = train_datasets.with_options(options)

또 하나 주목할 점은 datasets의 배치 크기입니다. 앞서 코드에서 GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS로 지정하였습니다. 단일 워커일 때보다 NUM_WORKERS 배만큼 크게 지정한 것입니다. 이는 실제로 각 워커에 전달되는 배치 크기가 tf.data.Dataset.batch()에 매개변수로 전달된 전역 배치 크기를 워커의 수로 나눈 것이 되기 때문입니다. 즉, 이렇게 바꾸어야 실제로 워커가 처리하는 배치 크기가 단일 워커일 때와 동일한 값이 됩니다.

성능

이제 케라스 모델이 완성되었습니다. MultiWorkerMirroredStrategy를 사용하여 여러 워커를 사용하여 훈련할 수 있습니다. 다중 워커 훈련의 성능을 더 높이려면 다음 기법들을 확인해 보십시오.

  • MultiWorkerMirroredStrategy는 여러 가지 수집 작업 통신 구현체를 제공합니다. RING(링)은 링 구조 기반의 수집 작업 구현체이고, 장비 간 통신을 위하여 gRPC를 사용합니다. NCCLNvidia의 NCCL로 수집 작업을 구현한 것입니다. AUTO를 지정하면, 런타임이 알아서 선택합니다. 어떤 수집 작업 구현체가 최적인지는 GPU의 종류와 수, 클러스터 내 네트워크 연결 등 여러 요소에 따라 달라집니다. 런타임이 알아서 선택한 것을 바꾸려면, MultiWorkerMirroredStrategy 생성자의 communication 매개변수에 적절한 값을 지정하십시오. 예를 들면 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 지정합니다.
  • 가능하면 변수를 tf.float 타입으로 바꾸십시오. 공식 ResNet 모델을 보면 어떻게 바꾸는지 예제가 있습니다.

내결함성

동기 훈련 방식에서는, 워커 중 하나가 죽으면 전체 클러스터가 죽어버리고, 복구 메커니즘이 따로 없습니다. 하지만 케라스와 tf.distribute.Strategy를 함께 사용하면, 워커가 죽거나 불안정해지는 경우에도 내결함성을 제공합니다. 이는 사용자가 선택한 분산 파일 시스템에 훈련 상태를 저장하는 기능을 제공하기 때문입니다. 기존 인스턴스가 죽거나 정지당해서 재시작되는 경우에도 훈련 상태를 복구할 수 있습니다.

모든 워커가 훈련 에포크 혹은 스텝에 따라 동기화되므로, 다른 워커들은 죽거나 정지당한 워커가 복구될 때까지 기다려야 합니다.

ModelCheckpoint 콜백

다중 워커 훈련의 내결함 기능을 사용하려면, tf.keras.Model.fit()를 호출할 때 tf.keras.callbacks.ModelCheckpoint의 인스턴스를 제공해야 합니다. 이 콜백이 체크포인트와 훈련 상태를 ModelCheckpointfilepath 매개변수에 지정한 디렉터리에 저장합니다.

# `filepath` 매개변수를 모든 워커가 접근할 수 있는 파일 시스템 경로로 바꾸십시오.
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=3, callbacks=callbacks)
Epoch 1/3
    469/Unknown - 9s 19ms/step - loss: 2.1680 - accuracy: 0.3467WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.5/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.5/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

469/469 [==============================] - 10s 21ms/step - loss: 2.1680 - accuracy: 0.3467
Epoch 2/3
461/469 [============================>.] - ETA: 0s - loss: 1.7832 - accuracy: 0.5980INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

469/469 [==============================] - 2s 4ms/step - loss: 1.7793 - accuracy: 0.5997
Epoch 3/3
461/469 [============================>.] - ETA: 0s - loss: 1.2209 - accuracy: 0.7724INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets

469/469 [==============================] - 2s 4ms/step - loss: 1.2170 - accuracy: 0.7732

<tensorflow.python.keras.callbacks.History at 0x7fdb6ce3fda0>

워커가 정지당하면, 정지당한 워커가 다시 살아날 때까지 전체 클러스터가 잠시 멈춥니다. 워커가 클러스터에 다시 들어오면, 다른 워커도 재시작됩니다. 모든 워커가 이전에 저장한 체크포인트 파일을 읽고, 예전 상태를 불러오면 클러스터가 다시 일관된 상태가 됩니다. 그리고서 훈련이 재개됩니다.

ModelCheckpointfilepath가 위치한 디렉터리를 살펴보면, 임시로 생성된 체크포인트 파일들을 확인할 수 있을 것입니다. 이 파일들은 실패한 작업을 복구하는데 필요한 것들로, 다중 워커 훈련 작업을 성공적으로 마치고 나면 tf.keras.Model.fit() 함수가 끝날 때 라이브러리가 알아서 삭제할 것입니다.

참조

  1. 텐서플로로 분산 훈련하기 가이드는 사용 가능한 분산 전략들을 개괄하고 있습니다.
  2. 공식 ResNet50 모델은 MirroredStrategyMultiWorkerMirroredStrategy로 훈련할 수 있습니다.