오늘 현지 TensorFlow Everywhere 이벤트에 참석하세요!
이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

매개 변수 서버 교육

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

개요

매개 변수 서버 훈련 은 여러 머신에서 모델 훈련을 확장하는 일반적인 데이터 병렬 방법입니다. 매개 변수 서버 훈련 클러스터는 작업자와 매개 변수 서버로 구성됩니다. 변수는 매개 변수 서버에서 생성되며 각 단계에서 작업자가 읽고 업데이트합니다. 기본적으로 작업자는 서로 동기화하지 않고 이러한 변수를 독립적으로 읽고 업데이트합니다. 이것이 때때로 매개 변수 서버 스타일 훈련을 비동기 훈련이라고하는 이유입니다.

TensorFlow 2 매개 변수 서버 학습은 tf.distribute.experimental.coordinator.ClusterCoordinator 클래스를 통해 중앙 코디네이터를 사용합니다.

이 구현에서 workerparameter server 작업은 코디네이터의 요청을 수신하는 tf.distribute.Server 실행합니다. 코디네이터는 리소스를 생성하고, 교육 작업을 발송하고, 체크 포인트를 작성하고, 작업 실패를 처리합니다.

우리는이 아키텍처와 새로운 ClusterCoordinator 클래스가 더 유연하고 단순한 프로그래밍 모델을 제공한다고 믿습니다.

ClusterCoordinator

ClusterCoordinator 클래스는 tf.distribute.Strategy 객체와 함께 작동해야합니다. 이 tf.distribute.Strategy 객체는 클러스터의 정보를 전달하는 데 필요 하며 MirroredStrategy 사용한 사용자 지정 훈련 에서 본 것처럼 훈련 단계를 정의하는 데 사용됩니다. 그런 다음 ClusterCoordinator 개체는 이러한 교육 단계의 실행을 원격 작업자에게 전달합니다. 현재 ClusterCoordinatortf.distribute.experimental.ParameterServerStrategy 에서만 작동합니다.

ClusterCoordinator 객체가 제공하는 가장 중요한 API는 schedule 입니다. schedule API는 tf.function 대기열에 tf.function 미래와 유사한 RemoteValue 즉시 반환합니다. 대기중인 함수는 백그라운드 스레드의 원격 작업자에게 발송되고 RemoteValue 는 비동기 적으로 채워집니다. schedule 에는 작업자 할당이 필요하지 않으므로 전달 된 tf.function 은 사용 가능한 모든 작업자에서 실행할 수 있습니다. 실행 된 워커가 완료되기 전에 사용할 수 없게되면 사용 가능한 다른 워커에서 함수가 다시 시도됩니다. 이 사실과 함수 실행이 원자 적이 지 않기 때문에 함수가 두 번 이상 실행될 수 있습니다.

원격 기능을 디스패치하는 것 외에도 ClusterCoordinator 는 모든 작업자에서 데이터 세트를 생성하고 작업자가 실패에서 복구 될 때 이러한 데이터 세트를 다시 빌드하는 데 도움이됩니다.

튜토리얼 설정

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

클러스터 설정

