ParameterServerStrategy를 사용한 매개변수 서버 교육

컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요.

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

개요

매개변수 서버 교육 은 여러 머신에서 모델 교육을 확장하는 일반적인 데이터 병렬 방법입니다.

매개변수 서버 교육 클러스터는 작업자매개변수 서버 로 구성됩니다. 변수는 매개변수 서버에서 생성되며 각 단계에서 작업자가 읽고 업데이트합니다. 기본적으로 작업자는 이러한 변수를 서로 동기화하지 않고 독립적으로 읽고 업데이트합니다. 이것이 때때로 매개변수 서버 스타일 교육을 비동기 교육 이라고 하는 이유입니다.

TensorFlow 2에서 매개변수 서버 교육은 tf.distribute.experimental.ParameterServerStrategy 클래스에 의해 구동되며, 이 클래스는 교육 단계를 수천 명의 작업자로 확장되는 클러스터에 배포합니다(매개변수 서버와 함께 제공됨).

지원되는 교육 방법

지원되는 두 가지 주요 교육 방법이 있습니다.

작업 및 태스크가 있는 클러스터

선택한 API( Model.fit 또는 사용자 지정 교육 루프)에 관계없이 TensorFlow 2의 분산 교육에는 여러 'jobs' 이 있는 'cluster' 가 포함되며 각 작업에는 하나 이상의 'tasks' 이 있을 수 있습니다.

매개변수 서버 교육을 사용할 때 다음을 수행하는 것이 좋습니다.

  • 하나의 조정자 작업(작업 이름이 chief )
  • 여러 작업자 작업(작업 이름 worker ); 그리고
  • 다중 매개변수 서버 작업(작업 이름 ps )

코디네이터 가 리소스를 생성하고, 교육 작업을 발송하고, 체크포인트를 작성하고, 작업 실패를 처리하는 동안 작업자매개변수 서버 는 코디네이터의 요청을 수신하는 tf.distribute.Server 를 실행합니다.

Model.fit API를 사용한 매개변수 서버 교육

Model.fit API를 사용한 매개변수 서버 교육을 위해서는 코디네이터가 tf.distribute.experimental.ParameterServerStrategy 개체와 tf.keras.utils.experimental.DatasetCreator 를 입력으로 사용해야 합니다. 전략이 없거나 다른 전략이 있는 Model.fit 사용과 유사하게, 워크플로에는 모델 생성 및 컴파일, 콜백 준비, Model.fit 호출이 포함됩니다.

커스텀 트레이닝 루프를 사용한 파라미터 서버 트레이닝

사용자 지정 교육 루프에서 tf.distribute.experimental.coordinator.ClusterCoordinator 클래스는 코디네이터에 사용되는 핵심 구성 요소입니다.

ClusterCoordinator 객체에서 제공하는 가장 중요한 API는 schedule 입니다.

  • schedule API는 tf.function 을 대기열에 넣고 미래와 같은 RemoteValue 를 즉시 반환합니다.
  • 대기 중인 함수는 백그라운드 스레드의 원격 작업자에게 전달되고 해당 RemoteValue 는 비동기식으로 채워집니다.
  • schedule 에는 작업자 할당이 필요하지 않으므로 전달된 tf.function 은 사용 가능한 모든 작업자에서 실행할 수 있습니다.
  • 실행된 작업자가 완료되기 전에 사용할 수 없게 되면 사용 가능한 다른 작업자에서 기능이 재시도됩니다.
  • 이 사실과 함수 실행이 원자적이지 않다는 사실 때문에 함수는 두 번 이상 실행될 수 있습니다.

원격 기능을 디스패치하는 것 외에도 ClusterCoordinator 는 모든 작업자에 대한 데이터 세트를 생성하고 작업자가 장애에서 복구할 때 이러한 데이터 세트를 재구축하는 데 도움이 됩니다.

튜토리얼 설정

