질문이있다? TensorFlow 포럼 방문 포럼 에서 커뮤니티와 연결

매개 변수 서버 교육

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

개요

매개 변수 서버 학습 은 여러 머신에서 모델 학습을 확장하는 일반적인 데이터 병렬 방법입니다. 매개 변수 서버 훈련 클러스터는 작업자와 매개 변수 서버로 구성됩니다. 변수는 매개 변수 서버에서 생성되며 각 단계에서 작업자가 읽고 업데이트합니다. 기본적으로 작업자는 서로 동기화하지 않고 이러한 변수를 독립적으로 읽고 업데이트합니다. 이것이 때때로 매개 변수 서버 스타일 훈련을 비동기 훈련이라고하는 이유입니다.

TF2에서 매개 변수 서버 훈련은 tf.distribute.experimental.ParameterServerStrategy 클래스에 의해 구동되며,이 클래스는 훈련 단계를 최대 수천 명의 작업자 (파라미터 서버와 함께)로 확장되는 클러스터에 배포합니다. 지원되는 두 가지 주요 학습 API는 Model.fit 이라고도하는 Model.fit Training API와 Custom Training Loop (CTL)입니다. Model.fit 은 사용자가 높은 수준의 추상화 및 교육 처리를 선호 할 때 권장되는 반면, CTL은 사용자가 교육 루프의 세부 사항을 정의하는 것을 선호 할 때 권장됩니다.

선택한 API에 관계없이 TF2의 분산 교육에는 여러 "작업"이있는 "클러스터"가 포함되며 각 작업에는 하나 이상의 "작업"이있을 수 있습니다. 매개 변수 서버 훈련을 사용하는 경우 하나의 코디네이터 작업 (작업 이름이 chief ), 여러 작업자 작업 (작업 이름 worker ) 및 여러 매개 변수 서버 작업 (작업 이름 ps )이있는 것이 좋습니다.

코디네이터가 리소스를 생성하고, 교육 작업을 발송하고, 체크 포인트를 작성하고, 작업 실패를 처리하는 동안 작업자와 매개 변수 서버는 코디네이터의 요청을 수신하는 tf.distribute.Server 를 실행합니다.

Model.fit API를 사용한 매개 변수 서버 훈련

Model.fit API를 사용한 매개 변수 서버 훈련을 위해서는 조정자가 tf.distribute.experimental.ParameterServerStrategy 개체를 사용하고 tf.distribute.experimental.ParameterServerStrategy 입력으로 tf.keras.utils.experimental.DatasetCreator 합니다. 전략이 없거나 다른 전략이있는 Model.fit 사용과 유사하게 워크 플로에는 모델 생성 및 컴파일, 콜백 준비, Model.fit 호출이 포함됩니다.

사용자 지정 훈련 루프 (CTL) API를 사용한 매개 변수 서버 훈련

CTL을 사용하면 tf.distribute.experimental.coordinator.ClusterCoordinator 클래스가 코디네이터에 사용되는 주요 구성 요소입니다. ClusterCoordinator 클래스는 tf.distribute.Strategy 개체와 함께 작동해야합니다. 이 tf.distribute.Strategy 객체는 클러스터의 정보를 제공하는 데 필요 하며 MirroredStrategy 사용한 사용자 지정 훈련 에서 본 것처럼 훈련 단계를 정의하는 데 사용됩니다. 그런 다음 ClusterCoordinator 개체는 이러한 교육 단계의 실행을 원격 작업자에게 전달합니다. 매개 변수 서버 훈련을 위해 ClusterCoordinatortf.distribute.experimental.ParameterServerStrategy 와 함께 작동해야합니다.

ClusterCoordinator 객체가 제공하는 가장 중요한 API는 schedule 입니다. schedule API는 tf.function 대기열에 tf.function 미래와 유사한 RemoteValue 즉시 반환합니다. 대기중인 함수는 백그라운드 스레드의 원격 작업자에게 발송되고 해당 RemoteValue 가 비동기 적으로 채워집니다. schedule 에는 작업자 할당이 필요하지 않으므로 전달 된 tf.function 은 사용 가능한 모든 작업자에서 실행할 수 있습니다. 실행 된 워커가 완료되기 전에 사용할 수 없게되면 사용 가능한 다른 워커에서 함수가 다시 시도됩니다. 이 사실과 함수 실행이 원자 적이 지 않기 때문에 함수가 두 번 이상 실행될 수 있습니다.

