ParameterServerStrategy를 사용한 매개 변수 서버 교육

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

개요

매개 변수 서버 훈련은 여러 시스템에서 모델 교육을 확장하는 일반적인 데이터 병렬 방법이다.

매개 변수 서버 교육 클러스터는 노동자와 매개 변수를 서버로 구성됩니다. 변수는 매개변수 서버에서 생성되며 각 단계에서 작업자가 읽고 업데이트합니다. 기본적으로 작업자는 이러한 변수를 서로 동기화하지 않고 독립적으로 읽고 업데이트합니다. 이 비동기 교육이라고하는 이유 때로는 서버 스타일의 훈련 매개 변수입니다.

TensorFlow 2에서, 매개 변수 서버 훈련에 의해 제공됩니다 tf.distribute.experimental.ParameterServerStrategy 노동자 수천까지 확장 클러스터에 훈련 단계를 배포 클래스 (매개 변수 서버와 함께).

지원되는 교육 방법

지원되는 두 가지 주요 교육 방법은 다음과 같습니다.

작업 및 태스크가 있는 클러스터

에 관계없이 선택의 API (의 Model.fit A : 또는 사용자 정의 훈련 루프), TensorFlow 2 분산 교육 관련 'cluster' 몇 가지와 'jobs' 하고, 작업은 각각 하나 이상있을 수 있습니다 'tasks' .

매개변수 서버 교육을 사용할 때 다음을 수행하는 것이 좋습니다.

  • 코디네이터 작업 (작업 이름이있는 chief )
  • 여러 작업자 작업 (작업 이름의 worker ); 그리고
  • 여러 매개 변수 서버 작업 (작업 이름 ps )

코디네이터는 자원, 작업 훈련 파견을 생성하는 동안, 체크 포인트를 기록하고, 작업 실패, 노동자와 매개 변수를 서버와 거래를 실행 tf.distribute.Server 코디네이터로부터 요청을 수신.

와 매개 변수 서버 교육 Model.fit API

와 매개 변수 서버 교육 Model.fit API는 사용하는 코디네이터가 필요 tf.distribute.experimental.ParameterServerStrategy 객체를, 그리고 tf.keras.utils.experimental.DatasetCreator 입력으로합니다. 유사 Model.fit , 또는 다른 전략, 워크 플로 생성하고 모델을 컴파일하는 다음 콜백, 준비 포함 어떤 전략을 사용 Model.fit 전화를.

커스텀 트레이닝 루프를 사용한 파라미터 서버 트레이닝

맞춤형 교육 루프로, tf.distribute.experimental.coordinator.ClusterCoordinator 클래스는 코디네이터에 사용되는 핵심 구성 요소입니다.

에 의해 제공되는 가장 중요한 API ClusterCoordinator 목적은 schedule :

  • schedule API는 큐에 넣습니다 tf.function 향후 같은 반환 RemoteValue 즉시.
  • 대기중인 기능은 백그라운드 스레드에서 원격 근로자 파견되고 그 RemoteValue 들 비동기 적으로 채워집니다.
  • 이후 schedule 노동자 할당을 필요로하지 않습니다는 tf.function 가능한 근로자에서 실행할 수 있습니다 전달.
  • 실행된 작업자가 완료되기 전에 사용할 수 없게 되면 사용 가능한 다른 작업자에서 함수가 재시도됩니다.
  • 이 사실과 함수 실행이 원자적이지 않다는 사실 때문에 함수는 두 번 이상 실행될 수 있습니다.

원격 기능을 파견뿐만 아니라, ClusterCoordinator 이러한 데이터 세트를 모든 근로자에 대한 데이터 세트를 생성하고 재건하는 데 도움이 때 실패에서 작업자 복구합니다.

튜토리얼 설정

자습서로 분기됩니다 Model.fit 및 맞춤형 교육 루프 경로, 당신은 당신의 요구에 맞는 하나를 선택할 수 있습니다. "Training with X" 이외의 섹션은 두 경로 모두에 적용할 수 있습니다.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