자습서는 Model.fit 및 사용자 지정 교육 루프 경로로 분기되며 필요에 맞는 경로를 선택할 수 있습니다. "Training with X" 이외의 섹션은 두 경로 모두에 적용할 수 있습니다.

pip install portpicker

클러스터 설정

위에서 언급했듯이 매개변수 서버 교육 클러스터에는 교육 프로그램을 실행하는 조정자 작업, TensorFlow 서버 tf.distribute.Server 를 실행하는 하나 이상의 작업자 및 매개변수 서버 작업 및 사이드카 평가를 실행하는 추가 평가 작업이 필요합니다. (아래 사이드카 평가 섹션 참조). 설정 요구 사항은 다음과 같습니다.

  • 코디네이터 작업은 평가자를 제외한 다른 모든 TensorFlow 서버의 주소와 포트를 알아야 합니다.
  • 작업자와 매개변수 서버는 수신 대기해야 하는 포트를 알아야 합니다. 단순성을 위해 일반적으로 이러한 작업에서 TensorFlow 서버를 생성할 때 전체 클러스터 정보를 전달할 수 있습니다.
  • 평가자 작업은 훈련 클러스터의 설정을 알 필요가 없습니다. 그렇다면 훈련 클러스터에 연결을 시도해서는 안 됩니다.
  • 작업자 및 매개변수 서버는 각각 "worker""ps" 작업 유형을 가져야 합니다. 조정자는 레거시 이유로 작업 유형으로 "chief" 을 사용해야 합니다.

이 튜토리얼에서는 전체 매개변수 서버 교육을 Colab에서 실행할 수 있도록 in-process 클러스터를 생성합니다. 이후 섹션에서 실제 클러스터 를 설정하는 방법을 배웁니다.

진행 중인 클러스터

먼저 여러 TensorFlow 서버를 만들고 나중에 연결합니다. 이것은 이 튜토리얼의 데모를 위한 것이며 실제 교육에서는 "worker""ps" 머신에서 서버가 시작됩니다.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

in-process 클러스터 설정은 here 와 같은 단위 테스트에서 자주 사용됩니다.

로컬 테스트를 위한 또 다른 옵션은 로컬 시스템에서 프로세스를 시작하는 것입니다. 이 접근 방식의 예는 Keras를 사용한 다중 작업자 교육을 확인하세요.

ParameterServerStrategy 인스턴스화

교육 코드를 살펴보기 전에 ParameterServerStrategy 개체를 인스턴스화해 보겠습니다. Model.fit 또는 사용자 정의 훈련 루프를 진행하는지 여부에 관계없이 이것은 필요합니다. variable_partitioner 인수는 변수 샤딩 섹션 에서 설명합니다.

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

훈련에 GPU를 사용하려면 각 작업자에게 표시되는 GPU를 할당합니다. ParameterServerStrategy 는 각 작업자에서 사용 가능한 모든 GPU를 사용하며 모든 작업자는 동일한 수의 GPU를 사용할 수 있어야 합니다.

가변 샤딩

변수 샤딩은 변수를 샤드 라고 하는 여러 개의 작은 변수로 나누는 것을 말합니다. 가변 샤딩은 이러한 샤드에 액세스할 때 네트워크 부하를 분산하는 데 유용할 수 있습니다. 여러 매개변수 서버에 일반 변수의 계산 및 저장을 분산하는 것도 유용합니다.

변수 샤딩을 활성화하려면 ParameterServerStrategy 개체를 생성할 때 variable_partitioner 를 전달할 수 있습니다. variable_partitioner 는 변수가 생성될 때마다 호출되며 변수의 각 차원을 따라 샤드 수를 반환할 것으로 예상됩니다. tf.distribute.experimental.partitioners.MinSizePartitioner 와 같은 일부 기본 variable_partitioner 가 제공됩니다. tf.distribute.experimental.partitioners.MinSizePartitioner 와 같은 크기 기반 파티셔너를 사용하여 모델 교육 속도에 부정적인 영향을 줄 수 있는 작은 변수의 파티셔닝을 방지하는 것이 좋습니다.