원격 기능을 디스패치하는 것 외에도 ClusterCoordinator 는 모든 작업자에서 데이터 세트를 생성하고 작업자가 실패에서 복구 될 때 이러한 데이터 세트를 다시 빌드하는 데 도움이됩니다.

튜토리얼 설정

튜토리얼은 CTL 또는 Model.fit 경로로 분기되며 필요에 맞는 경로를 선택할 수 있습니다. "X를 사용한 교육"이외의 섹션은 두 경로 모두에 적용 할 수 있습니다.

pip install portpicker
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

클러스터 설정

위에서 언급했듯이 매개 변수 서버 학습 클러스터에는 학습 프로그램을 실행하는 코디네이터 작업, TensorFlow 서버를 실행하는 하나 이상의 작업자 및 매개 변수 서버 작업 (예 : tf.distribute.Server , tf.distribute.Server 를 실행하는 추가 평가 작업이 필요합니다. 평가 (아래 사이드카 평가 섹션 참조). 이를 설정하기위한 요구 사항은 다음과 같습니다.

  • 코디네이터 작업은 평가자를 제외한 다른 모든 TensorFlow 서버의 주소와 포트를 알아야합니다.
  • 작업자 및 매개 변수 서버는 수신해야하는 포트를 알아야합니다. 단순화를 위해 일반적으로 이러한 작업에서 TensorFlow 서버를 만들 때 전체 클러스터 정보를 전달합니다.
  • 평가자 작업은 훈련 클러스터의 설정을 알 필요가 없습니다. 그렇다면 학습 클러스터에 연결을 시도해서는 안됩니다.
  • 작업자 및 매개 변수 서버는 각각 "worker"및 "ps"작업 유형을 가져야합니다. 코디네이터는 레거시 이유로 작업 유형으로 "최고"를 사용해야합니다.

이 튜토리얼에서는 전체 파라미터 서버 훈련이 colab에서 실행될 수 있도록 in-process 클러스터를 생성합니다. 이후 섹션에서 실제 클러스터 를 설정하는 방법을 소개합니다.

In-process 클러스터

이 튜토리얼에서는 미리 TensorFlow 서버를 시작하고 나중에 연결할 것입니다. 이것은이 튜토리얼의 데모 목적으로 만 사용되며 실제 교육에서 서버는 작업자 및 PS 머신에서 시작됩니다.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

In-process 클러스터 설정은 단위 테스트에서 자주 사용됩니다. 여기에 한 가지 예가 있습니다.

ParameterServerStrategy 인스턴스화

학습 코드를 살펴보기 전에 ParameterServerStrategy 개체를 인스턴스화 해 보겠습니다. 사용자 지정 학습 루프를 진행하는지 Model.fit 진행하는지에 관계없이 필요합니다. variable_partitioner 인수는 다음 섹션 에서 설명 합니다 .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:22388', 'localhost:22671'], 'worker': ['localhost:15023', 'localhost:23402', 'localhost:20268']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:22388', 'localhost:22671'], 'worker': ['localhost:15023', 'localhost:23402', 'localhost:20268']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3', '/job:chief/replica:0/task:0/device:GPU:4', '/job:chief/replica:0/task:0/device:GPU:5'], variable_device = '/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

훈련에 GPU를 사용하려면 각 작업자에게 표시되는 GPU를 할당하십시오. ParameterServerStrategy 는 각 작업자에서 사용 가능한 모든 GPU를 사용하며 모든 작업자는 동일한 수의 GPU를 사용할 수 있어야합니다.

가변 샤딩

변수 샤딩은 변수를 여러 개의 작은 변수로 분할하는 것을 말합니다. 이러한 작은 변수를 샤드라고 합니다. 가변 샤딩은 이러한 샤드에 액세스 할 때 네트워크 부하를 분산하는 데 유용 할 수 있습니다. 또한 여러 매개 변수 서버에 걸쳐 일반 변수의 계산 및 저장을 분배하는 데 유용합니다.

