추정기(Estimator)를 사용한 다중 워커(Multi-worker) 훈련

TensorFlow.org에서 보기 구글 코랩(Colab)에서 실행하기 깃허브(GitHub) 소스 보기

개요

이 튜토리얼은 tf.estimator와 함께 분산 다중 워커 훈련을 하기 위하여 tf.distribute.Strategy를 어떻게 사용하는지 살펴봅니다. tf.estimator를 사용하여 코드를 작성하고 있고, 고성능의 장비 한 대로 다룰 수 있는 것보다 더 큰 작업을 하는 데에 관심이 있으시다면 이 튜토리얼이 알맞습니다.

시작하기 전에, 텐서플로로 분산 훈련하기를 먼저 읽어주세요. 다중 GPU 훈련 튜토리얼도 관련이 있습니다. 이 튜토리얼과 같은 모델을 사용합니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.

from __future__ import absolute_import, division, print_function, unicode_literals 
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

입력 함수

이 튜토리얼은 텐서플로 데이터셋(TensorFlow Datasets)의 MNIST 데이터셋을 사용합니다. 코드 내용은 다중 GPU 훈련 튜토리얼과 유사하지만 큰 차이점이 하나 있습니다. 바로 추정기를 써서 다중 워커 훈련을 할 때는 데이터셋을 워커 숫자대로 나누어 주어야 모델이 수렴합니다. 입력 데이터는 워커 인덱스로 샤딩(shard)합니다. 그러면 각 워커 프로세스가 데이터셋을 1/워커 수 만큼씩 겹치지 않게 나누어 갖습니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

훈련을 수렴시키기 위한 또 다른 방법으로 각 워커에서 데이터셋을 제각기 다른 시드 값으로 셔플하는 것도 있습니다.

다중 워커 설정

다중 GPU 훈련 튜토리얼과 비교할 때 가장 큰 차이 중 하나는 다중 워커를 설정하는 부분입니다. TF_CONFIG 환경 변수는 클러스터를 이루는 각 워커에 클러스터 설정을 지정하는 표준 방법입니다.

TF_CONFIG에는 clustertask라는 두 가지 구성요소가 있습니다. cluster는 전체 클러스터, 다시 말해 클러스터에 속한 워커와 파라미터 서버에 대한 정보를 제공합니다. task는 현재 작업에 대한 정보를 지정합니다. 이 예제에서는 작업의 typeworker이고, 작업의 index0입니다.

예를 들기 위하여, 이 튜토리얼에서는 두 개의 워커를 localhost에 띄울 때의 TF_CONFIG를 보여드리겠습니다. 실제로는 각 워커를 다른 장비에서 띄울 텐데, 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG를 지정해야 합니다. 예를 들어, 각 장비의 작업 index가 달라야 합니다.

주의: 아래 코드를 코랩에서 실행하지 마십시오. 텐서플로 런타임이 주어진 IP와 포트로 gRPC 서버를 띄우려고 할 텐데, 아마도 실패할 것입니다.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

모델 정의하기

훈련을 위하여 레이어와 옵티마이저, 손실 함수를 정의하세요. 이 튜토리얼에서는 다중 GPU 훈련 튜토리얼과 비슷하게 케라스 레이어로 모델을 정의합니다.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

모델을 훈련하기 위하여 tf.distribute.experimental.MultiWorkerMirroredStrategy의 인스턴스를 사용하세요. MultiWorkerMirroredStrategy는 모든 워커의 각 장비에, 모델의 레이어에 있는 모든 변수의 복사본을 만듭니다. 이 전략은 CollectiveOps라는 수집을 위한 통신용 텐서플로 연산을 사용하여 그래디언트를 모으고, 변수들의 값을 동일하게 맞춥니다. 텐서플로로 분산 훈련하기에 이 전략에 대한 더 자세한 내용이 있습니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

모델 훈련 및 평가하기

다음으로, 추정기의 RunConfig에 분산 전략을 지정하십시오. 그리고 tf.estimator.train_and_evaluate로 훈련 및 평가를 합니다. 이 튜토리얼에서는 train_distribute로만 전략을 지정하였기 때문에 훈련 과정만 분산 처리합니다. eval_distribute를 지정하여 평가도 분산 처리할 수 있습니다.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f944c428f98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
Downloading and preparing dataset mnist (11.06 MiB) to /home/kbuilder/tensorflow_datasets/mnist/3.0.0...

WARNING:absl:Dataset mnist is hosted on GCS. It will automatically be downloaded to your
local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead set
data_dir=gs://tfds-data/datasets.


Dataset mnist downloaded and prepared to /home/kbuilder/tensorflow_datasets/mnist/3.0.0. Subsequent calls will reuse this data.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1635: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1635: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:loss = 2.3306842, step = 0

INFO:tensorflow:loss = 2.3306842, step = 0

INFO:tensorflow:global_step/sec: 201.029

INFO:tensorflow:global_step/sec: 201.029