variable_partitioner 가 전달되고 strategy.scope() 바로 아래에 변수를 생성하면 샤드 목록에 대한 액세스를 제공하는 variables 속성이 있는 컨테이너 유형이 됩니다. 대부분의 경우 이 컨테이너는 모든 샤드를 연결하여 자동으로 Tensor로 변환됩니다. 결과적으로 일반 변수로 사용할 수 있습니다. 반면에 tf.nn.embedding_lookup 과 같은 일부 TensorFlow 메서드는 이 컨테이너 유형에 대한 효율적인 구현을 제공하며 이러한 메서드에서는 자동 연결이 방지됩니다.

자세한 내용은 tf.distribute.experimental.ParameterServerStrategy 의 API 문서를 참조하세요.

Model.fit 으로 훈련하기

Keras는 재정의 가능한 train_step의 유연성과 train_step 에 대한 체크포인트 저장 또는 요약 저장과 같은 기능을 제공하는 콜백과 함께 후드 아래에서 교육 루프를 처리하는 Model.fit 을 통해 사용하기 쉬운 교육 API를 제공합니다. Model.fit 을 사용하면 전략 개체를 간단히 교체하여 동일한 교육 코드를 다른 전략에 사용할 수 있습니다.

입력 데이터

매개변수 서버 훈련이 있는 Model.fit 은 입력 데이터가 tf.distribute.InputContext 유형의 단일 인수를 취하고 tf.distribute.InputContext 을 반환하는 tf.data.Dataset 에 제공되어야 합니다. 그런 다음 이러한 callable 을 사용하는 tf.keras.utils.experimental.DatasetCreator 객체를 input_options 인수를 통해 선택적 tf.distribute.InputOptions 객체를 만듭니다.

매개변수 서버 교육으로 데이터를 섞고 반복하고 라이브러리가 Epoch 경계를 알 수 있도록 fit 호출에서 steps_per_epoch 를 지정하는 것이 좋습니다.

InputContext 인수에 대한 자세한 내용은 분산 입력 자습서를 참조하세요.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn 의 코드는 각 작업자 컴퓨터의 입력 장치(일반적으로 CPU)에서 호출됩니다.

모델 구성 및 컴파일

이제 tf.keras.models.Sequential tf.keras.Model )을 만든 다음 옵티마이저, 메트릭 또는 steps_per_execution 과 같은 매개변수와 같은 구성 요소를 통합하기 위해 Model.compile 호출을 수행합니다.

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

콜백 및 교육

실제 훈련을 위해 model.fit 을 호출하기 전에 다음과 같은 일반적인 작업에 필요한 콜백을 준비합시다.

  • ModelCheckpoint : 모델 가중치를 저장합니다.
  • BackupAndRestore : 학습 진행 상황이 자동으로 백업되고 클러스터에 사용할 수 없는 경우(예: 중단 또는 선점) 복구되도록 합니다. 또는
  • TensorBoard : 진행 보고서를 TensorBoard 도구에서 시각화되는 요약 파일에 저장합니다.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

ClusterCoordinator 로 직접 사용(선택 사항)

Model.fit 교육 경로를 선택하더라도 tf.distribute.experimental.coordinator.ClusterCoordinator 객체를 선택적으로 인스턴스화하여 작업자에서 실행하려는 다른 기능을 예약할 수 있습니다. 자세한 내용과 예제 는 맞춤형 훈련 루프를 사용한 훈련 섹션을 참조하십시오.

맞춤형 훈련 루프를 사용한 훈련

tf.distribute.Strategy 와 함께 맞춤형 훈련 루프를 사용하면 훈련 루프를 정의할 수 있는 유연성이 뛰어납니다. 위에서 정의한 ParameterServerStrategy ( strategy )로 tf.distribute.experimental.coordinator.ClusterCoordinator 를 사용하여 원격 작업자에게 훈련 단계 실행을 디스패치합니다.