변수 샤딩을 활성화하려면 ParameterServerStrategy 개체를 생성 할 때 variable_partitioner 전달할 수 있습니다. variable_partitioner 는 변수가 생성 될 때마다 호출되며 변수의 각 차원에 따라 샤드 수를 반환 할 것으로 예상됩니다. tf.distribute.experimental.partitioners.FixedShardsPartitioner 와 같은 일부 기본 variable_partitioner 가 제공됩니다.

variable_partitioner 가 전달되고 strategy.scope() 바로 아래에 변수를 만들면 샤드 목록에 대한 액세스를 제공하는 variables 속성이있는 컨테이너 유형이됩니다. 대부분의 경우이 컨테이너는 모든 샤드를 연결하여 자동으로 Tensor로 변환됩니다. 결과적으로 일반 변수로 사용할 수 있습니다. 반면에 tf.nn.embedding_lookup 과 같은 일부 TensorFlow 메서드는이 컨테이너 유형에 대한 효율적인 구현을 제공하며 이러한 메서드에서는 자동 연결이 방지됩니다.

자세한 내용은 ParameterServerStrategy 의 API 문서 문자열을 참조하세요.

Model.fitModel.fit 훈련

Model.fit 는 재정의 가능한 train_step 과 체크 포인트 저장 또는 TensorBoard에 대한 요약 저장과 같은 기능을 제공하는 콜백으로 train_step 에서 학습 루프를 처리하는 Model.fit 을 통해 사용하기 쉬운 학습 API를 제공합니다. Model.fit 사용하면 전략 객체를 간단히 교체하여 다른 전략에 동일한 학습 코드를 사용할 수 있습니다.

입력 데이터

매개 변수 서버 학습을 사용하는 Model.fit 사용하려면 tf.distribute.InputContext 유형의 단일 인수를 취하고 tf.distribute.InputContext 을 반환하는tf.data.Dataset 에서 입력 데이터를 제공해야합니다. 그런 다음 생성 tf.keras.utils.experimental.DatasetCreator 같은 소요 오브젝트 callable 하고, 옵션 tf.distribute.InputOptions 통해 객체 input_options 인수. 매개 변수 서버 훈련을 사용하여 데이터를 steps_per_epoch 반복하는 것이 좋습니다. 라이브러리가 epoch 경계를 알 수 있도록 fit 호출에서 steps_per_epoch 를 지정 steps_per_epoch .

InputContext 인수에 대한 자세한 내용은 분산 입력 가이드를 참조하세요.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn 의 코드는 각 작업자 컴퓨터의 입력 장치 (일반적으로 CPU)에서 호출됩니다.

모델 구성 및 컴파일

이제, 당신이 만듭니다 tf.keras.Model (사소한 선택의 API를 사용을 tf.keras.models.Sequential a로, 모델은 여기에 데모로 사용되는) 다음 Model.compile 같은 최적화와 같은 구성 요소를 통합하는 전화, 측정 항목 또는 steps_per_execution 과 같은 매개 변수 :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

콜백 및 교육

실제 학습을 위해 model.fit 을 호출하기 전에 다음과 같은 일반적인 작업에 필요한 콜백을 준비하겠습니다.

  • ModelCheckpoint 모델 무게를 저장합니다.

  • BackupAndRestore 교육 진행 상황이 자동으로 백업되고 클러스터에 사용할 수없는 경우 (예 : 중단 또는 선점) 복구되는지 확인합니다.

  • TensorBoard 진행 보고서를 TensorBoard 도구에서 시각화되는 요약 파일로 저장합니다.

성능 고려 사항으로 인해 사용자 지정 콜백은 ParameterServerStrategy 와 함께 사용할 때 일괄 수준 콜백을 ​​재정의 할 수 없습니다. 사용자 지정 콜백을 수정하여 epoch 수준 호출을 수행하고 steps_per_epoch 를 적절한 값으로 조정 steps_per_epoch . 또한 steps_per_epochParameterServerStrategy 와 함께 사용할 때 Model.fit 의 필수 인수입니다.

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 4s - loss: 0.3194
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.2504
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2720
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fe1586eb7a0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fe15c122c20> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2094
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.2215
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<keras.callbacks.History at 0x7fe70c179950>

