케라스를 사용한 다중 워커(Multi-worker) 훈련

TensorFlow.org에서 보기 Google Colab에서 실행하기 GitHub에서 소스 보기 노트북 다운로드하기

개요

이 튜토리얼에서는 tf.distribute.MultiWorkerMirroredStrategy API를 사용하여 tf.distribute.MultiWorkerMirroredStrategy 모델 및 Model.fit API로 다중 작업자 분산 훈련을 수행하는 방법을 보여줍니다. 이 전략을 이용하면 단일 작업자에서 실행되도록 설계된 Keras 모델을 최소한의 코드 변경만으로 여러 작업자에서 원활하게 작동하도록 할 수 있습니다.

Keras 및 사용자 정의 훈련 루프와 함께 MultiWorkerMirroredStrategy를 사용하는 방법에 대해 알아보려면 Keras 및 MultiWorkerMirroredStrategy를 이용한 사용자 정의 훈련 루프를 참조하세요.

이 튜토리얼에는 데모용으로 두 개의 작업자가 있는 최소 다중 작업자 예제가 포함되어 있습니다.

적절한 전략 선택

시작하기 전에 tf.distribute.MultiWorkerMirroredStrategy가 해당 가속기와 훈련에 적합한 선택인지 확인하세요. 다음은 데이터 병렬 처리로 훈련을 배포하는 두 가지 일반적인 방법입니다.

  • 첫 번째는 tf.distribute.MirroredStrategy, tf.distribute.TPUStrategytf.distribute.MultiWorkerMirroredStrategy와 같이 훈련 단계가 작업자와 복제본 간에 동기화되는 동기식 훈련입니다. 모든 작업자는 동기화된 입력 데이터의 다른 조각에 대해 훈련하고 각 단계에서 그래디언트를 집계합니다.
  • 두 번째는 tf.distribute.experimental.ParameterServerStrategy와 같이 훈련 단계가 엄격하게 동기화되지 않은 비동기 훈련입니다. 모든 작업자는 입력 데이터에 대해 독립적으로 훈련하고 변수를 비동기적으로 업데이트합니다.

TPU가 없는 다중 작업자 동기식 훈련을 찾고 있다면 tf.distribute.MultiWorkerMirroredStrategy를 선택해야 합니다. 이는 모든 작업자에 걸쳐 각 장치의 모델 레이어에 있는 모든 변수의 복사본을 만들고, 집합적 통신을 위한 TensorFlow 연산인 CollectiveOps를 사용하여 그래디언트를 집계하고 변수를 동기화 상태로 유지합니다. 관심 있는 분들은 집합적 구현 옵션에 대한 tf.distribute.experimental.CommunicationOptions 매개변수를 확인해 보세요.

tf.distribute.Strategy API에 대한 개요는 TensorFlow의 분산 훈련을 참조하세요.

설정

몇 가지 필요한 가져오기로 시작합니다.

import json
import os
import sys

TensorFlow를 가져오기 전에 환경을 일부 변경합니다.

  • 실제 적용 상황에서 각 작업자는 다른 시스템에 있을 것입니다. 이 튜토리얼의 목적상 모든 작업자는 시스템에서 실행됩니다. 따라서 모든 작업자가 동일한 GPU를 사용하려고 하여 오류가 발생하지 않도록 모든 GPU를 비활성화합니다.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  • TF_CONFIG 환경 변수를 재설정합니다(나중에 자세히 알아볼 것입니다).
os.environ.pop('TF_CONFIG', None)
  • 현재 디렉터리가 Python의 경로에 있도록 합니다. 그러면 나중에 %%writefile이 쓴 파일을 노트북이 가져올 수 있습니다.
if '.' not in sys.path:
  sys.path.insert(0, '.')

tf.keras.callbacks.BackupAndRestoresave_freq 인수를 사용하여 특정 단계에서 체크포인트의 빈도를 저장하는 기능이 TensorFlow 2.10부터 도입되었으므로 tf-nightly를 설치합니다.

pip install tf-nightly

마지막으로 TensorFlow를 가져옵니다.

import tensorflow as tf
2022-12-15 01:43:39.072372: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay

데이터세트 및 모델 정의

다음으로, 간단한 모델 및 데이터세트 설정으로 mnist_setup.py 파일을 만듭니다. 이 Python 파일은 이 튜토리얼에서 작업자 프로세스에 사용됩니다.

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

단일 작업자에 대한 모델 훈련

먼저 몇 개의 epoch에 대해 모델을 훈련하고 단일 작업자의 결과를 관찰하여 이상이 없는지 확인합니다. 훈련이 진행됨에 따라 손실은 감소하고 정확도는 증가합니다.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
2022-12-15 01:43:42.188844: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 7ms/step - loss: 2.2791 - accuracy: 0.1067
Epoch 2/3
70/70 [==============================] - 0s 7ms/step - loss: 2.2241 - accuracy: 0.3194
Epoch 3/3
70/70 [==============================] - 0s 7ms/step - loss: 2.1610 - accuracy: 0.4893
<keras.callbacks.History at 0x7f2717617160>

다중 작업자 구성

이제 다중 작업자 훈련의 세계로 들어가 보겠습니다.

작업 및 태스크가 있는 클러스터

TensorFlow에서 분산 훈련에는 여러 작업이 있는 'cluster'가 포함되며 각 작업에는 하나 이상의 'task'가 있을 수 있습니다.

각각의 역할이 다를 수 있는 여러 머신에서의 훈련을 위해 TF_CONFIG 구성 환경 변수가 필요합니다. TF_CONFIG는 클러스터의 일부인 각 작업자에 대한 클러스터 구성을 지정하는 데 사용되는 JSON 문자열입니다.

TF_CONFIG 변수에는 'cluster''task'의 두 가지 구성요소가 있습니다.

  • 'cluster'는 모든 작업자에 대해 동일하며 'worker' 또는 'chief'와 같은 다양한 유형의 작업으로 구성된 사전인 훈련 클러스터에 대한 정보를 제공합니다.

    • tf.distribute.MultiWorkerMirroredStrategy를 이용한 다중 작업자 훈련의 경우, 일반적으로 하나의 'worker'가 있으며, 이 작업자가 정규 'worker'가 수행하는 작업과 더불어 체크포인트를 저장하고 TensorBoard에 대한 요약 파일을 작성하는 등의 작업을 책임집니다. 이러한 'worker'를 책임 작업자라고 합니다(작업 이름은 'chief').
    • 'index' 0인 작업자가 'chief'가 되는 것이 일반적입니다.
  • 'task'는 현재 작업에 대한 정보를 제공하며 작업자마다 다릅니다. 이를 통해 해당 작업자의 'type''index'가 지정됩니다.

다음은 구성의 예입니다.

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

tf_config는 단순히 Python의 지역 변수입니다. 훈련 구성에 사용하려면 JSON으로 직렬화하고 TF_CONFIG 환경 변수에 배치합니다.

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

위의 구성 예에서는 'type' 작업을 'worker'로 설정하고 'index' 작업을 0으로 설정했습니다. 따라서 이 시스템는 첫 번째 작업자이며, 'chief' 작업자로 지정됩니다.

참고: 다른 시스템에도 TF_CONFIG 환경 변수가 설정되어 있어야 하며 동일한 'cluster' 사전이 있어야 하지만 해당 시스템의 역할에 따라 다른 작업 'type' 또는 작업 'index'를 갖습니다.

실제로 외부 IP 주소/포트에 여러 작업자를 만들고 그에 따라 각 작업자에 TF_CONFIG 변수를 설정합니다. 설명을 위해 이 튜토리얼에서는 localhost에 두 작업자로 TF_CONFIG 변수를 설정하는 방법을 보여줍니다.

  • 위와 같이 첫 번째('chief') 작업자의 TF_CONFIG
  • 두 번째 작업자의 경우 tf_config['task']['index']=1을 설정합니다.

노트북의 환경 변수 및 하위 프로세스

하위 프로세스는 상위 요소로부터 환경 변수를 상속합니다. 따라서 이 Jupyter Notebook 프로세스에서 다음과 같이 환경 변수를 설정하는 경우:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

... 하위 프로세스에서 환경 변수에 액세스할 수 있습니다.