위에서 언급했듯이 매개 변수 서버 학습 클러스터에는 학습 프로그램을 실행하는 코디네이터 작업, TensorFlow 서버를 실행하는 하나 이상의 작업자 및 매개 변수 서버 작업 (예 : tf.distribute.Server , tf.distribute.Server 를 실행하는 추가 평가 작업이 필요합니다. 평가 (아래 사이드카 평가 섹션 참조). 이를 설정하기위한 요구 사항은 다음과 같습니다.

  • 코디네이터 작업은 평가자를 제외한 다른 모든 TensorFlow 서버의 주소와 포트를 알아야합니다.
  • 작업자 및 매개 변수 서버는 수신해야하는 포트를 알아야합니다. 단순화를 위해 일반적으로 이러한 작업에서 TensorFlow 서버를 만들 때 전체 클러스터 정보를 전달합니다.
  • 평가자 작업은 훈련 클러스터의 설정을 알 필요가 없습니다. 그렇다면 학습 클러스터에 연결을 시도하지 않아야합니다.
  • 작업자 및 매개 변수 서버는 각각 "작업자"및 "ps"작업 유형을 가져야합니다. 코디네이터는 레거시 이유로 작업 유형으로 "최고"를 사용해야합니다.

이 튜토리얼에서는 전체 파라미터 서버 훈련이 colab에서 실행될 수 있도록 in-process 클러스터를 생성합니다. 이후 섹션에서 실제 클러스터 를 설정하는 방법을 소개합니다.

In-process 클러스터

이 튜토리얼에서는 미리 TensorFlow 서버를 시작하고 나중에 연결할 것입니다.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

맞춤형 트레이닝 루프로 트레이닝

tf.distribute.Strategy 사용한 사용자 지정 훈련 루프는 훈련 루프를 정의하는 데 큰 유연성을 제공합니다. 현재 TensorFlow 2의 매개 변수 서버 학습에는 커스텀 학습 루프 만 지원됩니다. 여기서는 ParameterServerStrategy 를 사용하여 교육 단계를 정의한 다음 ClusterCoordinator 를 사용하여 원격 작업자에게 교육 단계 실행을 전달합니다.

ParameterServerStrategy 생성

사용자 지정 학습 루프에서 학습 단계를 작성하려면 첫 번째 단계는 ParameterServerStrategy 를 만드는 것입니다. variable_partitioner 나중에 설명하겠습니다.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

그런 다음 다른 tf.distribute.Strategy 와 함께 훈련 루프에서 본 것처럼 모델을 생성하고 데이터 세트 및 단계 함수를 정의합니다. 이 자습서 에서 자세한 내용을 찾을 수 있습니다. 다음 단계에서 이러한 구성 요소를 만들어 보겠습니다.

데이터 설정

먼저 Keras 전처리 계층에서 구현 한 전처리 로직을 포함하는 데이터 세트를 생성하는 함수를 작성합니다. 우리는 외부의 이러한 층이 생성됩니다 dataset_fn 하지만, 내부의 변환을 적용 dataset_fn 당신이 포장하기 때문에 dataset_fntf.function 변수가 그 안에 생성 할 수 없습니다.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

데이터 세트에 장난감 예제를 생성합니다.

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

그런 다음 dataset_fn에 래핑 된 훈련 데이터 세트를 만듭니다.

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

모델 구축

둘째, 모델 및 기타 개체를 만듭니다. strategy.scope 아래에 모든 변수를 만들어야합니다.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

훈련 단계 정의

셋째, tf.function 래핑 된 학습 단계를 만듭니다.

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

위의 단계 함수에서 step_fn 에서 strategy.runstrategy.reduce 를 호출하면 현재로서는 사소한 구현이 있지만 향후 GPU 또는 다중 복제본 작업자를 지원하는 데 유용합니다.

원격 작업자에게 교육 단계 파견

모든 계산이 ParameterServerStrategy 에 의해 정의 된 후에는 ClusterCoordinator 클래스를 사용하여 리소스를 생성하고 교육 단계를 원격 작업자에게 배포합니다.

먼저 ClusterCoordinator 개체를 만들고 전략 개체를 전달하겠습니다.

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

그런 다음 작업자 별 데이터 세트와 반복자를 만듭니다. 에서 per_worker_dataset_fn 아래의 포장 dataset_fnstrategy.distribute_datasets_from_function 선택 사항이지만 그것의 GPU가 지원하는 경우 미래에 완벽의 GPU에 효율적인 프리 페치를 지원하는 수 ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

마지막 단계는 schedule 사용하여 원격 작업자에게 계산을 배포하는 것입니다. schedule 메소드는 tf.function 대기열에 tf.function 하고 미래와 유사한 RemoteValue 즉시 반환합니다. 대기중인 함수는 백그라운드 스레드의 원격 작업자에게 발송되고 RemoteValue 는 비동기 적으로 채워집니다. join 방법은 예약 된 모든 기능이 실행될 때까지 대기하는 데 사용할 수 있습니다.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

RemoteValue 의 결과를 가져 오는 방법은 다음과 같습니다.

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

또는 모든 단계를 시작하고 완료를 기다리는 동안 작업을 수행 할 수 있습니다.

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

이 특정 예에 대한 전체 교육 및 제공 워크 플로는이 테스트 를 확인하세요.

데이터 세트 생성에 대한 추가 정보

위 코드의 데이터 세트는 create_per_worker_dataset API를 사용하여 생성됩니다. 작업 자당 하나의 데이터 세트를 만들고 컨테이너 개체를 반환합니다. iter 메서드를 호출하여 작업자 별 반복자를 만들 수 있습니다. 작업 자당 반복기는 작업 자당 하나의 반복자를 포함하며 특정 작업자에서 함수가 실행되기 전에 schedule 메서드에 전달 된 함수의 입력 인수에서 작업자의 해당 슬라이스가 대체됩니다.

현재 schedule 방법은 작업자가 동등하다고 가정하므로 다른 작업자의 데이터 세트가 dataset.shuffle 작업을 포함하는 경우 다르게 셔플 될 수 있다는 점을 제외하고는 동일하다고 가정합니다. 따라서 데이터 세트의 OutOfRangeError 에 의존하는 대신 데이터 세트를 무기한 반복하고 한정된 수의 단계를 예약하는 것이 좋습니다.

또 다른 중요한 참고 사항은 tf.data 데이터 세트가 작업 경계에서 암시 적 직렬화 및 역 직렬화를 지원하지 않는다는 것입니다. 따라서 create_per_worker_dataset 전달 된 함수 내에서 전체 데이터 세트를 만드는 것이 중요합니다.

가변 샤딩

변수 샤딩은 변수를 여러 개의 작은 변수로 분할하는 것을 말합니다. 이러한 작은 변수를 샤드라고 합니다. 가변 샤딩은 이러한 샤드에 액세스 할 때 네트워크 부하를 분산하는 데 유용 할 수 있습니다. 또한 여러 매개 변수 서버에 걸쳐 일반 변수의 계산 및 저장을 분산하는 데 유용합니다.

변수 샤딩을 활성화하려면 ParameterServerStrategy 개체를 생성 할 때 variable_partitioner 전달할 수 있습니다. variable_partitioner 는 변수가 생성 될 때마다 호출되며 변수의 각 차원을 따라 샤드 수를 반환 할 것으로 예상됩니다. tf.distribute.experimental.partitioners.FixedShardsPartitioner 와 같은 일부 기본 variable_partitioner 가 제공됩니다.

위의 예에서는 모든 변수를 두 개의 샤드로 분할하고 각 샤드가 다른 매개 변수 서버에 할당되는 FixedShardsPartitioner 를 사용합니다.

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

variable_partitioner 가 전달되고 strategy.scope() 바로 아래에 변수를 만들면 샤드 목록에 대한 액세스를 제공하는 variables 속성이있는 컨테이너 유형이됩니다. 대부분의 경우이 컨테이너는 모든 샤드를 연결하여 자동으로 Tensor로 변환됩니다. 결과적으로 일반 변수로 사용할 수 있습니다. 반면에 tf.nn.embedding_lookup 과 같은 일부 TensorFlow 메서드는이 컨테이너 유형에 대한 효율적인 구현을 제공하며 이러한 메서드에서는 자동 연결이 방지됩니다.

자세한 내용은 ParameterServerStrategy 의 API 문서 문자열을 참조하세요.

평가

분산 교육에서 평가 루프를 정의하고 실행하는 방법은 여러 가지가 있습니다. 각각은 아래에 설명 된대로 고유 한 장단점이 있습니다. 선호 사항이없는 경우 인라인 평가 방법을 사용하는 것이 좋습니다.

인라인 평가

이 방법에서 코디네이터는 교육과 평가를 번갈아 가며 인라인 평가라고합니다. 인라인 평가에는 몇 가지 이점이 있습니다. 예를 들어 단일 작업이 보유 할 수없는 대규모 평가 모델 및 평가 데이터 세트를 지원할 수 있습니다. 다른 예로, 평가 결과를 사용하여 다음 세대 교육을위한 결정을 내릴 수 있습니다.

인라인 평가를 구현하는 두 가지 방법이 있습니다.

  • 직접 평가 -소규모 모델 및 평가 데이터 세트의 경우 코디네이터는 코디네이터의 평가 데이터 세트를 사용하여 분산 모델에서 직접 평가를 실행할 수 있습니다.
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • 분산 평가 -코디네이터에서 직접 실행할 수없는 대규모 모델 또는 데이터 세트의 경우 코디네이터 작업은 schedule / join 방법을 통해 평가 작업을 작업자에게 배포 할 수 있습니다.
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

사이드카 평가

또 다른 방법은 검사 점을 반복적으로 읽고 최신 검사 점에서 평가를 실행하는 전용 평가자 작업을 만드는 사이드카 평가입니다. 평가 결과에 따라 교육 루프를 변경할 필요가없는 경우 교육 프로그램을 조기에 마칠 수 있습니다. 그러나 평가를 트리거하려면 추가 평가자 작업과주기적인 체크 포인트가 필요합니다. 다음은 가능한 사이드카 평가 루프입니다.

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

실제 세계의 클러스터

실제 프로덕션 환경에서는 다른 컴퓨터의 다른 프로세스에서 모든 작업을 실행합니다. 각 작업에 대한 클러스터 정보를 구성하는 가장 간단한 방법은 "TF_CONFIG"환경 변수를 설정하고 TFConfigClusterResolver 를 사용하여 "TF_CONFIG"를 구문 분석하는 것입니다. "TF_CONFIG"환경 변수에 대한 일반적인 설명은 분산 교육 가이드 를 참조하세요.

Kubernetes 또는 기타 구성 템플릿을 사용하여 학습 작업을 시작하는 경우 이러한 템플릿이 이미 "TF_CONFIG"를 설정했을 가능성이 큽니다.

“TF_CONFIG”환경 변수 설정

3 개의 작업자와 2 개의 매개 변수 서버가 있다고 가정하면 작업자 1의 "TF_CONFIG"는 다음과 같을 수 있습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

평가자의“TF_CONFIG”는 다음과 같을 수 있습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

평가자에 대한 위의 "TF_CONFIG"문자열에서 "클러스터"부분은 선택 사항입니다.

모든 작업에 동일한 바이너리를 사용하는 경우

단일 바이너리를 사용하여 이러한 모든 작업을 실행하려면 처음에 프로그램이 다른 역할로 분기되도록해야합니다.

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

다음 코드는 TensorFlow 서버를 시작하고 대기합니다.

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

작업 실패 처리

작업자 실패

위에서 언급했듯이 ClusterCoordinator 에는 작업자 실패에 대한 기본 제공 내결함성이 있습니다. 작업자 복구시에 의해 만들어진 데이터 세트의 대응하는 슬라이스 create_per_worker_dataset 범위 여전히 원래 호출하여 재현 될 dataset_fn 에 전달 create_per_worker_dataset .

매개 변수 서버 또는 코디네이터 오류

그러나 코디네이터가 매개 변수 서버 오류를 발견하면 즉시 UnavailableError 또는 AbortedError . 이 경우 코디네이터를 다시 시작할 수 있습니다. 코디네이터 자체도 사용할 수 없게 될 수 있습니다. 따라서 학습 진행률을 많이 잃지 않으려면 학습을 시작하기 전에 모델 변수를 주기적으로 체크 포인트하고 체크 포인트에서 모델 변수를로드하는 것이 중요합니다. 최적화 프로그램이 체크 포인트 optimizer.iterations 경우 학습 진행률은 대략 optimizer.iterations 에서 추론 할 수 있습니다.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue 가져 오기

RemoteValue 가져 오는 것은 함수가 성공적으로 실행되면 성공할 수 있습니다. 이는 현재 반환 값이 함수가 실행 된 후 즉시 코디네이터에 복사되기 때문입니다. 복사 중에 작업자 오류가 발생하면 사용 가능한 다른 작업자에서 함수가 다시 시도됩니다. 따라서 성능을 최적화하려는 경우 반환 값없이 함수를 예약 할 수 있습니다.

오류보고

코디네이터가 매개 변수 서버의 UnavailableError 와 같은 오류 또는 tf.debugging.check_numericsInvalidArgument 와 같은 기타 애플리케이션 오류를 확인하면 오류를 발생시키기 전에 보류 중이거나 대기중인 모든 함수를 취소합니다. 해당 RemoteValue 가져 RemoteValue CancelledError 합니다.

오류가 발생한 후 코디네이터는 취소 된 함수에서 동일한 오류나 오류를 발생시키지 않습니다.

성능 개량

ParameterServerStrategyClusterResolver 하여 훈련 할 때 성능 문제가 발생하면 몇 가지 가능한 이유가 있습니다.

한 가지 일반적인 이유는 매개 변수 서버의 부하가 불균형하고 일부 과부하 매개 변수 서버가 용량에 도달했기 때문입니다. 또한 여러 가지 근본 원인이있을 수 있습니다. 이 문제를 완화하는 몇 가지 간단한 방법은 다음과 같습니다.

  1. 지정을 통해 대형 모델 변수를 샤딩 variable_partitioner 구성 할 때 ParameterServerStrategy .
  2. 가능하면 모든 매개 변수 서버에 필요한 핫스팟 변수를 한 번에 생성하지 마십시오. 예를 들어, 최적화 프로그램에서 일정한 학습률 또는 하위 클래스 tf.keras.optimizers.schedules.LearningRateSchedule 을 사용합니다. 기본 동작은 학습률이 특정 매개 변수 서버에 배치되고 각 단계에서 다른 모든 매개 변수 서버가 요청하는 변수가되기 때문입니다. .
  3. 큰 어휘를 Keras 전처리 레이어로 전달하기 전에 섞습니다.

성능 문제의 또 다른 가능한 이유는 코디네이터입니다. schedule / join 의 첫 번째 구현은 Python 기반이므로 스레딩 오버 헤드가있을 수 있습니다. 또한 코디네이터와 작업자 간의 지연 시간이 클 수 있습니다. 이 경우 여러 단계를 단일 tf.function 할 수 있습니다.

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

코디네이터를 계속 최적화 할 것이며, 대부분의 사용자는 앞으로 수동으로 단계를 포장 할 필요가 없습니다.

또한 성능 향상을위한 작은 비결은 위의 작업 실패 처리 섹션에서 설명한대로 반환 값없이 함수를 예약하는 것입니다.

알려진 제한

알려진 제한의 대부분은 위 섹션에서 다룹니다. 요약은 다음과 같습니다.

  • os.environment["grpc_fail_fast"]="use_caller" 는 내결함성이 제대로 작동하도록 조정자를 포함한 모든 작업에 필요합니다.
  • GPU 작업자는 지원되지 않습니다.
  • 동기 매개 변수 서버 훈련은 지원되지 않습니다.
  • ParameterServerStrategy 는 Keras compilefit API에서 작동하지 않습니다.
  • ClusterCoordinator.schedule 은 데이터 세트에 대한 방문 보장을 지원하지 않습니다.
  • ClusterCoordinator.create_per_worker_dataset 를 사용하는 경우 전달 된 함수 내에서 전체 데이터 세트를 생성해야합니다.
  • 최적의 성능을 얻으려면 일반적으로 여러 단계를 단일 기능으로 압축해야합니다.
  • 샤딩 된 변수가 포함 된 tf.saved_model.load를 통해 tf.saved_model.load 을로드하는 것은 지원되지 않습니다. TensorFlow Serving을 사용하여 이러한 saved_model을로드하는 것은 작동 할 것으로 예상됩니다.
  • 샤딩 된 옵티 마이저 슬롯 변수가 포함 된 체크 포인트를 다른 샤드 수로로드하는 것은 지원되지 않습니다.
  • 코디네이터 태스크를 다시 시작하지 않고 매개 변수 서버 실패에서 복구하는 것은 지원되지 않습니다.