ClusterCoordinator 직접 사용 (선택 사항)

Model.fit 훈련 경로를 선택하더라도 선택적으로 ClusterCoordinator 객체를 인스턴스화하여 작업자에서 실행하려는 다른 기능을 예약 할 수 있습니다. 자세한 내용과 예제 는 사용자 지정 교육 루프를 사용한 교육 섹션을 참조하십시오.

맞춤형 교육 루프를 사용한 교육

tf.distribute.Strategy 사용한 사용자 지정 훈련 루프는 훈련 루프를 정의하는 데 큰 유연성을 제공합니다. 위에서 정의한 ParameterServerStrategy 를 사용하면 ClusterCoordinator 를 사용하여 원격 작업자에게 교육 단계 실행을 디스패치합니다.

그런 다음 다른 tf.distribute.Strategy 와 함께 훈련 루프에서 본 것처럼 모델을 생성하고 데이터 세트 및 단계 함수를 정의합니다. 이 자습서 에서 자세한 내용을 찾을 수 있습니다.

효율적인 데이터 세트 미리 가져 오기를 보장하려면 아래 원격 작업자에게 교육 단계 파견 섹션에 언급 된 권장 분산 데이터 세트 생성 API를 사용 하십시오 . 또한 작업자에 할당 된 GPU를 최대한 활용하려면 worker_fn 내부에서 strategy.run 을 호출해야합니다. 나머지 단계는 GPU를 사용하거나 사용하지 않는 학습에 대해 동일합니다.

다음 단계에서 이러한 구성 요소를 만들어 보겠습니다.

데이터 설정

먼저 Keras 전처리 계층에서 구현 한 전처리 로직을 포함하는 데이터 세트를 생성하는 함수를 작성합니다. 우리는 외부의 이러한 층이 생성됩니다 dataset_fn 하지만, 내부의 변환을 적용 dataset_fn 당신이 포장하기 때문에 dataset_fntf.function 변수가 그 안에 생성 할 수 없습니다.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab,
                                          mask_token=None)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

데이터 세트에서 장난감 예제를 생성합니다.

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

그런 다음 dataset_fn에 래핑 된 훈련 데이터 세트를 만듭니다.

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

모델 구축

둘째, 모델 및 기타 개체를 만듭니다. strategy.scope 아래에 모든 변수를 만들어야합니다.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

FixedShardsPartitioner 사용하여 모든 변수를 두 개의 샤드로 분할하고 각 샤드가 다른 매개 변수 서버에 할당되었는지 확인하겠습니다.

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

훈련 단계 정의

셋째, tf.function 래핑 된 학습 단계를 만듭니다.

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

위의 step 함수에서 step_fn 에서 strategy.runstrategy.reduce 를 호출하면 작업 step_fn 여러 GPU를 지원할 수 있습니다. 작업자에 GPU가 할당 된 경우 strategy.run 은 데이터 세트를 여러 복제본에 배포합니다.

원격 작업자에게 교육 단계 파견

모든 계산이 ParameterServerStrategy 에 의해 정의 된 후에는 ClusterCoordinator 클래스를 사용하여 리소스를 생성하고 교육 단계를 원격 작업자에게 배포합니다.

먼저 ClusterCoordinator 개체를 만들고 전략 개체를 전달하겠습니다.

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

그런 다음 작업자 별 데이터 세트와 반복자를 만듭니다. 에서 per_worker_dataset_fn 아래의 포장 dataset_fnstrategy.distribute_datasets_from_function 완벽하게 GPU는 효율적인 프리 페칭을 허용하는 것이 좋습니다.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

마지막 단계는 schedule 사용하여 원격 작업자에게 계산을 배포하는 것입니다. schedule 메소드는 tf.function 대기열에 tf.function 미래와 유사한 RemoteValue 즉시 반환합니다. 대기중인 함수는 백그라운드 스레드의 원격 작업자에게 발송되고 RemoteValue 는 비동기 적으로 채워집니다. join 방법을 사용하여 예약 된 모든 기능이 실행될 때까지 대기 할 수 있습니다.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.493750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