echo ${GREETINGS}
Hello TensorFlow!

다음 섹션에서는 이 메서드를 사용하여 TF_CONFIG를 작업자 하위 프로세스에 전달합니다. 실제 시나리오에서는 이런 식으로 작업을 시작하지 않습니다. 이 튜토리얼은 단순히 최소한의 다중 작업자 예제로 작업을 수행하는 방법을 보여줍니다.

모델 훈련하기

모델을 훈련하려면 먼저 tf.distribute.MultiWorkerMirroredStrategy의 인스턴스를 만듭니다.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

참고: MultiWorkerMirroredStrategy.init()가 호출될 경우, TF_CONFIG는 파싱되며 TensorFlow GRPC 서버는 구동됩니다. 따라서 tf.distribute.Strategy 인스턴스 생성 전에 TF_CONFIG 환경변수가 설정되어야 합니다. TF_CONFIG가 아직 설정되지 않았으므로 상기 전략은 사실상 단일 워커 훈련입니다.

tf.distribute.Strategy API를 tf.keras에 통합하면 여러 작업자에게 훈련을 분배하기 위해 유일하게 수행하는 변경은 모델 구축 및 model.compile() 호출을 strategy.scope()로 감싸는 것입니다. 분산 전략의 범위에 따라 변수가 생성되는 방식과 위치가 결정되고 MultiWorkerMirroredStrategy의 경우, 생성된 변수는 MirroredVariable이며 각 작업자에 복제됩니다.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

참고: 현재, MultiWorkerMirroredStrategy에는 전략 인스턴스가 생성된 후 TensorFlow ops를 생성해야 하는 제한이 있습니다. RuntimeError: Collective ops must be configured at program startup가 발생하면 프로그램 맨 처음에 MultiWorkerMirroredStrategy 인스턴스를 만들고 전략이 인스턴스화된 후에 ops를 생성할 수 있는 코드를 넣으세요.

실제로 MultiWorkerMirroredStrategy로 실행하려면 작업자 프로세스를 실행하고 TF_CONFIG를 전달해야 합니다.

앞서 작성한 mnist_setup.py 파일과 마찬가지로 다음은 각 작업자가 실행할 main.py입니다.

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

위의 코드 스니펫에서 Dataset.batch로 전달되는 global_batch_sizeper_worker_batch_size * num_workers로 설정된다는 점에 주목하세요. 그러면 각 작업자가 작업자 수에 관계없이 per_worker_batch_size 예제 배치를 처리하게 됩니다.

현재 디렉터리에는 이제 두 Python 파일이 모두 포함됩니다.

ls *.py
main.py
mnist_setup.py

TF_CONFIG를 JSON으로 직렬화하고 환경 변수에 추가합니다.

os.environ['TF_CONFIG'] = json.dumps(tf_config)

이제, main.py를 실행하고 TF_CONFIG를 사용하는 작업자 프로세스를 시작할 수 있습니다.

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

위의 명령에서 몇 가지 주의할 사항이 있습니다.

  1. 일부 bash 명령을 실행하기 위해 노트북 "매직"%%bash가 사용됩니다.
  2. --bg 플래그를 사용하여 백그라운드에서 bash 프로세스를 실행합니다. 이 작업자는 종료되지 않기 때문입니다. 시작하기 전에 모든 작업자를 기다립니다.

백그라운드 작업자 프로세스는 이 노트북에 출력을 인쇄하지 않으므로 &>는 출력을 파일로 리디렉션하고 나중에 로그 파일에서 발생한 상황을 검사할 수 있습니다.

따라서 프로세스가 시작될 때까지 몇 초 동안 기다리세요.

import time
time.sleep(10)

이제, 지금까지 작업자의 로그 파일에 출력된 내용을 검사합니다.

cat job_0.log
2022-12-15 01:43:45.590694: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-15 01:43:47.627300: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

로그 파일의 마지막 줄은 다음과 같아야 합니다: Started server with target: grpc://localhost:12345. 이제 첫 번째 작업자가 준비되었으며 다른 모든 작업자가 계속 진행할 준비가 되기를 기다립니다.