클러스터 설정

위에서 언급 한 바와 같이, 매개 변수 서버 교육 클러스터가 실행 TensorFlow 서버 - 훈련 프로그램, 하나 또는 여러 노동자와 매개 변수 서버 작업을 실행하는 코디네이터 작업이 필요 tf.distribute.Server - 그리고 아마도 추가적인 평가 작업이 실행 측면 - 자동차 평가 (아래 사이드카 평가 섹션 참조). 설정 요구 사항은 다음과 같습니다.

  • 코디네이터 작업은 평가자를 제외한 다른 모든 TensorFlow 서버의 주소와 포트를 알아야 합니다.
  • 작업자와 매개변수 서버는 수신해야 하는 포트를 알아야 합니다. 단순성을 위해 일반적으로 이러한 작업에서 TensorFlow 서버를 생성할 때 전체 클러스터 정보를 전달할 수 있습니다.
  • 평가자 작업은 훈련 클러스터의 설정을 알 필요가 없습니다. 그렇다면 훈련 클러스터에 연결을 시도해서는 안 됩니다.
  • 노동자와 매개 변수를 서버로 작업 유형을 가져야한다 "worker""ps" 각각. 코디네이터는 사용해야합니다 "chief" 기존의 이유로 작업 유형으로.

이 튜토리얼에서는 전체 매개변수 서버 교육을 Colab에서 실행할 수 있도록 in-process 클러스터를 생성합니다. 당신은 설정하는 방법을 배웁니다 실제 클러스터를 나중에 섹션.

진행 중인 클러스터

먼저 여러 TensorFlow 서버를 만들고 나중에 연결합니다. 참고이는이 튜토리얼의 시연을 목적으로하고, 실제 교육의 서버가 시작됩니다 "worker""ps" 기계.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

인 - 프로세스 클러스터 설정은 종종 같은 단위 테스트에 사용됩니다 여기에 .

현지 테스트를위한 또 다른 옵션은 밖으로 로컬 시스템 검사에서 프로세스를 실행하는 것입니다 Keras 멀티 - 작업자 교육을 이 방법의 예.

ParameterServerStrategy 인스턴스화

당신이 훈련 코드로 다이빙을하기 전에,이 인스턴스화 할 수 ParameterServerStrategy 개체를. 이 관계없이 진행 여부에 필요하다고 주 Model.fit 또는 사용자 정의 훈련 루프. variable_partitioner 인수는 설명한다 변수 샤딩 섹션 .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

훈련에 GPU를 사용하려면 각 작업자에게 표시되는 GPU를 할당합니다. ParameterServerStrategy 모든 근로자를 사용할 GPU의 같은 번호를 가지고해야한다는 제한, 각 근로자에 사용 가능한 모든 GPU를 사용합니다.

가변 샤딩

가변 샤딩 분할에 파편 불리는 여러 더 작은 변수로 변수를 의미한다. 가변 샤딩은 이러한 샤드에 액세스할 때 네트워크 부하를 분산하는 데 유용할 수 있습니다. 여러 매개변수 서버에 일반 변수의 계산 및 저장을 분산하는 것도 유용합니다.

변수 샤딩을 사용하려면, 당신은에 전달할 수 있습니다 variable_partitioner 구성 할 때 ParameterServerStrategy 개체를. variable_partitioner 변수가 생성 될 때마다 호출됩니다 변수의 각 차원에 따라 파편의 수를 반환 할 것으로 예상된다. 일부 아웃 - 오브 - 박스 variable_partitioner 들 등이 제공된다 tf.distribute.experimental.partitioners.MinSizePartitioner . 이는 같은 크기 기반위한 구분자를 사용하는 것이 좋습니다 tf.distribute.experimental.partitioners.MinSizePartitioner 모델 훈련 속도에 부정적인 영향을 미칠 수있는 작은 변수를 분할 피하기 위해.