RemoteValue 의 결과를 가져 오는 방법은 다음과 같습니다.

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.020666

또는 모든 단계를 시작하고 완료를 기다리는 동안 작업을 수행 할 수 있습니다.

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

이 특정 예에 대한 전체 교육 및 제공 워크 플로는이 테스트 를 확인하세요.

데이터 세트 생성에 대한 추가 정보

위 코드의 데이터 세트는 create_per_worker_dataset API를 사용하여 생성됩니다. 작업 자당 하나의 데이터 세트를 만들고 컨테이너 객체를 반환합니다. iter 메서드를 호출하여 작업자 별 반복자를 만들 수 있습니다. 작업 자당 반복기는 작업 자당 하나의 반복자를 포함하며 특정 작업자에서 함수가 실행되기 전에 schedule 메서드에 전달 된 함수의 입력 인수에서 작업자의 해당 슬라이스가 대체됩니다.

현재 schedule 방법은 작업자가 동일하다고 가정하므로 다른 작업자의 데이터 세트가 dataset.shuffle 작업을 포함하는 경우 다르게 셔플 될 수 있다는 점을 제외하고는 동일하다고 가정합니다. 따라서 데이터 세트의 OutOfRangeError 에 의존하는 대신 데이터 세트를 무기한 반복하고 한정된 수의 단계를 예약하는 것이 좋습니다.

또 다른 중요한 참고 사항은 tf.data 데이터 세트가 작업 경계를 넘어 암시 적 직렬화 및 역 직렬화를 지원하지 않는다는 것입니다. 따라서 create_per_worker_dataset 전달 된 함수 내에서 전체 데이터 세트를 생성하는 것이 중요합니다.

평가

분산 교육에서 평가 루프를 정의하고 실행하는 방법은 여러 가지가 있습니다. 각각에는 아래에 설명 된대로 고유 한 장단점이 있습니다. 선호 사항이없는 경우 인라인 평가 방법을 사용하는 것이 좋습니다.

인라인 평가

이 방법에서 코디네이터는 교육과 평가를 번갈아 가며 인라인 평가라고합니다. 인라인 평가에는 몇 가지 이점이 있습니다. 예를 들어 단일 작업이 보유 할 수없는 대규모 평가 모델 및 평가 데이터 세트를 지원할 수 있습니다. 또 다른 예로, 평가 결과를 사용하여 다음 세대 교육을위한 결정을 내릴 수 있습니다.

인라인 평가를 구현하는 방법에는 두 가지가 있습니다.

  • 직접 평가 -소규모 모델 및 평가 데이터 세트의 경우 코디네이터는 코디네이터의 평가 데이터 세트를 사용하여 분산 모델에서 직접 평가를 실행할 수 있습니다.
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • 분산 평가 -코디네이터에서 직접 실행할 수없는 대규모 모델 또는 데이터 세트의 경우 코디네이터 작업은 schedule / join 방법을 통해 작업자에게 평가 작업을 배포 할 수 있습니다.
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

사이드카 평가

또 다른 방법은 검사 점을 반복적으로 읽고 최신 검사 점에서 평가를 실행하는 전용 평가자 작업을 만드는 사이드카 평가라고합니다. 평가 결과에 따라 교육 루프를 변경할 필요가없는 경우 교육 프로그램을 조기에 마칠 수 있습니다. 그러나 평가를 트리거하려면 추가 평가자 작업과주기적인 체크 포인트가 필요합니다. 다음은 가능한 사이드카 평가 루프입니다.

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

실제 세계의 클러스터

실제 프로덕션 환경에서는 다른 컴퓨터의 다른 프로세스에서 모든 작업을 실행합니다. 각 작업에 대해 클러스터 정보를 구성하는 가장 간단한 방법은 "TF_CONFIG"환경 변수를 설정하고 tf.distribute.cluster_resolver.TFConfigClusterResolver 를 사용하여 "TF_CONFIG"를 구문 분석하는 tf.distribute.cluster_resolver.TFConfigClusterResolver 입니다. "TF_CONFIG"환경 변수에 대한 일반적인 설명은 분산 교육 가이드 를 참조하세요.