따라서 두 번째 작업자 프로세스가 시작되도록 tf_config를 업데이트합니다.

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

두 번째 작업자를 시작합니다. 모든 작업자가 활성 상태이므로 훈련이 시작됩니다(따라서 이 프로세스를 백그라운드로 처리할 필요가 없음).

python main.py
2022-12-15 01:43:55.790767: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-15 01:43:57.912229: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-12-15 01:43:59.021660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-12-15 01:43:59.325746: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 44ms/step - loss: 2.2751 - accuracy: 0.1901
Epoch 2/3
70/70 [==============================] - 3s 44ms/step - loss: 2.2069 - accuracy: 0.4153
Epoch 3/3
70/70 [==============================] - 3s 44ms/step - loss: 2.1303 - accuracy: 0.5448

첫 번째 작업자가 작성한 로그를 다시 확인하면 작업자가 해당 모델 훈련에 참여했음을 알 수 있습니다.

cat job_0.log
2022-12-15 01:43:45.590694: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-15 01:43:47.627300: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-12-15 01:43:59.020150: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-12-15 01:43:59.325986: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 44ms/step - loss: 2.2751 - accuracy: 0.1901
Epoch 2/3
70/70 [==============================] - 3s 44ms/step - loss: 2.2069 - accuracy: 0.4153
Epoch 3/3
70/70 [==============================] - 3s 44ms/step - loss: 2.1303 - accuracy: 0.5448

참고: 단일 시스템에서 여러 작업자를 실행하면 오버헤드만 추가되기 때문에 이 튜토리얼의 시작 부분에서 실행한 테스트보다 느리게 실행될 수 있습니다. 여기서 목표는 훈련 시간을 개선하는 것이 아니라 다중 작업자 훈련의 예를 제공하는 것입니다.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

심층적 다중 작업자 훈련

지금까지 기본적인 다중 작업자 설정을 수행하는 방법을 배웠습니다. 튜토리얼의 나머지 부분에서는 실제 사용 사례에 유용하거나 중요할 수 있는 다른 요소에 대해 자세히 설명합니다.

데이터세트 샤딩

다중 작업자 훈련 중 수렴과 성능을 보장하려면 데이터 샤딩이 필요합니다.

이전 섹션의 예는 tf.distribute.Strategy API에서 제공하는 기본 자동 샤딩을 이용합니다. tf.data.experimental.DistributeOptionstf.data.experimental.AutoShardPolicy를 설정하여 샤딩을 제어할 수 있습니다.

자동 샤딩에 대한 자세한 내용은 분산 입력 가이드를 참조하세요.

다음은 각 복제본이 모든 예를 처리하도록 자동 샤딩을 해제하는 방법을 보여주는 간단한 예입니다(권장하지 않음).

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

평가

validation_dataModel.fit에도 전달하면 각 epoch에 대해 훈련과 평가가 번갈아 이루어집니다. 평가 작업은 동일한 작업자 집합에 배포되고 그 결과가 집계되어 모든 작업자가 사용할 수 있습니다.

훈련과 마찬가지로 검증 데이터세트는 파일 수준에서 자동으로 샤딩됩니다. 검증 데이터세트에서 전역 배치 크기를 설정하고 validation_steps를 설정해야 합니다.

평가를 위해 반복되는 데이터세트(tf.data.Dataset.repeat 호출)를 권장합니다.

또는 주기적으로 체크포인트를 읽고 평가를 실행하는 다른 작업을 생성할 수도 있습니다. 이것이 Estimator가 하는 일입니다. 그러나 이는 권장되는 평가 방법이 아니므로 자세한 내용은 생략합니다.

공연

다중 작업자 훈련의 성능을 조정하기 위해 다음을 시도할 수 있습니다.

  • tf.distribute.MultiWorkerMirroredStrategy는 다수의 집합 통신 구현을 제공합니다.

    • RING은 호스트 사이의 통신 레이어로 gRPC를 사용하여 링 기반 집합체를 구현합니다.
    • NCCLNVIDIA 집합 통신 라이브러리를 사용하여 집합체를 구현합니다.
    • AUTO는 선택을 런타임으로 미룹니다.

    최선의 집합체 구현 선택은 클러스터의 GPU 수, GPU 유형 및 네트워크 상호 연결에 따라 다릅니다. 자동 선택을 재정의하려면 MultiWorkerMirroredStrategy 생성자의 communication_options 매개변수를 지정합니다. 예를 들면 다음과 같습니다.

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
    
  • 가능한 경우 변수를 tf.float로 캐스팅합니다.

    • 공식 ResNet 모델에는 이를 수행하는 방법의 가 포함되어 있습니다.

