추정기(Estimator)를 사용한 다중 워커(Multi-worker) 훈련

TensorFlow.org에서 보기 구글 코랩(Colab)에서 실행하기 깃허브(GitHub) 소스 보기 Download notebook

개요

이 튜토리얼은 tf.estimator와 함께 분산 다중 워커 훈련을 하기 위하여 tf.distribute.Strategy를 어떻게 사용하는지 살펴봅니다. tf.estimator를 사용하여 코드를 작성하고 있고, 고성능의 장비 한 대로 다룰 수 있는 것보다 더 큰 작업을 하는 데에 관심이 있으시다면 이 튜토리얼이 알맞습니다.

시작하기 전에, 텐서플로로 분산 훈련하기를 먼저 읽어주세요. 다중 GPU 훈련 튜토리얼도 관련이 있습니다. 이 튜토리얼과 같은 모델을 사용합니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.

import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

입력 함수

이 튜토리얼은 텐서플로 데이터셋(TensorFlow Datasets)의 MNIST 데이터셋을 사용합니다. 코드 내용은 다중 GPU 훈련 튜토리얼과 유사하지만 큰 차이점이 하나 있습니다. 바로 추정기를 써서 다중 워커 훈련을 할 때는 데이터셋을 워커 숫자대로 나누어 주어야 모델이 수렴합니다. 입력 데이터는 워커 인덱스로 샤딩(shard)합니다. 그러면 각 워커 프로세스가 데이터셋을 1/워커 수 만큼씩 겹치지 않게 나누어 갖습니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

훈련을 수렴시키기 위한 또 다른 방법으로 각 워커에서 데이터셋을 제각기 다른 시드 값으로 셔플하는 것도 있습니다.

다중 워커 설정

다중 GPU 훈련 튜토리얼과 비교할 때 가장 큰 차이 중 하나는 다중 워커를 설정하는 부분입니다. TF_CONFIG 환경 변수는 클러스터를 이루는 각 워커에 클러스터 설정을 지정하는 표준 방법입니다.

TF_CONFIG에는 clustertask라는 두 가지 구성요소가 있습니다. cluster는 전체 클러스터, 다시 말해 클러스터에 속한 워커와 파라미터 서버에 대한 정보를 제공합니다. task는 현재 작업에 대한 정보를 지정합니다. 이 예제에서는 작업의 typeworker이고, 작업의 index0입니다.

예를 들기 위하여, 이 튜토리얼에서는 두 개의 워커를 localhost에 띄울 때의 TF_CONFIG를 보여드리겠습니다. 실제로는 각 워커를 다른 장비에서 띄울 텐데, 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG를 지정해야 합니다. 예를 들어, 각 장비의 작업 index가 달라야 합니다.

주의: 아래 코드를 코랩에서 실행하지 마십시오. 텐서플로 런타임이 주어진 IP와 포트로 gRPC 서버를 띄우려고 할 텐데, 아마도 실패할 것입니다.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

모델 정의하기

훈련을 위하여 레이어와 옵티마이저, 손실 함수를 정의하세요. 이 튜토리얼에서는 다중 GPU 훈련 튜토리얼과 비슷하게 케라스 레이어로 모델을 정의합니다.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

모델을 훈련하기 위하여 tf.distribute.experimental.MultiWorkerMirroredStrategy의 인스턴스를 사용하세요. MultiWorkerMirroredStrategy는 모든 워커의 각 장비에, 모델의 레이어에 있는 모든 변수의 복사본을 만듭니다. 이 전략은 CollectiveOps라는 수집을 위한 통신용 텐서플로 연산을 사용하여 그래디언트를 모으고, 변수들의 값을 동일하게 맞춥니다. 텐서플로로 분산 훈련하기에 이 전략에 대한 더 자세한 내용이 있습니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

모델 훈련 및 평가하기

다음으로, 추정기의 RunConfig에 분산 전략을 지정하십시오. 그리고 tf.estimator.train_and_evaluate로 훈련 및 평가를 합니다. 이 튜토리얼에서는 train_distribute로만 전략을 지정하였기 때문에 훈련 과정만 분산 처리합니다. eval_distribute를 지정하여 평가도 분산 처리할 수 있습니다.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fce905425f8>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fce90548ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fce90548ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fce90548ea0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3166714, step = 0

INFO:tensorflow:loss = 2.3166714, step = 0

INFO:tensorflow:global_step/sec: 185.648

INFO:tensorflow:global_step/sec: 185.648

INFO:tensorflow:loss = 2.3105354, step = 100 (0.541 sec)

INFO:tensorflow:loss = 2.3105354, step = 100 (0.541 sec)

INFO:tensorflow:global_step/sec: 206.924