Kubernetes 또는 기타 구성 템플릿을 사용하여 학습 작업을 시작하는 경우 이러한 템플릿이 이미 "TF_CONFIG"를 설정했을 가능성이 큽니다.

“TF_CONFIG”환경 변수 설정

3 개의 작업자와 2 개의 매개 변수 서버가 있다고 가정하면 작업자 1의 "TF_CONFIG"는 다음과 같을 수 있습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

평가자의“TF_CONFIG”는 다음과 같습니다.

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

평가자에 대한 위의 "TF_CONFIG"문자열에서 "클러스터"부분은 선택 사항입니다.

모든 작업에 동일한 바이너리를 사용하는 경우

단일 바이너리를 사용하여 이러한 모든 작업을 실행하려면 처음에 프로그램이 다른 역할로 분기되도록해야합니다.

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

다음 코드는 TensorFlow 서버를 시작하고 대기합니다.

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

작업 실패 처리

작업자 실패

ClusterCoordinator 또는 Model.fit 은 작업자 실패에 대한 기본 제공 내결함성을 제공합니다. 작업자 복구되면, 이전에 데이터 세트 기능을 제공 (하나에 create_per_worker_dataset CTL에 대한, 또는 DatasetCreator 에 대한 Model.fit )에 근로자에 호출 될 데이터 집합을 다시 만들 수 있습니다.

매개 변수 서버 또는 조정자 오류

그러나 코디네이터가 매개 변수 서버 오류를 발견하면 즉시 UnavailableError 또는 AbortedError . 이 경우 코디네이터를 다시 시작할 수 있습니다. 코디네이터 자체도 사용할 수 없게 될 수 있습니다. 따라서 교육 진행 상황을 잃지 않도록 특정 도구를 사용하는 것이 좋습니다.

  • Model.fit 경우 진행률 저장 및 복원을 자동으로 처리하는 BackupAndRestore 콜백을 사용해야합니다. 예제는 위의 콜백 및 교육 섹션을 참조하십시오.

  • CTL의 경우 학습을 시작하기 전에 모델 변수를 주기적으로 체크 포인트하고 체크 포인트에서 모델 변수를로드해야합니다. 최적화 프로그램이 체크 포인트 optimizer.iterations 경우 학습 진행률은 대략 optimizer.iterations 에서 추론 할 수 있습니다.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue 가져 오기

RemoteValue 가져 오기는 함수가 성공적으로 실행되면 성공할 수 있습니다. 이는 현재 반환 값이 함수가 실행 된 후 즉시 코디네이터에 복사되기 때문입니다. 복사 중에 작업자 오류가 발생하면 사용 가능한 다른 작업자에서 함수가 다시 시도됩니다. 따라서 성능을 최적화하려는 경우 반환 값없이 함수를 예약 할 수 있습니다.

오류보고

코디네이터가 매개 변수 서버의 UnavailableError 와 같은 오류 또는 tf.debugging.check_numericsInvalidArgument 와 같은 기타 애플리케이션 오류를 확인하면 오류를 발생시키기 전에 보류 중이거나 대기중인 모든 함수를 취소합니다. 해당 RemoteValue 가져 RemoteValue CancelledError 합니다.

오류가 발생한 후 코디네이터는 취소 된 함수에서 동일한 오류나 오류를 발생시키지 않습니다.

성능 개선

ParameterServerStrategyClusterResolver 훈련 할 때 성능 문제가 발생하는 경우 몇 가지 가능한 이유가 있습니다.

한 가지 일반적인 이유는 매개 변수 서버의 부하가 불균형하고 일부 과부하 매개 변수 서버가 용량에 도달했기 때문입니다. 또한 여러 근본 원인이있을 수 있습니다. 이 문제를 완화하는 몇 가지 간단한 방법은 다음과 같습니다.

  1. 지정을 통해 대형 모델 변수를 샤딩 variable_partitioner 구성 할 때 ParameterServerStrategy .
  2. 가능하면 모든 매개 변수 서버에 필요한 핫스팟 변수를 한 번에 생성하지 마십시오. 예를 들어, 최적화 프로그램에서 일정한 학습률 또는 하위 클래스 tf.keras.optimizers.schedules.LearningRateSchedule 을 사용합니다. 기본 동작은 학습률이 특정 매개 변수 서버에 배치되고 각 단계에서 다른 모든 매개 변수 서버가 요청하는 변수가되기 때문입니다. .
  3. 큰 어휘를 Keras 전처리 레이어로 전달하기 전에 섞습니다.