내결함성

동기식 훈련에서 작업자 중 하나가 실패하고 실패 복구 메커니즘이 없으면 클러스터가 실패합니다.

tf.distribute.Strategy와 함께 Keras를 사용하면 작업자가 작동하지 않거나 불안정한 경우 내결함성의 이점이 있습니다. 이를 위해 이전에 실패했거나 선점한 인스턴스를 다시 시작할 때 훈련 상태가 복구되도록 선택한 분산 파일 시스템에서 훈련 상태를 유지할 수 있습니다.

작업자를 사용할 수 없게 되면 다른 작업자가 실패합니다(시간 초과 후). 이러한 경우 사용할 수 없는 작업자는 물론 실패한 다른 작업자도 다시 시작해야 합니다.

참고: 이전에는 다중 작업자 훈련 작업 중 장애로 인해 훈련을 재시작할 경우 ModelCheckpoint 콜백이 훈련 상태를 복구하는 메커니즘을 제공했습니다. TensorFlow 팀은 일관성 있는 경험을 위해 단일 작업자 훈련 지원을 추가로 제공하고자 BackupAndRestore 콜백을 새롭게 도입하였으며, 기존 ModelCheckpoint 콜백의 내결함성 기능을 제거했습니다. 이제 해당 동작을 사용하는 애플리케이션은 새 BackupAndRestore 콜백으로 마이그레이션해야 합니다.

ModelCheckpoint 콜백

ModelCheckpoint 콜백은 더 이상 내결함성 기능을 제공하지 않습니다. 대신 BackupAndRestore 콜백을 사용하세요.

ModelCheckpoint 콜백은 여전히 체크포인트를 저장하는 데 사용할 수 있습니다. 하지만 이를 통해 훈련이 중단되거나 성공적으로 종료된 경우 체크포인트에서 훈련을 계속하기 위해 사용자가 수동으로 모델을 로드해야 합니다.

필요한 경우, ModelCheckpoint 콜백 외부에서 모델/가중치를 저장하고 복원하도록 선택할 수 있습니다.

모델 저장 및 로드하기

model.save 또는 tf.saved_model.save를 사용하여 모델을 저장하려면 작업자마다 저장 대상이 달라야 합니다.

  • 수석 작업자가 아닌 경우 모델을 임시 디렉터리에 저장해야 합니다.
  • 수석 작업자의 경우 제공된 모델 디렉터리에 저장해야 합니다.

여러 작업자가 동일한 위치에 쓰려고 하여 오류가 발생하는 것을 방지하기 위해 작업자의 임시 디렉터리는 고유해야 합니다.

모든 디렉터리에 저장되는 모델은 동일하며, 일반적으로 수석 작업자가 저장한 모델만 복원 또는 제공을 위해 참조해야 합니다.

훈련이 완료되면 작업자가 만든 임시 디렉터리를 삭제하는 정리 논리가 있어야 합니다.

수석 작업자와 작업자에서 동시에 저장하는 이유는 체크포인트 동안 변수를 집계해야 할 수 있고, 이를 위해 수석 작업자와 작업자가 모두 allreduce 통신 프로토콜에 참여해야 하기 때문입니다. 반면, 수석 작업자와 작업자를 같은 모델 디렉터리에 저장하도록 하면 경합으로 인해 오류가 발생합니다.

MultiWorkerMirroredStrategy를 사용하면 프로그램이 모든 작업자에서 실행되고 현재 작업자가 책임 작업자인지 여부를 알기 위해 task_typetask_id 특성을 갖는 클러스터 확인자 객체가 이용됩니다.

  • task_type은 현재 작업이 무엇인지 알려줍니다(예: 'worker').
  • task_id는 작업자의 식별자를 알려줍니다.
  • task_id == 0인 작업자가 수석 작업자로 지정됩니다.

