질문이있다? TensorFlow 포럼 방문 포럼 에서 커뮤니티와 연결

Keras 및 MultiWorkerMirroredStrategy를 사용한 맞춤형 교육 루프

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

개요

이 튜토리얼은 MultiWorkerMirroredStrategy를 통해 배포되는 사용자 정의 훈련 루프 API를 사용한 다중 작업자 훈련을 보여 주므로 단일 작업자 에서 실행되도록 설계된 Keras 모델은 최소한의 코드 변경으로 여러 작업자에서 원활하게 작업 할 수 있습니다.

사용자 지정 교육 루프를 사용하여 모델을 교육하는 이유는 교육에 대한 유연성과 제어력을 높이기 때문입니다. 또한 모델과 학습 루프를 디버그하는 것이 더 쉽습니다. 더 자세한 정보는 처음부터 훈련 루프 작성을 참조하십시오.

keras model.fit 과 함께 MultiWorkerMirroredStrategy 를 사용하는 방법을 찾고 있다면 대신이 튜토리얼을 참조하십시오.

TensorFlow 의 분산 tf.distribute.Strategytf.distribute.Strategy API에 대한 더 깊은 이해에 관심이있는 사람들을 위해 TensorFlow가 지원하는 배포 전략의 개요를 제공합니다.

설정

첫째, 필요한 수입품.

import json
import os
import sys

TensorFlow를 가져 오기 전에 환경을 몇 가지 변경하십시오.

모든 GPU를 비활성화합니다. 이렇게하면 모든 작업자가 동일한 GPU를 사용하려고하여 발생하는 오류를 방지합니다. 실제 응용 프로그램의 경우 각 작업자는 다른 컴퓨터에 있습니다.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG 환경 변수를 재설정하면 나중에 이에 대한 자세한 내용을 볼 수 있습니다.

os.environ.pop('TF_CONFIG', None)

현재 디렉토리가 파이썬의 경로에 있는지 확인하십시오. 이렇게하면 노트북이 나중에 %%writefile 의해 작성된 파일을 가져올 수 있습니다.

if '.' not in sys.path:
  sys.path.insert(0, '.')

이제 TensorFlow를 가져옵니다.

import tensorflow as tf

데이터 세트 및 모델 정의

다음으로 간단한 모델 및 데이터 세트 설정으로 mnist.py 파일을 만듭니다. 이 python 파일은이 자습서의 작업자 프로세스에서 사용됩니다.

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

다중 작업자 구성

이제 다중 작업자 교육의 세계로 들어 갑시다. TensorFlow에서 TF_CONFIG 환경 변수는 각기 다른 역할을 할 수있는 여러 머신에서 학습하는 데 필요합니다. 아래에 사용 된 TF_CONFIG 는 클러스터의 일부인 각 작업자에서 클러스터 구성을 지정하는 데 사용되는 JSON 문자열입니다. 이것은 cluster_resolver.TFConfigClusterResolver 사용하여 클러스터를 지정하는 기본 방법이지만, distribute.cluster_resolver 모듈에서 사용할 수있는 다른 옵션이 있습니다.

클러스터 설명

다음은 구성 예입니다.

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

다음은 JSON 문자열로 직렬화 된 동일한 TF_CONFIG .

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG 에는 clustertask 의 두 가지 구성 요소가 있습니다.

  • cluster 는 모든 작업자에 대해 동일하며 worker 와 같은 다양한 유형의 작업으로 구성된 dict 인 훈련 클러스터에 대한 정보를 제공합니다. 다중 작업자 교육에 MultiWorkerMirroredStrategy , 일반적으로 하나 개가 worker 체크 포인트를 저장하고 정기적 것 외에 TensorBoard에 대한 요약 파일을 작성하는 등의 좀 더 책임을 취 worker 않습니다. 이러한 노동자가라고 chief 노동자, 그리고이 것이 관례 workerindex 0 수석으로 임명되는 worker (사실이 어떻게 tf.distribute.Strategy 구현된다).

  • task 은 현재 task 대한 정보를 제공하며 작업자마다 다릅니다. 해당 작업자의 typeindex 을 지정합니다.