성능 문제의 또 다른 가능한 이유는 코디네이터입니다. 첫 번째 schedule / join 구현은 Python 기반이므로 스레딩 오버 헤드가있을 수 있습니다. 또한 코디네이터와 작업자 간의 지연 시간이 클 수 있습니다. 만일이 경우라면,

  • 들어 Model.fit , 당신은 설정할 수 있습니다 steps_per_execution 제공 인수 Model.compile 1보다 큰 값으로한다.

  • CTL의 경우 여러 단계를 단일 tf.function 할 수 있습니다.

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

라이브러리를 계속 최적화하기 때문에 대부분의 사용자가 향후 수동으로 단계를 포장 할 필요가 없기를 바랍니다.

또한 성능 향상을위한 작은 비결은 위의 작업 실패 처리 섹션에서 설명한대로 반환 값없이 함수를 예약하는 것입니다.

알려진 제한

알려진 제한의 대부분은 위 섹션에서 다룹니다. 이 섹션에서는 요약을 제공합니다.

ParameterServerStrategy 일반

  • os.environment["grpc_fail_fast"]="use_caller" 는 내결함성이 올바르게 작동하도록 조정자를 포함한 모든 작업에 필요합니다.
  • 동기 매개 변수 서버 훈련은 지원되지 않습니다.
  • 최적의 성능을 얻으려면 일반적으로 여러 단계를 하나의 기능으로 압축해야합니다.
  • 샤딩 된 변수가 포함 된 tf.saved_model.load를 통해 tf.saved_model.load 을로드하는 것은 지원되지 않습니다. TensorFlow Serving을 사용하여 이러한 saved_model을로드하는 것은 작동 할 것으로 예상됩니다.
  • 샤딩 된 옵티 마이저 슬롯 변수가 포함 된 체크 포인트를 다른 샤드 수로로드하는 것은 지원되지 않습니다.
  • 코디네이터 태스크를 다시 시작하지 않고 매개 변수 서버 실패에서 복구하는 것은 지원되지 않습니다.
  • tf.lookup.StaticHashTable ( IntegerLookup , StringLookupTextVectorization 과 같은 일부 tf.keras.layers.experimental.preprocessing 레이어에서 일반적으로 사용됨)을 사용하면 PS 교육을 통해 현재 코디네이터에 리소스가 배치됩니다. 이는 작업자에서 코디네이터로의 RPC 조회 성능에 영향을줍니다. 이것은 현재 해결해야 할 높은 우선 순위입니다.

Model.fit 사양

  • steps_per_epoch 인수가 필요합니다 Model.fit . Epoch에서 적절한 간격을 제공하는 값을 선택할 수 있습니다.
  • ParameterServerStrategy 는 성능상의 이유로 배치 수준 호출이있는 사용자 지정 콜백을 지원하지 않습니다. 이러한 호출을 적절하게 선택한 steps_per_epoch 사용하여 epoch 수준 호출로 변환하여 모든 steps_per_epoch 수의 단계로 호출되도록해야합니다. 기본 제공 콜백은 영향을받지 않습니다. 배치 수준 호출이 성능을 발휘하도록 수정되었습니다. ParameterServerStrategy 에 대한 배치 수준 호출 지원이 계획 중입니다.
  • 같은 이유로 다른 전략과 달리 진행률 표시 줄과 메트릭은 에포크 경계에서만 기록됩니다.
  • 의 입력 Model.fit 단지 형 소요 DatasetCreator .
  • run_eagerly 는 지원되지 않습니다.
  • Model.fit 평가는 아직 지원되지 않습니다. 이것이 우선 순위 중 하나입니다.
  • Model.evaluateModel.predict 는 아직 지원되지 않습니다.

맞춤형 교육 루프 세부 사항