아래 코드 스니펫에서 write_filepath 함수는 작업자의 task_id에 따라 쓸 파일 경로를 제공합니다.

  • 수석 작업자(task_id == 0)의 경우 원래 파일 경로에 씁니다.
  • 다른 작업자의 경우 쓸 디렉터리 경로에 task_id가 있는 임시 디렉터리인 temp_dir가 만들어집니다.
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configurations.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type` is `None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

이제 저장할 준비가 되었습니다.

multi_worker_model.save(write_model_path)
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op, _update_step_xla while saving (showing 2 of 2). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

위에서 설명한 것처럼 나중에 모델은 책임 작업자가 저장한 파일 경로에서만 로드되어야 됩니다. 따라서 비 책임 작업자가 저장한 임시 항목을 제거하세요.

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

이제 로드할 때 편리한 tf.keras.models.load_model API를 사용하여 추가 작업을 계속합니다.

여기에서는 단일 작업자만 사용하여 훈련을 로드하고 계속한다고 가정합니다. 이 경우 다른 strategy.scope() 내에서 tf.keras.models.load_model을 호출하지 않습니다(앞서 정의한 바와 같이 strategy = tf.distribute.MultiWorkerMirroredStrategy()임에 유의):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 8ms/step - loss: 2.3017 - accuracy: 0.0852
Epoch 2/2
20/20 [==============================] - 0s 7ms/step - loss: 2.2802 - accuracy: 0.1797
<keras.callbacks.History at 0x7f2717617a60>

체크포인트 저장 및 복원

한편, 체크포인트를 사용하면 전체 모델을 저장하지 않고도 모델의 가중치를 저장하고 복원할 수 있습니다.

여기에서는 모델을 추적하는 하나의 tf.train.Checkpoint를 만들고, 최신 체크포인트만 보존되도록 이를 tf.train.CheckpointManager로 관리합니다.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

CheckpointManager가 설정되면 이제 수석이 아닌 작업자가 저장한 체크포인트를 저장하고 제거할 준비가 된 것입니다.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

이제 모델을 복원해야 할 때 편리한 tf.train.latest_checkpoint 함수를 사용하여 저장된 최신 체크포인트를 찾을 수 있습니다. 체크포인트를 복원한 후 훈련을 계속할 수 있습니다.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-12-15 01:44:15.096852: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-12-15 01:44:15.387474: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 7ms/step - loss: 2.2985 - accuracy: 0.0914
Epoch 2/2
20/20 [==============================] - 0s 7ms/step - loss: 2.2800 - accuracy: 0.1586
<keras.callbacks.History at 0x7f280e3bde50>

BackupAndRestore 콜백

tf.keras.callbacks.BackupAndRestore 콜백은 BackupAndRestore에 대한 backup_dir 인수 아래에 임시 체크포인트 파일의 모델 및 현재 훈련 상태를 백업하여 내결함성 기능을 제공합니다.

참고: Tensorflow 2.9에서는 현재 모델과 훈련 상태가 epoch 경계에서 백업됩니다. tf-nightly 버전과 TensorFlow 2.10부터 BackupAndRestore 콜백은 epoch 또는 단계 경계에서 모델과 훈련 상태를 백업할 수 있습니다. BackupAndRestore는 선택적 save_freq 인수를 허용합니다. save_freq'epoch' 또는 int 값을 허용합니다. save_freq'epoch'로 설정되면 모델은 epoch마다 백업됩니다. save_freq0보다 큰 정수 값으로 설정되면 save_freq개의 배치마다 모델이 백업됩니다.

작업이 중단되고 다시 시작되면 BackupAndRestore 콜백이 마지막 체크포인트를 복원하고 훈련 상태가 마지막으로 저장된 epoch 및 단계의 시작부터 훈련을 계속할 수 있습니다.

이를 사용하려면 Model.fit 호출에서 tf.keras.callbacks.BackupAndRestore의 인스턴스를 제공하세요.