그런 다음 다른 tf.distribute.Strategy 를 사용하여 훈련 루프에서 했던 것처럼 모델을 만들고 데이터 세트와 단계 함수를 정의합니다. tf.distribute.Strategy를 사용한 사용자 지정 교육 자습서에서 자세한 내용을 찾을 수 있습니다.

효율적인 데이터세트 미리 가져오기를 위해 아래 원격 작업자에게 교육 단계 파견 섹션에 언급된 권장 분산 데이터세트 생성 API를 사용하세요. 또한 작업자에게 할당된 GPU를 최대한 활용하려면 worker_fn 내부에서 Strategy.run 을 호출해야 합니다. 나머지 단계는 GPU가 있거나 없는 훈련에 대해 동일합니다.

다음 단계에서 이러한 구성 요소를 생성해 보겠습니다.

데이터 설정

먼저, Keras 전처리 레이어 에 의해 구현된 전처리 로직을 포함하는 데이터 세트를 생성하는 함수를 작성하십시오.

dataset_fn 외부에 이러한 레이어를 생성하지만 tf.function dataset_fn dataset_fn 하기 때문입니다.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

데이터세트에서 장난감 예시 생성:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

그런 다음 dataset_fn 으로 래핑된 훈련 데이터 세트를 생성합니다.

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

모델 구축

다음으로 모델 및 기타 개체를 만듭니다. strategy.scope 아래에 모든 변수를 생성해야 합니다.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

FixedShardsPartitioner 를 사용하면 모든 변수가 두 개의 샤드로 분할되고 각 샤드가 서로 다른 매개변수 서버에 할당되었는지 확인합니다.

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

훈련 단계 정의

셋째, tf.function 으로 래핑된 훈련 단계를 생성합니다.

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

위 훈련 단계 함수에서 step_fn 에서 Strategy.runStrategy.reduce 를 호출하면 작업자당 여러 GPU를 지원할 수 있습니다. 작업자에게 GPU가 할당된 경우 Strategy.run 은 데이터 세트를 여러 복제본에 배포합니다.

원격 작업자에게 교육 단계 디스패치

모든 계산이 ParameterServerStrategy 에 의해 정의된 후 tf.distribute.experimental.coordinator.ClusterCoordinator 클래스를 사용하여 리소스를 생성하고 교육 단계를 원격 작업자에게 배포합니다.

먼저 ClusterCoordinator 개체를 만들고 전략 개체를 전달해 보겠습니다.

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

그런 다음 작업자별 데이터 세트와 반복자를 만듭니다. 아래의 per_worker_dataset_fn 에서 dataset_fnstrategy.distribute_datasets_from_function .distribute_datasets_from_function으로 래핑하는 것은 GPU로 원활하게 효율적인 프리페치를 허용하기 위해 권장됩니다.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

마지막 단계는 ClusterCoordinator.schedule 을 사용하여 원격 작업자에게 계산을 배포하는 것입니다.

  • schedule 메서드는 tf.function 을 큐에 넣고 미래와 같은 RemoteValue 를 즉시 반환합니다. 대기 중인 함수는 백그라운드 스레드의 원격 작업자에게 전달되고 RemoteValue 는 비동기식으로 채워집니다.
  • join 방법( ClusterCoordinator.join )은 예약된 모든 기능이 실행될 때까지 대기하는 데 사용할 수 있습니다.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

RemoteValue 의 결과를 가져오는 방법은 다음과 같습니다.

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

또는 모든 단계를 시작하고 완료를 기다리는 동안 작업을 수행할 수 있습니다.

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

이 특정 예에 대한 전체 교육 및 제공 워크플로는 이 테스트 를 확인하세요.

데이터세트 생성에 대해 자세히 알아보기