INFO:tensorflow:global_step/sec: 206.924

INFO:tensorflow:loss = 2.3037848, step = 200 (0.483 sec)

INFO:tensorflow:loss = 2.3037848, step = 200 (0.483 sec)

INFO:tensorflow:global_step/sec: 205.815

INFO:tensorflow:global_step/sec: 205.815

INFO:tensorflow:loss = 2.307022, step = 300 (0.486 sec)

INFO:tensorflow:loss = 2.307022, step = 300 (0.486 sec)

INFO:tensorflow:global_step/sec: 211.803

INFO:tensorflow:global_step/sec: 211.803

INFO:tensorflow:loss = 2.3039684, step = 400 (0.472 sec)

INFO:tensorflow:loss = 2.3039684, step = 400 (0.472 sec)

INFO:tensorflow:global_step/sec: 208.473

INFO:tensorflow:global_step/sec: 208.473

INFO:tensorflow:loss = 2.3035712, step = 500 (0.480 sec)

INFO:tensorflow:loss = 2.3035712, step = 500 (0.480 sec)

INFO:tensorflow:global_step/sec: 208.2

INFO:tensorflow:global_step/sec: 208.2

INFO:tensorflow:loss = 2.2912772, step = 600 (0.480 sec)

INFO:tensorflow:loss = 2.2912772, step = 600 (0.480 sec)

INFO:tensorflow:global_step/sec: 209.583

INFO:tensorflow:global_step/sec: 209.583

INFO:tensorflow:loss = 2.2859, step = 700 (0.478 sec)

INFO:tensorflow:loss = 2.2859, step = 700 (0.478 sec)

INFO:tensorflow:global_step/sec: 231.029

INFO:tensorflow:global_step/sec: 231.029

INFO:tensorflow:loss = 2.283319, step = 800 (0.432 sec)

INFO:tensorflow:loss = 2.283319, step = 800 (0.432 sec)

INFO:tensorflow:global_step/sec: 592.365

INFO:tensorflow:global_step/sec: 592.365

INFO:tensorflow:loss = 2.2791746, step = 900 (0.169 sec)

INFO:tensorflow:loss = 2.2791746, step = 900 (0.169 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-09-23T04:18:52Z

INFO:tensorflow:Starting evaluation at 2020-09-23T04:18:52Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.03532s

INFO:tensorflow:Inference Time : 1.03532s

INFO:tensorflow:Finished evaluation at 2020-09-23-04:18:53

INFO:tensorflow:Finished evaluation at 2020-09-23-04:18:53

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2839413

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2839413

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1468079.

INFO:tensorflow:Loss for final step: 1.1468079.

({'loss': 2.2839413, 'global_step': 938}, [])

훈련 성능 최적화하기

이제 모델과 tf.distribute.Strategy로 만든 다중 워커를 사용할 수 있는 추정기가 있습니다. 다중 워커 훈련 성능을 최적화하려면 다음과 같은 방법을 사용해 보십시오.

  • 배치 크기 늘리기: 여기서 지정하는 배치 크기는 GPU당 크기입니다. 일반적으로, GPU 메모리 크기에 맞는 한 가장 크게 배치 크기를 잡는 것이 좋습니다.
  • 변수 형변환: 가능하면 변수를 tf.float 타입으로 바꾸세요. 공식 ResNet 모델의 예제에서 어떻게 변환하는지 볼 수 있습니다.
  • 집합 통신 구현을 사용하세요: MultiWorkerMirroredStrategy는 여러 가지 집합 통신 구현을 제공합니다.

    • RING은 장비 간 통신을 위하여 gRPC를 써서 링 네트워크 기반의 집합 통신을 구현한 것입니다.
    • NCCLNvidia의 NCCL을 사용하여 수집 연산을 구현한 것입니다.
    • AUTO는 런타임이 알아서 고르도록 합니다.

    어떤 집합 구현이 가장 좋은지는 GPU의 숫자와 종류, 클러스터 장비 간 네트워크 연결 등에 따라 다를 수 있습니다. 런타임 자동 선택을 오버라이드하려면, MultiWorkerMirroredStrategy 생성자의 communication 인자에 적절한 값을 주면 됩니다. 예를 들어 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 주면 됩니다.

다른 코드 예제

  1. 처음부터 끝까지 살펴보는 예제에서는 tensorflow/ecosystem의 쿠버네티스(Kubernetes) 템플릿을 이용하여 다중 워커를 사용하여 훈련합니다. 이 예제에서는 케라스 모델을 만든 후, tf.keras.estimator.model_to_estimator API를 이용하여 추정기 모델로 변환합니다.
  2. 다중 분산 전략으로 실행할 수 있는 공식 ResNet50 모델.