이 예에서는 작업 type"worker" 로 설정하고 작업 index0 합니다. 이 기계는 최초의 작업자로 최고 작업자로 임명되어 다른 기계보다 더 많은 작업을 수행합니다. 다른 컴퓨터도 TF_CONFIG 환경 변수를 설정해야하며 동일한 cluster 사전을 가져야하지만 해당 컴퓨터의 역할에 따라 다른 작업 type 또는 작업 index 있어야합니다.

설명을 위해이 가이드에서는 localhost 작업자 2 TF_CONFIG 를 설정하는 방법을 보여줍니다. 실제로 사용자는 외부 IP 주소 / 포트에 여러 작업자를 만들고 각 작업자에 대해 TF_CONFIG 를 적절하게 설정합니다.

이 예에서는 작업자 2 개를 사용하고 첫 번째 작업자의 TF_CONFIG 가 위에 표시됩니다. 두 번째 작업자의 경우 tf_config['task']['index']=1

위의 tf_config 는 파이썬의 지역 변수 tf_config 입니다. 학습을 구성하는 데 실제로 사용하려면이 사전을 JSON으로 직렬화하고 TF_CONFIG 환경 변수에 배치 TF_CONFIG 합니다.

노트북의 환경 변수 및 하위 프로세스

하위 프로세스는 부모로부터 환경 변수를 상속합니다. 따라서이 jupyter notebook 프로세스에서 환경 변수를 설정하면 :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

하위 프로세스에서 환경 변수에 액세스 할 수 있습니다.

echo ${GREETINGS}
Hello TensorFlow!

다음 섹션에서는이를 사용하여 TF_CONFIG 를 작업자 하위 프로세스에 전달합니다. 실제로 이런 방식으로 작업을 시작하지는 않지만이 자습서의 목적에는 충분합니다. 최소 다중 작업자 예제를 보여주기 위해.

MultiWorkerMirrored 전략

모델을 학습 시키려면 tf.distribute.MultiWorkerMirroredStrategy 의 인스턴스를 사용합니다.이 인스턴스는 모든 작업자에 걸쳐 각 기기의 모델 레이어에있는 모든 변수의 복사본을 만듭니다. tf.distribute.Strategy 가이드 에이 전략에 대한 자세한 내용이 있습니다.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

tf.distribute.Strategy.scope 를 사용하여 모델을 빌드 할 때 전략을 사용해야 함을 지정합니다. 이렇게하면이 전략에 대한 " 교차 복제 컨텍스트 "가됩니다. 이는 전략이 가변 배치와 같은 것을 제어한다는 것을 의미합니다.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

작업자간에 데이터 자동 분할

다중 작업자 교육에서 데이터 세트 분할이 반드시 필요하지는 않지만 정확히 한 번 의미 체계를 제공하여 더 많은 교육을 재현 가능하게 만듭니다. 즉, 여러 작업자에 대한 교육은 한 작업자에 대한 교육과 동일해야합니다. 참고 : 경우에 따라 성능이 영향을받을 수 있습니다.

참조 : distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

사용자 지정 훈련 루프 정의 및 모델 훈련

최적화 프로그램 지정

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

tf.function 으로 훈련 단계 정의

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

체크 포인트 저장 및 복원

커스텀 트레이닝 루프에서 체크 포인트를 구현하려면 사용자가 keras 콜백을 사용하는 대신이를 처리해야합니다. 전체 모델을 저장하지 않고도 모델의 가중치를 저장하고 복원 할 수 있습니다.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

여기에서 모델을 추적하는 하나의 tf.train.Checkpoint 를 생성합니다. 모델은 tf.train.CheckpointManager 의해 관리되므로 최신 체크 포인트 만 유지됩니다.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

이제 복원이 필요할 때 편리한 tf.train.latest_checkpoint 함수를 사용하여 저장된 최신 체크 포인트를 찾을 수 있습니다.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

체크 포인트를 복원 한 후 사용자 지정 교육 루프 교육을 계속할 수 있습니다.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.835826, train_loss: 0.532438.
Epoch: 1, accuracy: 0.940402, train_loss: 0.201307.
Epoch: 2, accuracy: 0.959821, train_loss: 0.136761.

작업자에 대한 전체 코드 설정

실제로 MultiWorkerMirroredStrategy 로 실행하려면 작업자 프로세스를 실행하고 TF_CONFIG 를 전달해야합니다.

이전에 작성한 mnist.py 파일과 마찬가지로 여기에이 colab에서 이전에 단계별로 살펴본 것과 동일한 코드가 포함 된 main.py 가 있습니다. 파일에 작성하여 각 작업자가 실행할 수 있습니다.