variable_partitioner 전달되고, 당신이 바로 아래에 변수를 만드는 경우 strategy.scope() , 그것은을 가진 컨테이너 유형이 될 것이다 variables 파편의 목록에 대한 액세스를 제공하는 속성입니다. 대부분의 경우 이 컨테이너는 모든 샤드를 연결하여 자동으로 Tensor로 변환됩니다. 결과적으로 일반 변수로 사용할 수 있습니다. 반면에, 같은 일부 TensorFlow 방법 tf.nn.embedding_lookup 이 컨테이너 유형에 대한 자동 연결은 피해야한다 이러한 방법의 효율적인 구현을 제공합니다.

의 API 문서를 참조하십시오 tf.distribute.experimental.ParameterServerStrategy 자세한 내용은.

함께 훈련 Model.fit

Keras을 통해 사용하기 쉬운 교육 API를 제공 Model.fit 그 핸들 교육 재정의의 유연성 후드 루프, train_step 같은 TensorBoard에 대한 절약 체크 포인트 절약 또는 요약 등의 기능을 제공하고, 콜백. 로 Model.fit , 같은 훈련 코드는 전략 객체의 간단한 스왑과 다른 전략을 사용할 수 있습니다.

입력 데이터

Model.fit 매개 변수 서버 훈련은 입력 데이터 타입의 인수를 소요하는 호출에 제공 할 것을 요구 tf.distribute.InputContext , 그리고 반환 tf.data.Dataset . 그런 다음 생성 tf.keras.utils.experimental.DatasetCreator 같은 소요 오브젝트 callable 하고, 옵션 tf.distribute.InputOptions 통해 객체 input_options 인수.

이 셔플 및 매개 변수 서버 훈련 데이터를 반복하고 지정하는 것을 권장합니다 steps_per_epochfit 라이브러리가 시대의 경계를 알 수 있도록 전화.

참조하십시오 분산 입력 에 대한 자세한 내용은 자습서를 InputContext 인수.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

의 코드 dataset_fn 작업자 각 머신에, 일반적으로 CPU 인 입력 장치에 호출됩니다.

모델 구성 및 컴파일

이제, 당신은 만들 것이다 tf.keras.Model -a 사소한 tf.keras.models.Sequential 데모-다음에 의해 모델 Model.compile 등과 같은 최적화, 통계, 또는 매개 변수와 같은 구성 요소, 통합 전화 steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

콜백 및 교육

당신이 전화하기 전에 model.fit 실제 훈련,의와 같은 일반적인 작업을위한 필요한 콜백을 준비하자 :

  • ModelCheckpoint : 모델 가중치를 저장합니다.
  • BackupAndRestore : 확실히 교육 진행이 자동으로 백업하고, 경우에 복구하려면 (예를 중단 또는 선점 등) 클러스터 환경의 가용성; 또는
  • TensorBoard : TensorBoard 도구에서 시각화 얻을 요약 파일에 진행 보고서를 저장합니다.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

직접 사용 ClusterCoordinator (선택 사항)

당신이 선택하더라도 Model.fit 교육 경로를, 당신은 선택적으로 인스턴스화 할 수 tf.distribute.experimental.coordinator.ClusterCoordinator 사용자가 근로자에 실행하고자하는 다른 기능을 예약 할 개체를. 참고 항목 사용자 정의 교육 루프와 교육 자세한 내용과 예제 섹션을 참조하십시오.

맞춤형 훈련 루프를 사용한 훈련

와 사용자 교육 루프를 사용 tf.distribute.Strategy 교육 루프를 정의하기위한 유연성을 제공한다. 으로 ParameterServerStrategy (위와 같이 정의 된 strategy ), 당신은 사용 tf.distribute.experimental.coordinator.ClusterCoordinator 원격 작업자에 대한 교육 단계의 실행을 파견 할 수 있습니다.

