추정기(Estimator)를 사용한 다중 작업자 훈련

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

개요

참고: tf.distribute API와 함께 추정기를 사용할 수는 있지만, tf.distribute와 함께 Keras를 사용하는 것을 추천합니다. Keras를 사용한 다중 작업자 훈련을 참조하세요. tf.distribute.Strategy를 추정기와 사용하는 것은 부분적으로만 지원하고 있습니다.

이 튜토리얼은 tf.estimator를 이용한 분산 다중 작업자 훈련을 위해 tf.distribute.Strategy를 사용하는 방법을 보여줍니다. tf.estimator를 사용하여 코드를 작성하고 있고, 고성능의 장비 한 대로 다룰 수 있는 것보다 더 큰 작업을 수행하는 데 관심이 있다면 이 튜토리얼이 적합합니다.

시작하기 전에 분산 전략 가이드를 읽어주세요. 이 튜토리얼과 같은 모델을 사용하는 다중 GPU 훈련 튜토리얼도 관련이 있습니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.

import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

참고: TF2.4부터 다중 작업자 미러링 방법은 즉시 실행이 활성화된 경우(기본 설정) 추정기에서 오류를 일으킵니다. TF2.4의 오류는 TypeError: cannot pickle '_thread.lock' object입니다. 자세한 내용은 이슈 #46556을 참조하세요. 해결 방법은 즉시 실행을 비활성화하는 것입니다.

tf.compat.v1.disable_eager_execution()

입력 함수

이 튜토리얼은 TensorFlow 데이터세트의 MNIST 데이터세트를 사용합니다. 코드 내용은 다중 GPU 훈련 튜토리얼과 유사하지만 큰 차이점이 하나 있습니다. 바로 추정기를 사용하여 다중 작업자 훈련을 할 때는 데이터세트를 작업자 숫자대로 나누어 주어야 모델이 수렴합니다. 입력 데이터는 작업자 인덱스로 샤딩(shard)합니다. 그러면 각 작업자가 데이터세트의 1/num_workers 고유 부분을 처리합니다.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

훈련을 수렴시키기 위한 또 다른 방법으로 각 작업자에서 데이터세트를 제각기 다른 시드 값으로 셔플하는 것도 있습니다.

다중 작업자 구성

다중 GPU 훈련 튜토리얼과 비교할 때 가장 큰 차이 중 하나는 다중 워커를 설정하는 부분입니다. TF_CONFIG 환경 변수는 클러스터를 이루는 각 워커에 클러스터 설정을 지정하는 표준 방법입니다.

TF_CONFIG에는 clustertask라는 두 가지 구성 요소가 있습니다. cluster는 전체 클러스터, 다시 말해 클러스터에 속한 작업자와 매개변수 서버에 대한 정보를 제공합니다. task는 현재 작업에 대한 정보를 제공합니다. 첫 번째 구성 요소 cluster는 모든 작업자 및 매개변수 서버에 대해 동일하며 두 번째 구성 요소 task는 각 작업자 및 매개변수 서버에서 다르며 고유한 typeindex를 지정합니다. 이 예제에서는 작업의 typeworker이고, 작업의 index0입니다.

예를 들기 위해 이 튜토리얼에서는 두 개의 작업자를 localhost에 띄울 때의 TF_CONFIG를 보여드리겠습니다. 실제로는 외부 IP 주소 및 포트에 여러 작업자를 만들고 각 작업자에 대해 TF_CONFIG를 적절하게 설정합니다. 예를 들어 index 작업을 수정합니다.

경고: Colab에서 다음 코드를 실행하지 마세요. TensorFlow의 런타임은 지정된 IP 주소 및 포트에서 gRPC 서버를 생성하려고 시도하지만 실패할 가능성이 높습니다. 단일 시스템에서 여러 작업자를 테스트 실행하는 방법의 예는 이 튜토리얼의 keras 버전을 참조하세요.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

모델 정의하기

훈련을 위하여 레이어와 옵티마이저, 손실 함수를 정의하세요. 이 튜토리얼에서는 다중 GPU 훈련 튜토리얼과 비슷하게 케라스 레이어로 모델을 정의합니다.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

모델을 훈련하기 위하여 tf.distribute.experimental.MultiWorkerMirroredStrategy의 인스턴스를 사용하세요. MultiWorkerMirroredStrategy는 모든 워커의 각 장비에, 모델의 레이어에 있는 모든 변수의 복사본을 만듭니다. 이 전략은 CollectiveOps라는 수집을 위한 통신용 텐서플로 연산을 사용하여 그래디언트를 모으고, 변수들의 값을 동일하게 맞춥니다. 텐서플로로 분산 훈련하기에 이 전략에 대한 더 자세한 내용이 있습니다.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7462/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

모델 훈련 및 평가하기