파일 : main.py

Writing main.py

교육 및 평가

현재 디렉토리에는 이제 두 Python 파일이 모두 포함됩니다.

ls *.py
main.py
mnist.py

따라서 TF_CONFIG json-serialize하고 환경 변수에 추가합니다.

os.environ['TF_CONFIG'] = json.dumps(tf_config)

이제 main.py 를 실행하고 TF_CONFIG 사용할 작업자 프로세스를 시작할 수 있습니다.

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

위의 명령에 대해 몇 가지 유의해야 할 사항이 있습니다.

  1. 일부 bash 명령을 실행하기 위해 노트북 "마법"인 %%bash 를 사용합니다.
  2. 이 작업자는 종료되지 않기 때문에 --bg 플래그를 사용하여 백그라운드에서 bash 프로세스를 실행합니다. 시작하기 전에 모든 작업자를 기다립니다.

백그라운드 작업자 프로세스는이 노트북에 출력을 인쇄하지 않으므로 &> 는 출력을 파일로 리디렉션하므로 어떤 일이 발생했는지 확인할 수 있습니다.

따라서 프로세스가 시작될 때까지 몇 초 정도 기다리십시오.

import time
time.sleep(20)

이제 지금까지 작업자의 로그 파일에 출력 된 내용을 살펴보십시오.

cat job_0.log
2021-05-19 11:28:34.994731: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-05-19 11:28:36.180646: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-05-19 11:28:36.188591: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-05-19 11:28:36.188631: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1521845
2021-05-19 11:28:36.188639: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1521845
2021-05-19 11:28:36.188728: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.19.1
2021-05-19 11:28:36.188755: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.19.1
2021-05-19 11:28:36.188762: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.19.1
2021-05-19 11:28:36.189315: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-05-19 11:28:36.194547: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-05-19 11:28:36.195001: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

로그 파일의 마지막 줄은 다음 Started server with target: grpc://localhost:12345 같아야합니다. Started server with target: grpc://localhost:12345 . 이제 첫 번째 작업자가 준비되었으며 다른 모든 작업자가 진행할 준비가 될 때까지 기다리고 있습니다.

따라서 두 번째 작업자의 프로세스가 선택할 수 있도록 tf_config 를 업데이트합니다.

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

이제 두 번째 작업자를 시작하십시오. 모든 작업자가 활동 중이므로 교육이 시작됩니다 (따라서이 프로세스를 배경으로 할 필요가 없습니다).

python main.py > /dev/null 2>&1

이제 첫 번째 작업자가 작성한 로그를 다시 확인하면 해당 모델이 학습에 참여했음을 알 수 있습니다.

cat job_0.log
2021-05-19 11:28:34.994731: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-05-19 11:28:36.180646: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-05-19 11:28:36.188591: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-05-19 11:28:36.188631: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1521845
2021-05-19 11:28:36.188639: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1521845
2021-05-19 11:28:36.188728: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.19.1
2021-05-19 11:28:36.188755: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.19.1
2021-05-19 11:28:36.188762: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.19.1
2021-05-19 11:28:36.189315: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-05-19 11:28:36.194547: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-05-19 11:28:36.195001: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-05-19 11:28:57.264178: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-05-19 11:28:57.264494: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000189999 Hz
Epoch: 0, accuracy: 0.821763, train_loss: 0.589336.
Epoch: 1, accuracy: 0.927679, train_loss: 0.239795.
Epoch: 2, accuracy: 0.946987, train_loss: 0.171661.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

다중 작업자 심층 교육

이 자습서는 다중 작업자 설정의 Custom Training Loop 워크 플로를 보여줍니다. 다른 주제에 대한 자세한 설명은 multi-worker 설정의 model.fit's guide 에서 사용할 수 있으며 CTL에 적용 할 수 있습니다.

또한보십시오

  1. TensorFlow의 분산 학습 가이드는 사용 가능한 배포 전략에 대한 개요를 제공합니다.
  2. 여러 배포 전략을 실행하도록 구성 할 수있는 공식 모델 .
  3. 가이드의 성능 섹션에서는 TensorFlow 모델의 성능을 최적화하는 데 사용할 수있는 다른 전략 및 도구 에 대한 정보를 제공합니다.