그런 다음, 모델을 만들 것입니다 다른과 훈련 루프에서했던 것처럼, 데이터 집합 및 스텝 함수를 정의 tf.distribute.Strategy 의. 당신은에서 자세한 내용 찾을 수 있습니다 tf.distribute.Strategy의와 정의 훈련 자습서를.

효율적인 데이터 세트의 프리 페치를 보장하기 위해,이에 언급 된 데이터 세트 생성 API를 분산 권장 사용 원격 근로자 파견 훈련 단계 아래 절을 참조하십시오. 또한, 전화를해야합니다 Strategy.run 내부 worker_fn 노동자에 할당 된 GPU를 최대한 활용할 수 있습니다. 나머지 단계는 GPU가 있거나 없는 훈련에 대해 동일합니다.

다음 단계에서 이러한 구성 요소를 생성해 보겠습니다.

데이터 설정

먼저, 전처리에 의해 구현되는 로직을 포함하는 데이터 세트 생성 함수 작성 Keras 전처리 층 .

당신은 외부 이러한 층이 생성됩니다 dataset_fn 내부의 변환을하지만 적용 dataset_fn 당신이 포장하기 때문에, dataset_fntf.function 변수가 그 안에 생성 할 수 없습니다.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

데이터세트에서 장난감 예제 생성:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

그런 다음에 싸여 훈련 데이터 집합 생성 dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

모델 구축

다음으로 모델 및 기타 개체를 만듭니다. 아래의 모든 변수를 생성해야합니다 strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

를 사용하는 것을하자 확인 FixedShardsPartitioner 이 개 파편에 모든 변수를 분할하고 각각의 파편은 서로 다른 매개 변수를 서버에 할당 :

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

훈련 단계 정의

셋째,에 랩 훈련 단계 만들 tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

위의 훈련 단계 함수에서 호출 Strategy.runStrategy.reducestep_fn 노동자 당 복수의 GPU를 지원할 수 있습니다. 노동자가 GPU에 할당 한 경우, Strategy.run 여러 복제본의 데이터 세트를 배포합니다.

원격 작업자에게 교육 단계 디스패치

모든 계산에 의해 정의 된 후 ParameterServerStrategy , 당신은 사용 tf.distribute.experimental.coordinator.ClusterCoordinator 리소스를 생성하는 클래스를 원격 근로자에게 훈련 단계를 배포 할 수 있습니다.

의 처음 만들어 보자 ClusterCoordinator 개체 및 전략 객체를 전달합니다 :

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

그런 다음 작업자별 데이터 세트와 반복자를 만듭니다. 에서 per_worker_dataset_fn 아래의 포장 dataset_fnstrategy.distribute_datasets_from_function 완벽하게 GPU는 효율적인 프리 페칭을 허용하는 것이 좋습니다.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

마지막 단계는 사용 원격 근로자 계산 배포하는 ClusterCoordinator.schedule :

  • schedule 방법은 큐에 넣습니다 tf.function 향후 같은 반환 RemoteValue 즉시. 대기중인 기능은 백그라운드 스레드에서 원격 근로자 파견되고 RemoteValue 비동기 적으로 채워집니다.
  • join 방법 ( ClusterCoordinator.join ) 모든 예약 기능이 실행될 때까지 대기 할 수있다.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

다음은 결과 가져올 수있는 방법입니다 RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

또는 모든 단계를 시작하고 완료를 기다리는 동안 작업을 수행할 수 있습니다.

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

이 특정 예를 들어, 완전한 훈련 및 제공하는 워크 플로우의 경우,이 확인하시기 바랍니다 테스트를 .

데이터세트 생성에 대해 자세히 알아보기