위 코드의 데이터셋은 ClusterCoordinator.create_per_worker_dataset API를 사용하여 생성됩니다. 작업자당 하나의 데이터세트를 생성하고 컨테이너 개체를 반환합니다. iter 메서드를 호출하여 작업자별 반복자를 만들 수 있습니다. 작업자당 반복자는 작업자당 하나의 반복자를 포함하고 작업자의 해당 슬라이스는 특정 작업자에서 함수가 실행되기 전에 ClusterCoordinator.schedule 메서드에 전달된 함수의 입력 인수에서 대체됩니다.

현재 ClusterCoordinator.schedule 메서드는 작업자가 동등하다고 가정하므로 Dataset.shuffle 작업이 포함된 경우 다르게 섞일 수 있다는 점을 제외하고 다른 작업자의 데이터 세트가 동일하다고 가정합니다. 이 때문에 데이터 세트를 무기한 반복하고 데이터 세트의 OutOfRangeError 에 의존하는 대신 유한한 수의 단계를 예약하는 것이 좋습니다.

또 다른 중요한 사항은 tf.data 데이터 세트가 작업 경계를 넘어 암시적 직렬화 및 역직렬화를 지원하지 않는다는 것입니다. 따라서 ClusterCoordinator.create_per_worker_dataset 에 전달된 함수 내부에 전체 데이터 세트를 생성하는 것이 중요합니다.

평가

분산 교육에서 평가 루프를 정의하고 실행하는 방법은 여러 가지가 있습니다. 각각은 아래와 같이 장단점이 있습니다. 선호하지 않는 경우 인라인 평가 방법을 권장합니다.

인라인 평가

이 방법에서 코디네이터는 교육과 평가를 번갈아 가며 이를 인라인 평가 라고 합니다.

인라인 평가에는 몇 가지 이점이 있습니다. 예를 들어:

  • 단일 작업이 보유할 수 없는 대규모 평가 모델 및 평가 데이터 세트를 지원할 수 있습니다.
  • 평가 결과는 다음 Epoch 훈련에 대한 결정을 내리는 데 사용할 수 있습니다.

인라인 평가를 구현하는 방법에는 직접 평가와 분산 평가의 두 가지가 있습니다.

  • 직접 평가 : 소규모 모델 및 평가 데이터 세트의 경우 코디네이터는 코디네이터의 평가 데이터 세트를 사용하여 분산 모델에서 직접 평가를 실행할 수 있습니다.
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • 분산 평가 : 코디네이터에서 직접 실행할 수 없는 대규모 모델 또는 데이터세트의 경우, 코디네이터 작업은 ClusterCoordinator.schedule / ClusterCoordinator.join 메서드를 통해 작업자에게 평가 작업을 배포할 수 있습니다.
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

사이드카 평가

또 다른 방법은 체크포인트를 반복적으로 읽고 최신 체크포인트에서 평가를 실행하는 전용 평가자 작업을 생성하는 사이드카 평가 입니다. 평가 결과에 따라 교육 루프를 변경할 필요가 없는 경우 교육 프로그램을 조기에 완료할 수 있습니다. 그러나 평가를 트리거하려면 추가 평가자 작업과 주기적인 체크포인트가 필요합니다. 다음은 가능한 사이드카 평가 루프입니다.

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

현실 세계의 클러스터

실제 프로덕션 환경에서는 다른 시스템의 다른 프로세스에서 모든 작업을 실행합니다. 각 작업에 대한 클러스터 정보를 구성하는 가장 간단한 방법은 "TF_CONFIG" 환경 변수를 설정하고 tf.distribute.cluster_resolver.TFConfigClusterResolver 를 사용하여 "TF_CONFIG" 를 구문 분석하는 것입니다.

"TF_CONFIG" 환경 변수에 대한 일반적인 설명은 분산 교육 가이드를 참조하세요.

Kubernetes 또는 기타 구성 템플릿을 사용하여 교육 작업을 시작하는 경우 이러한 템플릿이 이미 “TF_CONFIG" 를 설정했을 가능성이 큽니다.

"TF_CONFIG" 환경 변수 설정

3개의 작업자와 2개의 매개변수 서버가 있다고 가정하고 작업자 1의 "TF_CONFIG" 는 다음과 같을 수 있습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