BackupAndRestore 콜백은 CheckpointManager를 사용하여 훈련 상태를 저장하고 복원하는데, 이 때 최신 체크포인트와 함께 기존 체크포인트를 추적하는 체크포인트라는 파일이 생성됩니다. 이러한 이유로 이름 충돌을 피하기 위해 backup_dir는 다른 체크포인트를 저장하는 데 재사용되지 않아야 합니다.

현재 BackupAndRestore 콜백은 전략이 없는 단일 작업자 훈련(MirroredStrategy)과 MultiWorkerMirroredStrategy를 사용하는 다중 작업자 훈련을 지원합니다.

현재 BackupAndRestore 콜백은 전략이 없는 단일 작업자 훈련(MirroredStrategy)과 MultiWorkerMirroredStrategy를 사용하는 다중 작업자 훈련을 지원합니다.

다음은 다중 작업자 훈련 및 단일 작업자 훈련에 대한 두 가지 예제입니다.

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state 
# is backed up at epoch boundaries by default.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-15 01:44:18.504579: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 9ms/step - loss: 2.2887 - accuracy: 0.1348
Epoch 2/3
70/70 [==============================] - 1s 9ms/step - loss: 2.2458 - accuracy: 0.3402
Epoch 3/3
70/70 [==============================] - 1s 9ms/step - loss: 2.1951 - accuracy: 0.4759
<keras.callbacks.History at 0x7f280f4309a0>

BackupAndRestore 콜백의 save_freq 인수가 'epoch'로 설정된 경우 모델은 epoch마다 백업됩니다.

# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-15 01:44:23.521369: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 9ms/step - loss: 2.2788 - accuracy: 0.1493
Epoch 2/3
70/70 [==============================] - 1s 9ms/step - loss: 2.2026 - accuracy: 0.3723
Epoch 3/3
70/70 [==============================] - 1s 9ms/step - loss: 2.1214 - accuracy: 0.5451
<keras.callbacks.History at 0x7f280eb3adc0>

참고: 다음 코드 블록은 Tensorflow 2.10이 출시될 때까지 tf-nightly에서만 사용할 수 있는 기능을 사용합니다.

BackupAndRestore 콜백의 save_freq 인수가 0보다 큰 정수 값으로 설정되면 save_freq의 배치마다 모델이 백업됩니다.

# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-15 01:44:28.537486: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 10ms/step - loss: 2.2870 - accuracy: 0.1674
Epoch 2/3
70/70 [==============================] - 1s 10ms/step - loss: 2.2313 - accuracy: 0.2991
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1604 - accuracy: 0.4556
<keras.callbacks.History at 0x7f280ea3ec40>

BackupAndRestore에서 지정한 backup_dir의 디렉터리를 살펴보면 일시적으로 생성된 일부 체크포인트 파일을 발견할 수 있습니다. 이러한 파일은 이전에 손실된 인스턴스를 복구하는 데 필요하며 훈련이 성공적으로 종료되면 Model.fit 마지막에 라이브러리에서 제거됩니다.

참고: 현재, BackupAndRestore 콜백은 강제 실행 모드만 지원합니다. 그래프 모드에서 위의 모델 저장 및 로드 섹션에서 설명한 대로 Model.save/tf.saved_model.savetf.keras.models.load_model을 각각 모델 저장과 복원에 사용하고 훈련 중 Model.fitinitial_epoch를 제공하는 방법을 고려해 보세요.

추가 자료

  1. TensorFlow에서 분산 훈련하기 가이드는 사용 가능한 분산 전략을 간략히 소개합니다.
  2. Keras 및 MultiWorkerMirroredStrategy를 이용한 사용자 지정 훈련 루프 튜토리얼은 Keras 및 사용자 지정 훈련 루프와 함께 MultiWorkerMirroredStrategy를 이용하는 방법을 보여줍니다.
  3. 여러 분산 전략을 실행하도록 구성할 수 있는 공식 모델을 확인해 보세요.
  4. tf.function으로 성능 향상 가이드는 TensorFlow 모델의 성능을 최적화하는 데 사용할 수 있는 TensorFlow 프로파일러와 같은 다른 전략 및 도구에 대한 정보를 제공합니다.