위의 코드의 데이터 세트는 사용하여 만들어집니다 ClusterCoordinator.create_per_worker_dataset ) API를. 작업자당 하나의 데이터세트를 생성하고 컨테이너 개체를 반환합니다. 당신은 호출 할 수 있습니다 iter 당 - 노동자 반복자를 만드는 데에 방법을. 당 작업자 반복자 작업자 하나씩 반복기를 포함하고 작업자의 대응하는 슬라이스가로 전달 함수의 입력 인자로 치환 될 ClusterCoordinator.schedule 함수를 특정 작업자 실행되기 전에 방법.

현재 ClusterCoordinator.schedule 방법은 노동자들이 동등하다고 가정하고, 따라서 그들이이 포함 된 다른 경우 단행 할 수있다를 제외하고 다른 노동자에 대한 데이터 세트가 동일하다고 가정 Dataset.shuffle 작업을. 이 때문에, 또한 데이터 세트 무한정 반복 할 것을 권장하고 대신에 의존 단계 한정된 수의 일정을 OutOfRangeError 데이터 세트에서합니다.

또 다른 중요한 점은 있다는 것입니다 tf.data 데이터 세트 작업의 경계를 넘어 암시 직렬화 및 역 직렬화를 지원하지 않습니다. 그래서 전달 된 함수 내에서 전체 데이터 세트 작성하는 것이 중요하다 ClusterCoordinator.create_per_worker_dataset .

평가

분산 교육에서 평가 루프를 정의하고 실행하는 방법은 여러 가지가 있습니다. 각각은 아래와 같이 장단점이 있습니다. 선호하지 않는 경우 인라인 평가 방법을 권장합니다.

인라인 평가

이 방법에있어서, 훈련 및 평가함으로써 그것이라고 인라인 평가의 코디네이터 번갈아.

인라인 평가에는 몇 가지 이점이 있습니다. 예를 들어:

  • 단일 작업이 보유할 수 없는 대규모 평가 모델 및 평가 데이터 세트를 지원할 수 있습니다.
  • 평가 결과는 다음 Epoch 훈련을 위한 결정을 내리는 데 사용할 수 있습니다.

인라인 평가를 구현하는 방법에는 직접 평가와 분산 평가의 두 가지가 있습니다.

  • 직접 평가 : 작은 모델 및 평가 데이터 세트의 경우, 코디네이터는 코디네이터에 대한 평가 데이터 세트와 분산 모델에 직접 평가를 실행할 수 있습니다 :
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • 분산 평가 : 대형 모델이나 코디네이터에서 직접 실행하는 것이 불가능하다 데이터 세트를 들면,를 통해 노동자들에게 평가 작업을 배포 할 수 있습니다 코디네이터 작업 ClusterCoordinator.schedule / ClusterCoordinator.join 방법 :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

사이드카 평가

또 다른 방법은 당신이 반복적으로 체크 포인트를 읽고 최신 검사 점에 평가를 실행하는 전용 평가 작업을 만들 측면 차 평가라고합니다. 평가 결과에 따라 교육 루프를 변경할 필요가 없는 경우 교육 프로그램을 조기에 완료할 수 있습니다. 그러나 평가를 트리거하려면 추가 평가자 작업과 주기적인 체크포인트가 필요합니다. 다음은 가능한 사이드카 평가 루프입니다.

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

현실 세계의 클러스터

실제 프로덕션 환경에서는 다른 시스템의 다른 프로세스에서 모든 작업을 실행합니다. 각 작업에 구성 클러스터 정보를 가장 간단한 방법은 설정하는 것입니다 "TF_CONFIG" 환경 변수와 사용 tf.distribute.cluster_resolver.TFConfigClusterResolver 구문 분석 "TF_CONFIG" .

에 대한 일반적인 설명은 "TF_CONFIG" 환경 변수의 참조 분산 교육 가이드.

당신이는 Kubernetes 또는 기타 구성 템플릿을 사용하여 훈련 작업을 시작하면, 그것은 매우 가능성이 템플릿이 이미 설정 한 것입니다 “TF_CONFIG" 당신을 위해.

설정 "TF_CONFIG" 환경 변수를