다음으로, 추정기의 RunConfig에 분산 전략을 지정하십시오. 그리고 tf.estimator.train_and_evaluate로 훈련 및 평가를 합니다. 이 튜토리얼에서는 train_distribute로만 전략을 지정하였기 때문에 훈련 과정만 분산 처리합니다. eval_distribute를 지정하여 평가도 분산 처리할 수 있습니다.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f38fbfa22d0>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:374: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2021-08-25 20:40:12.522828: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2021-08-25 20:40:12.524184: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3095288, step = 0
INFO:tensorflow:loss = 2.3095288, step = 0
INFO:tensorflow:global_step/sec: 274.843
INFO:tensorflow:global_step/sec: 274.843
INFO:tensorflow:loss = 2.2987561, step = 100 (0.366 sec)
INFO:tensorflow:loss = 2.2987561, step = 100 (0.366 sec)
INFO:tensorflow:global_step/sec: 309.661
INFO:tensorflow:global_step/sec: 309.661
INFO:tensorflow:loss = 2.287698, step = 200 (0.323 sec)
INFO:tensorflow:loss = 2.287698, step = 200 (0.323 sec)
INFO:tensorflow:global_step/sec: 312.946
INFO:tensorflow:global_step/sec: 312.946
INFO:tensorflow:loss = 2.3068209, step = 300 (0.320 sec)
INFO:tensorflow:loss = 2.3068209, step = 300 (0.320 sec)
INFO:tensorflow:global_step/sec: 306.356
INFO:tensorflow:global_step/sec: 306.356
INFO:tensorflow:loss = 2.2934983, step = 400 (0.327 sec)
INFO:tensorflow:loss = 2.2934983, step = 400 (0.327 sec)
INFO:tensorflow:global_step/sec: 304.325
INFO:tensorflow:global_step/sec: 304.325
INFO:tensorflow:loss = 2.2960126, step = 500 (0.328 sec)
INFO:tensorflow:loss = 2.2960126, step = 500 (0.328 sec)
INFO:tensorflow:global_step/sec: 352.194
INFO:tensorflow:global_step/sec: 352.194
INFO:tensorflow:loss = 2.28951, step = 600 (0.284 sec)
INFO:tensorflow:loss = 2.28951, step = 600 (0.284 sec)
INFO:tensorflow:global_step/sec: 434.226
INFO:tensorflow:global_step/sec: 434.226
INFO:tensorflow:loss = 2.2902632, step = 700 (0.231 sec)
INFO:tensorflow:loss = 2.2902632, step = 700 (0.231 sec)
INFO:tensorflow:global_step/sec: 424.559
INFO:tensorflow:global_step/sec: 424.559
INFO:tensorflow:loss = 2.2757022, step = 800 (0.234 sec)
INFO:tensorflow:loss = 2.2757022, step = 800 (0.234 sec)
INFO:tensorflow:global_step/sec: 672.581
INFO:tensorflow:global_step/sec: 672.581
INFO:tensorflow:loss = 2.2659209, step = 900 (0.148 sec)
INFO:tensorflow:loss = 2.2659209, step = 900 (0.148 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-08-25T20:40:20
INFO:tensorflow:Starting evaluation at 2021-08-25T20:40:20
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.43849s
INFO:tensorflow:Inference Time : 1.43849s
INFO:tensorflow:Finished evaluation at 2021-08-25-20:40:21
INFO:tensorflow:Finished evaluation at 2021-08-25-20:40:21
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2772546
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2772546
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.1376913.
INFO:tensorflow:Loss for final step: 1.1376913.
({'loss': 2.2772546, 'global_step': 938}, [])

훈련 성능 최적화하기

이제 tf.distribute.Strategy에 의해 구동되는 모델과 다중 작업자 지원 Estimator가 있습니다. 다중 작업자 훈련의 성능을 최적화하기 위해 다음 방법을 시도할 수 있습니다.

  • 배치 크기 늘리기: 여기서 지정하는 배치 크기는 GPU당 크기입니다. 일반적으로, GPU 메모리 크기에 맞는 한 가장 크게 배치 크기를 잡는 것이 좋습니다.

  • 변수 형변환: 가능하면 변수를 tf.float 타입으로 바꾸세요. 공식 ResNet 모델의 예제에서 어떻게 변환하는지 볼 수 있습니다.

  • 집합 통신 구현을 사용하세요: MultiWorkerMirroredStrategy는 여러 가지 집합 통신 구현을 제공합니다.

    • RING은 장비 간 통신을 위하여 gRPC를 써서 링 네트워크 기반의 집합 통신을 구현한 것입니다.
    • NCCLNvidia의 NCCL을 사용하여 수집 연산을 구현한 것입니다.
    • AUTO는 런타임이 알아서 고르도록 합니다.

    어떤 집합 구현이 가장 좋은지는 GPU의 숫자와 종류, 클러스터 장비 간 네트워크 연결 등에 따라 다를 수 있습니다. 런타임 자동 선택을 오버라이드하려면, MultiWorkerMirroredStrategy 생성자의 communication 인자에 적절한 값을 주면 됩니다. 예를 들어 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 주면 됩니다.

가이드의 성능 섹션을 방문하여 TensorFlow 모델의 성능을 최적화하는 데 사용할 수 있는 다른 전략과 도구에 대해 자세히 알아보세요.

다른 코드 예제

  1. 처음부터 끝까지 살펴보는 예제에서는 tensorflow/ecosystem의 쿠버네티스(Kubernetes) 템플릿을 이용하여 다중 워커를 사용하여 훈련합니다. 이 예제에서는 케라스 모델을 만든 후, tf.keras.estimator.model_to_estimator API를 이용하여 추정기 모델로 변환합니다.
  2. 많은 부분을 다중 분산 전략으로 실행할 수 있는 공식 모델.