평가자의 "TF_CONFIG" 는 다음과 같을 수 있습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

위의 평가자에 대한 "TF_CONFIG" 문자열의 "cluster" 부분은 선택 사항입니다.

모든 작업에 동일한 바이너리를 사용하는 경우

단일 바이너리를 사용하여 이러한 모든 작업을 실행하는 것을 선호하는 경우 처음에 프로그램을 다른 역할로 분기하도록 해야 합니다.

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

다음 코드는 TensorFlow 서버를 시작하고 기다립니다.

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

작업 실패 처리

작업자 실패

tf.distribute.experimental.coordinator.ClusterCoordinator 또는 Model.fit 은 작업자 실패에 대한 기본 제공 내결함성을 제공합니다. 작업자 복구 시 이전에 제공된 데이터 세트 기능(사용자 지정 교육 루프의 경우 ClusterCoordinator.create_per_worker_dataset 또는 tf.keras.utils.experimental.DatasetCreator 의 경우 Model.fit )이 작업자에서 호출되어 데이터 세트를 다시 생성합니다.

매개변수 서버 또는 코디네이터 오류

그러나 코디네이터는 매개변수 서버 오류를 확인하면 UnavailableError 또는 AbortedError 를 즉시 발생시킵니다. 이 경우 코디네이터를 다시 시작할 수 있습니다. 조정자 자체도 사용할 수 없게 될 수 있습니다. 따라서 교육 진행 상황을 잃지 않기 위해 특정 도구를 사용하는 것이 좋습니다.

  • Model.fit 의 경우 진행률 저장 및 복원을 자동으로 처리하는 BackupAndRestore 콜백을 사용해야 합니다. 예제는 위의 콜백 및 교육 섹션을 참조하세요.

  • 사용자 지정 훈련 루프의 경우 훈련이 시작되기 전에 주기적으로 모델 변수를 체크포인트하고 체크포인트(있는 경우)에서 모델 변수를 로드해야 합니다. 옵티마이저가 체크포인트된 경우 학습 진행 상황은 optimizer.iterations 에서 대략적으로 추론할 수 있습니다.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue 가져오기

함수가 성공적으로 실행되면 RemoteValue 가져오기의 성공이 보장됩니다. 현재 반환값은 함수가 실행된 후 즉시 코디네이터에 복사되기 때문입니다. 복사하는 동안 작업자 오류가 발생하면 사용 가능한 다른 작업자에서 기능이 재시도됩니다. 따라서 성능을 최적화하려면 반환 값 없이 함수를 예약할 수 있습니다.

오류 보고

코디네이터가 매개변수 서버의 UnavailableError 와 같은 오류나 tf.debugging.check_numericsInvalidArgument 와 같은 기타 애플리케이션 오류를 확인하면 오류를 발생시키기 전에 보류 중인 모든 함수와 대기 중인 함수를 취소합니다. 해당 RemoteValue 를 가져오면 CancelledError 가 발생합니다.

오류가 발생한 후 코디네이터는 동일한 오류 또는 취소된 기능의 오류를 발생시키지 않습니다.

성능 개량

ParameterServerStrategyClusterResolver 로 훈련할 때 성능 문제가 발생하는 경우 몇 가지 이유가 있을 수 있습니다.

한 가지 일반적인 이유는 매개변수 서버의 로드 균형이 맞지 않고 로드가 심한 일부 매개변수 서버가 용량에 도달했기 때문입니다. 또한 여러 근본 원인이 있을 수 있습니다. 이 문제를 완화하는 몇 가지 간단한 방법은 다음과 같습니다.

  1. ParameterServerStrategy 를 구성할 때 variable_partitioner 를 지정하여 큰 모델 변수를 분할하십시오.
  2. 가능하면 모든 매개변수 서버에 필요한 핫스팟 변수를 단일 단계로 생성하지 마십시오. 예를 들어, 기본 동작은 학습률이 특정 매개변수 서버에 배치되고 각 단계에서 다른 모든 매개변수 서버에서 요청하는 변수가 되기 때문에 옵티마이저에서 일정한 학습률 또는 하위 클래스 tf.keras.optimizers.schedules.LearningRateSchedule 을 사용합니다. .
  3. Keras 사전 처리 계층에 전달하기 전에 큰 어휘를 섞습니다.