당신이 3 명 노동자와 2 개 매개 변수 서버가 가정의 "TF_CONFIG" 노동자 1이 될 수 있습니다 :

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" 평가자의가 될 수 있습니다 :

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" 위의 부분 "TF_CONFIG" 평가자에 대한 문자열은 선택 사항입니다.

모든 작업에 동일한 바이너리를 사용하는 경우

단일 바이너리를 사용하여 이러한 모든 작업을 실행하는 것을 선호하는 경우 처음에 프로그램을 다른 역할로 분기하도록 해야 합니다.

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

다음 코드는 TensorFlow 서버를 시작하고 기다립니다.

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

작업 실패 처리

작업자 실패

tf.distribute.experimental.coordinator.ClusterCoordinator 또는 Model.fit 내장을 제공 작업자 실패에 대한 내결함성. 작업자 복구되면, 이전에 제공된 데이터 세트 (중 하나에 기능 ClusterCoordinator.create_per_worker_dataset 맞춤 교육 루프, 또는 tf.keras.utils.experimental.DatasetCreator 에 대한 Model.fit )에 근로자에 호출 될 데이터 집합을 다시 만들 수 있습니다.

매개변수 서버 또는 코디네이터 오류

코디네이터는 매개 변수 서버 오류를 볼 때, 그것은 올릴 것이다 UnavailableError 또는 AbortedError 즉시. 이 경우 코디네이터를 다시 시작할 수 있습니다. 조정자 자체도 사용할 수 없게 될 수 있습니다. 따라서 교육 진행 상황을 잃지 않기 위해 특정 도구를 사용하는 것이 좋습니다.

  • 들어 Model.fit , 당신은 사용해야 BackupAndRestore 자동 진행 절약 및 복원을 처리하는 콜백을. 참조 콜백과 훈련 예는 위의 절을 참조하십시오.

  • 사용자 지정 훈련 루프의 경우 훈련이 시작되기 전에 주기적으로 모델 변수를 체크포인트하고 체크포인트(있는 경우)에서 모델 변수를 로드해야 합니다. 교육 진행은에서 약 추론 할 수 optimizer.iterations 옵티 마이저가 체크 포인트 경우 :

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

페칭 RemoteValue

페칭 RemoteValue 함수가 성공적으로 실행하면 성공이 보장됩니다. 현재는 함수가 실행된 후 반환 값이 즉시 코디네이터에 복사되기 때문입니다. 복사하는 동안 작업자 오류가 발생하면 사용 가능한 다른 작업자에서 기능이 재시도됩니다. 따라서 성능을 최적화하려면 반환 값 없이 함수를 예약할 수 있습니다.

오류 보고

코디네이터가 같은 오류가보고되면 UnavailableError 같은 같은 매개 변수를 서버 또는 다른 응용 프로그램 오류에서를 InvalidArgument 에서 tf.debugging.check_numerics , 그것은 오류를 제기하기 전에 모든 보류중인 대기 기능을 자동 삭제됩니다. 그에 상응하는 가져 오기 RemoteValue 의 것은 올릴 것이다 CancelledError .

오류가 발생한 후 코디네이터는 동일한 오류 또는 취소된 기능의 오류를 발생시키지 않습니다.

성능 향상

당신이 훈련 할 때 성능 문제를 보면 몇 가지 이유가 있습니다 ParameterServerStrategyClusterResolver .

한 가지 일반적인 이유는 매개변수 서버의 로드 균형이 맞지 않고 로드가 과중한 일부 매개변수 서버가 용량에 도달했기 때문입니다. 또한 여러 근본 원인이 있을 수 있습니다. 이 문제를 완화하는 몇 가지 간단한 방법은 다음과 같습니다.

  1. 지정을 통해 대형 모델 변수를 파편 variable_partitioner 구성 할 때 ParameterServerStrategy .
  2. 가능한 한 단일 단계에서 모든 매개변수 서버에 필요한 핫스팟 변수를 작성하지 마십시오. 예를 들어, 일정한 학습 속도 나 서브 클래스의 사용 tf.keras.optimizers.schedules.LearningRateSchedule 기본 동작은 학습 속도가 각 단계에서 다른 모든 매개 변수는 서버에서 특정 매개 변수를 서버에 배치 요청 변수가 될 것입니다 때문에 최적화에를 .
  3. Keras 사전 처리 계층으로 전달하기 전에 큰 어휘를 섞습니다.