INFO:tensorflow:loss = 2.301099, step = 100 (0.500 sec)

INFO:tensorflow:loss = 2.301099, step = 100 (0.500 sec)

INFO:tensorflow:global_step/sec: 284.468

INFO:tensorflow:global_step/sec: 284.468

INFO:tensorflow:loss = 2.2985516, step = 200 (0.352 sec)

INFO:tensorflow:loss = 2.2985516, step = 200 (0.352 sec)

INFO:tensorflow:global_step/sec: 274.961

INFO:tensorflow:global_step/sec: 274.961

INFO:tensorflow:loss = 2.2957296, step = 300 (0.363 sec)

INFO:tensorflow:loss = 2.2957296, step = 300 (0.363 sec)

INFO:tensorflow:global_step/sec: 285.727

INFO:tensorflow:global_step/sec: 285.727

INFO:tensorflow:loss = 2.2967663, step = 400 (0.350 sec)

INFO:tensorflow:loss = 2.2967663, step = 400 (0.350 sec)

INFO:tensorflow:global_step/sec: 289.581

INFO:tensorflow:global_step/sec: 289.581

INFO:tensorflow:loss = 2.2676642, step = 500 (0.345 sec)

INFO:tensorflow:loss = 2.2676642, step = 500 (0.345 sec)

INFO:tensorflow:global_step/sec: 290.669

INFO:tensorflow:global_step/sec: 290.669

INFO:tensorflow:loss = 2.2634776, step = 600 (0.344 sec)

INFO:tensorflow:loss = 2.2634776, step = 600 (0.344 sec)

INFO:tensorflow:global_step/sec: 283.014

INFO:tensorflow:global_step/sec: 283.014

INFO:tensorflow:loss = 2.2565768, step = 700 (0.353 sec)

INFO:tensorflow:loss = 2.2565768, step = 700 (0.353 sec)

INFO:tensorflow:global_step/sec: 326.933

INFO:tensorflow:global_step/sec: 326.933

INFO:tensorflow:loss = 2.2565258, step = 800 (0.305 sec)

INFO:tensorflow:loss = 2.2565258, step = 800 (0.305 sec)

INFO:tensorflow:global_step/sec: 680.572

INFO:tensorflow:global_step/sec: 680.572

INFO:tensorflow:loss = 2.2334833, step = 900 (0.147 sec)

INFO:tensorflow:loss = 2.2334833, step = 900 (0.147 sec)

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-01-24T14:52:40Z

INFO:tensorflow:Starting evaluation at 2020-01-24T14:52:40Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 0.88196s

INFO:tensorflow:Inference Time : 0.88196s

INFO:tensorflow:Finished evaluation at 2020-01-24-14:52:41

INFO:tensorflow:Finished evaluation at 2020-01-24-14:52:41

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.252755

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.252755

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1245012.

INFO:tensorflow:Loss for final step: 1.1245012.

({'loss': 2.252755, 'global_step': 938}, [])

훈련 성능 최적화하기

이제 모델과 tf.distribute.Strategy로 만든 다중 워커를 사용할 수 있는 추정기가 있습니다. 다중 워커 훈련 성능을 최적화하려면 다음과 같은 방법을 사용해 보십시오.

  • 배치 크기 늘리기: 여기서 지정하는 배치 크기는 GPU당 크기입니다. 일반적으로, GPU 메모리 크기에 맞는 한 가장 크게 배치 크기를 잡는 것이 좋습니다.
  • 변수 형변환: 가능하면 변수를 tf.float 타입으로 바꾸세요. 공식 ResNet 모델의 예제에서 어떻게 변환하는지 볼 수 있습니다.
  • 집합 통신 구현을 사용하세요: MultiWorkerMirroredStrategy는 여러 가지 집합 통신 구현을 제공합니다.

    • RING은 장비 간 통신을 위하여 gRPC를 써서 링 네트워크 기반의 집합 통신을 구현한 것입니다.
    • NCCLNvidia의 NCCL을 사용하여 수집 연산을 구현한 것입니다.
    • AUTO는 런타임이 알아서 고르도록 합니다.

    어떤 집합 구현이 가장 좋은지는 GPU의 숫자와 종류, 클러스터 장비 간 네트워크 연결 등에 따라 다를 수 있습니다. 런타임 자동 선택을 오버라이드하려면, MultiWorkerMirroredStrategy 생성자의 communication 인자에 적절한 값을 주면 됩니다. 예를 들어 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 주면 됩니다.

다른 코드 예제

  1. 처음부터 끝까지 살펴보는 예제에서는 tensorflow/ecosystem의 쿠버네티스(Kubernetes) 템플릿을 이용하여 다중 워커를 사용하여 훈련합니다. 이 예제에서는 케라스 모델을 만든 후, tf.keras.estimator.model_to_estimator API를 이용하여 추정기 모델로 변환합니다.
  2. 다중 분산 전략으로 실행할 수 있는 공식 ResNet50 모델.