성능 문제의 또 다른 가능한 이유는 조정자입니다. schedule / join 의 첫 번째 구현은 Python 기반이므로 스레딩 오버헤드가 있을 수 있습니다. 또한 조정자와 작업자 사이의 대기 시간이 클 수 있습니다. 만일이 경우라면,

  • Model.fit 의 경우 Model.compile 에 제공된 steps_per_execution 인수를 1보다 큰 값으로 설정할 수 있습니다.

  • 사용자 지정 훈련 루프의 경우 여러 단계를 단일 tf.function 으로 묶을 수 있습니다.

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

라이브러리가 더욱 최적화됨에 따라 앞으로 대부분의 사용자가 수동으로 단계를 압축할 필요가 없기를 바랍니다.

또한 성능 향상을 위한 작은 트릭은 위의 작업 실패 처리 섹션에서 설명한 대로 반환 값 없이 함수를 예약하는 것입니다.

알려진 제한 사항

알려진 제한 사항의 대부분은 이미 위의 섹션에서 다룹니다. 이 섹션에서는 요약을 제공합니다.

ParameterServerStrategy 일반

  • 내결함성이 제대로 작동하려면 os.environment["grpc_fail_fast"]="use_caller" 가 코디네이터를 포함한 모든 작업에 필요합니다.
  • 동기 매개변수 서버 교육은 지원되지 않습니다.
  • 최적의 성능을 달성하려면 일반적으로 여러 단계를 단일 기능으로 압축해야 합니다.
  • 샤딩된 변수를 포함하는 tf.saved_model.load를 통해 tf.saved_model.load 을 로드하는 것은 지원되지 않습니다. TensorFlow Serving을 사용하여 이러한 stored_model을 로드하는 것은 작동할 것으로 예상됩니다.
  • 분할된 옵티마이저 슬롯 변수를 포함하는 체크포인트를 다른 수의 분할로 로드하는 것은 지원되지 않습니다.
  • 코디네이터 작업을 다시 시작하지 않고 매개변수 서버 장애에서 복구하는 것은 지원되지 않습니다.
  • tf.lookup.StaticHashTable ( tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup , tf.keras.layers.TextVectorization 과 같은 일부 Keras 전처리 레이어에서 일반적으로 사용)을 사용하면 리소스가 다음 위치에 배치됩니다. 이때 코디네이터는 매개변수 서버 교육을 받습니다. 이것은 작업자에서 코디네이터로의 조회 RPC에 성능에 영향을 미칩니다. 이것은 현재 해결해야 할 최우선 과제입니다.

Model.fit 사양

  • steps_per_epoch 에는 Model.fit 인수가 필요합니다. Epoch에서 적절한 간격을 제공하는 값을 선택할 수 있습니다.
  • ParameterServerStrategy 는 성능상의 이유로 배치 수준 호출이 있는 사용자 지정 콜백을 지원하지 않습니다. 이러한 호출을 적절하게 선택된 steps_per_epoch 를 사용하여 epoch 수준 호출로 변환하여 모든 steps_per_epoch 단계 수로 호출되도록 해야 합니다. 내장 콜백은 영향을 받지 않습니다. 배치 수준 호출이 성능을 발휘하도록 수정되었습니다. ParameterServerStrategy 에 대한 배치 수준 호출 지원이 계획 중입니다.
  • 같은 이유로 다른 전략과 달리 진행률 표시줄과 메트릭은 Epoch 경계에서만 기록됩니다.
  • run_eagerly 는 지원되지 않습니다.

맞춤형 훈련 루프 사양