성능 문제의 또 다른 가능한 이유는 조정자입니다. 귀하의 첫 번째 구현 schedule / join 파이썬 기반이며, 따라서 오버 헤드 스레딩 수 있습니다. 또한 코디네이터와 작업자 사이의 대기 시간이 클 수 있습니다. 만일이 경우라면,

  • 들어 Model.fit , 당신은 설정할 수 있습니다 steps_per_execution 제공 인수 Model.compile 1보다 큰 값으로한다.

  • 사용자 정의 교육 루프를 들어, 하나에 여러 단계 포장 할 수 tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

라이브러리가 더욱 최적화됨에 따라 앞으로 대부분의 사용자가 수동으로 단계를 압축할 필요가 없기를 바랍니다.

또한 성능 향상을 위한 작은 트릭은 위의 작업 실패 처리 섹션에서 설명한 대로 반환 값 없이 함수를 예약하는 것입니다.

알려진 제한 사항

알려진 제한 사항의 대부분은 이미 위의 섹션에서 다룹니다. 이 섹션에서는 요약을 제공합니다.

ParameterServerStrategy 일반

  • os.environment["grpc_fail_fast"]="use_caller" 제대로 내결함성 작업을 만들기 위해, 코디네이터를 포함한 모든 작업에 필요합니다.
  • 동기 매개변수 서버 교육은 지원되지 않습니다.
  • 최적의 성능을 달성하려면 일반적으로 여러 단계를 단일 기능으로 압축해야 합니다.
  • 이를 통해 saved_model로드 지원되지 tf.saved_model.load 분산됩니다 변수를 포함한다. TensorFlow Serving을 사용하여 이러한 stored_model을 로드하는 것은 작동할 것으로 예상됩니다.
  • 분할된 옵티마이저 슬롯 변수를 포함하는 체크포인트를 다른 수의 분할로 로드하는 것은 지원되지 않습니다.
  • 코디네이터 작업을 다시 시작하지 않고 매개변수 서버 장애에서 복구하는 것은 지원되지 않습니다.
  • 사용량 tf.lookup.StaticHashTable (통상적으로 일부 채용 tf.keras.layers.experimental.preprocessing 같은 층 IntegerLookup , StringLookupTextVectorization 파라미터 서버 훈련 이때 코디네이터 배치 자원 결과). 이것은 작업자에서 코디네이터로의 조회 RPC에 성능에 영향을 미칩니다. 이것은 현재 해결해야 할 최우선 과제입니다.

Model.fit 특성

  • steps_per_epoch 인수가 필요합니다 Model.fit . Epoch에서 적절한 간격을 제공하는 값을 선택할 수 있습니다.
  • ParameterServerStrategy 성능상의 이유로 배치 수준의 통화를 사용자 정의 콜백을 지원하지 않습니다. 당신은 적절하게 포착과 획기적인 수준의 통화로 이러한 호출을 변환해야합니다 steps_per_epoch 가 모든라고 그래서, steps_per_epoch 단계의 수입니다. 내장 콜백은 영향을 받지 않습니다. 배치 수준 호출이 성능을 발휘하도록 수정되었습니다. 에 배치 수준의 전화 지원 ParameterServerStrategy 계획되고있다.
  • 같은 이유로 다른 전략과 달리 진행률 표시줄과 메트릭은 Epoch 경계에서만 기록됩니다.
  • run_eagerly 지원되지 않습니다.

맞춤형 교육 루프 세부 사항