분산 입력

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

tf.distribute API는 사용자가 단일 머신에서 여러 머신으로 훈련을 쉽게 확장하는 방법을 제공합니다. 모델을 확장할 때 사용자는 입력을 여러 기기로 분산해야 합니다. tf.distribute는 입력을 기기에 자동으로 분산할 수 있는 API를 제공합니다.

이 가이드는 tf.distribute API를 사용하여 분산 데이터세트 및 반복기를 생성할 수 있는 다양한 방법을 보여줍니다. 또한 다음 주제들을 다룹니다.

이 가이드는 Keras API를 사용한 분산 입력 사용법에 대해서는 다루지 않습니다.

분산 데이터세트

tf.distribute API를 사용하여 조정하려면 사용자가 tf.data.Dataset을 사용하여 입력을 나타내는 것이 좋습니다. tf.distributetf.data.Dataset(예: 각 가속기 기기에 데이터 자동 프리페치)에서 효율적으로 작동하도록 만들어졌으며 성능 최적화가 구현에 정기적으로 통합되었습니다. tf.data.Dataset 외에 다른 사용에 대한 사용 사례가 있는 경우 나중에 이 가이드의 섹션을 참고하세요. 비분산 훈련 루프에서 사용자는 먼저 tf.data.Dataset 인스턴스를 만든 다음 요소를 반복합니다. 예를 들면 다음과 같습니다.

# Import TensorFlow
!pip install -q tf-nightly
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

사용자가 존재하는 코드를 최소한으로 변경하면서 tf.distribute 전략을 사용할 수 있도록 tf.data.Dataset 인스턴스를 배포하고 분산 데이터세트 객체를 반환하는 두 개의 API가 도입되었습니다. 그런 다음 사용자는 이 분산 데이터세트 인스턴스를 반복하고 이전과 같이 모델을 훈련할 수 있습니다. 이제 두 가지 API tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.experimental_distribute_datasets_from_function를 자세히 살펴보겠습니다.

tf.distribute.Strategy.experimental_distribute_dataset

사용법

이 API는 tf.data.Dataset 인스턴스를 입력으로 받고 tf.distribute.DistributedDataset 인스턴스를 반환합니다. 입력 데이터세트를 전역 배치 크기와 동일한 값으로 배치해야 합니다. 이 전역 배치 크기는 모든 기기에서 1스텝에서 처리하려는 샘플의 수입니다. 이 분산 데이터세트를 Python 방식으로 반복하거나 iter를 사용하여 반복기를 작성할 수 있습니다. 반환된 객체는 tf.data.Dataset 인스턴스가 아니며 데이터세트를 변환하거나 검사하는 다른 API를 지원하지 않습니다. 다른 복제본에 대해 입력을 샤딩하려는 특정 방법이 없는 경우 권장되는 API입니다.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

속성

배치 처리

tf.distribute는 입력된 tf.data.Dataset 인스턴스를 전역 배치 크기와 동기화된 복제본 수로 나눈 새 배치 크기로 다시 배치합니다. 동기화 중인 복제본의 수는 훈련 중에 그래디언트 올리듀스(allreduce)에 참여하는 기기의 수와 같습니다. 사용자가 분산 반복기에서 next를 호출하면 복제본마다 배치 크기의 데이터가 각 복제본에 반환됩니다. 다시 배치된 데이터세트 카디널리티는 항상 여러 복제본의 배수입니다. 다음은 몇 가지 예입니다.

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • 분산 없음:

      • 배치 1: [0, 1, 2, 3]
      • 배치 2: [4, 5]
    • 2개 복제본에서 분산 포함. 마지막 배치 ([4, 5])는 2개의 복제본으로 분할됩니다.

      • 배치 1:
        • 복제본 1:[0, 1]
        • 복제본 2:[2, 3]
      • 배치 2:
        • 복제본 2: [4]
        • 복제본 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • 분산 없음:
      • 배치 1: [[0], [1], [2], [3]]
    • 5개 복제본에서 분산:
      • 배치 1:
        • 복제본 1: [0]
        • 복제본 2: [1]
        • 복제본 3: [2]
        • 복제본 4: [3]
        • 복제본 5: []
  • tf.data.Dataset.range(8).batch(4)

    • 분산 없음:
      • 배치 1: [0, 1, 2, 3]
      • 배치 2: [4, 5, 6, 7]
    • 3개 복제본에서 분산:
      • 배치 1:
        • 복제본 1: [0, 1]
        • 복제본 2: [2, 3]
        • 복제본 3: []
      • 배치 2:
        • 복제본 1: [4, 5]
        • 복제본 2: [6, 7]
        • 복제본 3: []

참고 : 위의 예는 전역 배치가 다른 복제본에서 분할되는 방법만 보여줍니다. 구현에 따라 각 복제본은 변경될 수 있으므로 각 복제본에서 발생 수 있는 실제 값에 의존하지 않는 것이 좋습니다.

데이터세트를 다시 일괄 처리하면 복제본 수에 비례하여 공간 복잡성이 선형적으로 증가합니다. 이는 다중 작업자 훈련 사용 사례에서 입력 파이프라인에 OOM 오류가 발생할 수 있음을 의미합니다.

샤딩

tf.distribute는 또한 다중 작업자 훈련에서 입력 데이터세트를 자동 샤딩합니다. 작업자의 CPU 기기에 각 데이터세트가 생성됩니다. 작업자 집합에 대한 데이터세트를 자동 샤딩하면 각 작업자에게 전체 데이터세트의 하위 세트가 할당됩니다(올바른 tf.data.experimental.AutoShardPolicy가 설정된 경우). 이는 각 스텝에서 겹치지 않는 데이터세트 요소의 전역 배치 크기가 각 작업자에 의해 처리되도록 합니다. 자동 샤딩에는 tf.data.experimental.DistributeOptions을 사용해 지정할 수 있는 몇 개의 다양한 옵션이 있습니다.

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy에 대해서 세 가지 다른 옵션을 설정할 수 있습니다.

  • AUTO: 기본 옵션으로 FILE별 샤딩 시도가 이루어짐을 의미합니다. 파일 기반 데이터세트가 탐지되지 않으면 FILE별 샤딩 시도가 실패합니다. 그러면 tf.distribute가 DATA별 샤딩으로 폴백합니다. 입력 데이터세트가 파일 기반이지만 파일 수가 작업자 수보다 적으면, InvalidArgumentError가 ​​발생합니다. 이 경우, 정책을 명시적으로 AutoShardPolicy.DATA로 설정하거나 파일 수가 작업자 수보다 많도록 입력 소스를 더 작은 파일로 분할합니다.

  • FILE: 모든 작업자에 대해 입력 파일을 샤딩하려는 경우 사용하는 옵션입니다. 입력 파일의 수가 작업자의 수보다 훨씬 많고 파일의 데이터가 균등하게 분산된 경우, 이 옵션을 사용해야 합니다. 이 옵션의 단점은 파일의 데이터가 균등하게 분산되어 있지 않으면 유휴 작업자가 생긴다는 것입니다. 파일의 수가 작업자의 수보다 적으면 InvalidArgumentError가 발생합니다. 이 경우에는 정책을 명시적으로 AutoShardPolicy.DATA로 설정합니다. 예를 들어, 각각 1개의 복제본이 있는 두 작업자에 2개의 파일을 배포해 보겠습니다. 파일 1에는 [0, 1, 2, 3, 4, 5]가 포함되고 파일 2에는 [6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로, 전역 배치 크기를 4로 설정합니다.

    • 작업자 0:
      • 배치 1 = 복제본 1: [0, 1]
      • 배치 2 = 복제본 1: [2, 3]
      • 배치 3 = 복제본 1: [4]
      • 배치 4 = 복제본 1: [5]
    • 작업자 1:
      • 배치 1 = 복제본 2: [6, 7]
      • 배치 2 = 복제본 2: [8, 9]
      • 배치 3 = 복제본 2: [10]
      • 배치 4 = 복제본 2: [11]
  • DATA: 모든 작업자에 걸쳐 요소를 자동 샤딩합니다. 각 작업자는 전체 데이터세트를 읽고 할당된 샤드만 처리합니다. 다른 모든 샤드는 삭제됩니다. 이 옵션은 일반적으로 입력 파일의 수가 작업자의 수보다 적고 모든 작업자에서 데이터를 더 잘 샤딩하려는 경우에 사용됩니다. 단점은 각 작업자에서 전체 데이터세트를 읽는다는 것입니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다.

    • 작업자 0:
      • 배치 1 = 복제본 1: [0, 1]
      • 배치 2 = 복제본 1: [4, 5]
      • 배치 3 = 복제본 1: [8, 9]
    • 작업자 1:
      • 배치 1 = 복제본 2: [2, 3]
      • 배치 2 = 복제본 2: [6, 7]
      • 배치 3 = 복제본 2: [10, 11]
  • OFF: 자동 샤딩을 끄면 각 작업자가 모든 데이터를 처리합니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다. 그러면 각 작업자는 다음과 같은 분산을 볼 수 있습니다.

    • 작업자 0:

      • 배치 1 = 복제본 1: [0, 1]
      • 배치 2 = 복제본 1: [2, 3]
      • 배치 3 = 복제본 1: [4, 5]
      • 배치 4 = 복제본 1: [6, 7]
      • 배치 5 = 복제본 1: [8, 9]
      • 배치 6 = 복제본 1: [10, 11]
    • 작업자 1:

      • 배치 1 = 복제본 2: [0, 1]
      • 배치 2 = 복제본 2: [2, 3]
      • 배치 3 = 복제본 2: [4, 5]
      • 배치 4 = 복제본 2: [6, 7]
      • 배치 5 = 복제본 2: [8, 9]
      • 배치 6 = 복제본 2: [10, 11]
프리페치

기본적으로 tf.distribute는 사용자 제공 tf.data.Dataset 인스턴스의 끝에 프리페치 변환을 추가합니다. 프리페치 변환에 대한 인수 buffer_size는 동기화 중인 복제본의 수와 같습니다.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

사용법

이 API는 입력 함수를 받고 tf.distribute.DistributedDataset 인스턴스를 반환합니다. 사용자가 전달하는 입력 함수는tf.distribute.InputContext 인수를 갖고 tf.data.Dataset 인스턴스를 반환해야 합니다. 이 API를 사용하면 tf.distribute는 더 이상 입력 함수로부터 반환된 사용자의 tf.data.Dataset 인스턴스를 변경하지 않습니다. 데이터세트를 배치하고 샤딩하는 것은 사용자가 해야 합니다. tf.distribute는 작업자 각각의 CPU 기기에서 입력 함수를 호출합니다. 사용자가 자체 배치 처리 및 샤딩 로직을 지정할 수 있게 해주는 것 이외에도, 다중 작업자 훈련을 사용할 때 tf.distribute.Strategy.experimental_distribute_dataset과 비교하여 확장성과 성능이 더 우수합니다.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From <ipython-input-1-fae0baaaedb8>:12: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function

속성

배치 처리

입력 함수의 반환 값인 tf.data.Dataset 인스턴스는 복제본별 배치 크기를 사용하여 배치해야 합니다. 복제본별 배치 크기는 전역 배치 크기를 동기화 훈련에 참여하는 복제본의 수로 나눈 값입니다. tf.distribute가 각 작업자의 CPU 기기에서 입력 함수를 호출하기 때문입니다. 지정된 작업자에서 생성된 데이터세트는 해당 작업자의 모든 복제본에서 사용할 수 있어야 합니다.

샤딩

사용자의 입력 함수에 대한 인수로 암시적으로 전달되는 tf.distribute.InputContext 객체는 내부에서 tf.distribute 에 의해 생성됩니다. 작업자 수, 현재 작업자 ID 등에 대한 정보가 있습니다.이 입력 함수는 tf.distribute.InputContext 오브젝트의 일부인 이러한 특성을 사용하여 사용자가 설정 한 정책에 따라 샤딩을 처리 할 수 있습니다.

프리페치

tf.distribute 는 사용자가 제공한 입력 함수가 반환한 tf.data.Dataset 의 끝에 프리페치 변환을 추가하지 않습니다.

참고 : tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.experimental_distribute_datasets_from_function tf.data.Dataset 유형이 아닌 tf.distribute.DistributedDataset 인스턴스를 반환합니다. Distributed Iterators 섹션에 표시된 대로 이러한 인스턴스를 반복하고 element_spec 속성을 사용할 수 있습니다.

분산 반복기

비분산 tf.data.Dataset 인스턴스와 유사하게, tf.distribute.DistributedDataset에서 요소에 접근하여 반복하려면 tf.distribute.DistributedDataset 인스턴스에 반복기를 생성해야 합니다. 다음은 tf.distribute.DistributedIterator를 생성하고 모델을 훈련할 때 사용하는 방법입니다.

사용법

Python 같은 루프 구성 사용하기

Python 같은 사용자 친화적인 루프를 사용하여 tf.distribute.DistributedDataset를 반복할 수 있습니다. tf.distribute.DistributedIterator에서 반환된 요소는 단일 tf.Tensor 또는 복제본별 값을 포함하는 tf.distribute.DistributedValues입니다. tf.function 내에 루프를 배치하면 성능이 향상됩니다. 그러나 tf.function 내부에 배치된 tf.distribute.DistributedDataset를 통한 루프는 현재 breakreturn이 지원되지 않습니다. .function {/ code8}. tf.distribute.experimental.MultiWorkerMirroredStrategytf.distribute.TPUStrategy와 같은 다중 작업자 전략을 사용할 때, tf.function 내에 루프를 배치하는 것도 지원하지 않습니다. 단일 작업자 tf.distribute.TPUStrategy를 위해 tf.function 내에 루프를 배치하는 것은 동작하지만, TPU pod를 사용할 때는 동작하지 않습니다.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

iter를 사용하여 명시적인 반복기 만들기

tf.distribute.DistributedDataset 인스턴스의 요소를 반복하기 위해 iter API를 사용하여 tf.distribute.DistributedIterator를 생성할 수 있습니다. 명시적인 반복기를 사용하면 고정된 수의 스텝을 반복할 수 있습니다. tf.distribute.DistributedIterator 인스턴스 dist_iterator에서 다음 요소를 가져오려면 next(dist_iterator), dist_iterator.get_next() 또는 dist_iterator.get_next_as_optional()을 호출할 수 있습니다. 앞의 두 개는 본질적으로 동일합니다.

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

next() 또는 tf.distribute.DistributedIterator.get_next()를 사용하여 tf.distribute.DistributedIterator의 끝에 도달하면 OutOfRange 오류가 발생합니다. 클라이언트는 Python 측에서 오류를 포착하고 체크포인트 및 평가와 같은 다른 작업을 계속할 수 있습니다. 그러나 호스트 훈련 루프를 사용하는 경우(예: tf.function당 여러 스텝 실행) 다음과 같이 작동하지 않습니다.

@tf.function def train_fn(iterator):   for _ in tf.range(steps_per_loop):     strategy.run(step_fn, args=(next(iterator),))

train_fn은 스텝 본문을 tf.range안에 배치하여 여러 스텝을 래핑합니다. 이 경우 종속성이 없는 루프에서 다른 반복이 병렬로 시작될 수 있으므로 이전 반복 계산이 완료되기 전에 이후 반복에서 OutOfRange 오류가 트리거될 수 있습니다. OutOfRange 오류가 발생하면 함수의 모든 op가 즉시 종료됩니다. 이런 경우를 피하려면 OutOfRange 오류를 발생시키지 않는 대안은 tf.distribute.DistributedIterator.get_next_as_optional()입니다. get_next_as_optional은 다음 요소를 포함하거나 tf.distribute.DistributedIterator가 끝에 도달한 경우 값이 없는 tf.experimental.Optional을 반환합니다.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_spec 속성 사용

분산된 데이터세트의 요소를 tf.function으로 전달하여 tf.TypeSpec 보장을 원할 경우, tf.functioninput_signature 인수를 지정합니다. 분산 데이터세트의 출력은 tf.distribute.DistributedValues이며 단일 기기 또는 여러 기기에 대한 입력을 나타낼 수 있습니다. 이 분산 값에 해당하는 tf.TypeSpec을 가져오려면 분산 데이터세트 또는 분산 반복기 객체의 element_spec 속성을 사용할 수 있습니다.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

부분 배치

사용자가 생성한 tf.data.Dataset 인스턴스에 복제본의 수로 균등하게 나눌 수 없는 배치 크기가 포함되어 있거나 데이터세트 인스턴스의 카디널리티가 배치 크기로 나눌 수 없는 경우 부분 배치가 발생합니다. 이는 데이터세트가 여러 복제본에 분산될 때 일부 반복기에 대한 next 호출로 OutOfRangeError가 발생함을 의미합니다. 이 사용 사례를 처리하기 위해 tf.distribute는 처리할 데이터가 더 이상 없는 복제본에서 배치 크기가 0인 더미 배치를 반환합니다.

단일 작업자 사례의 경우 반복기에서 next 호출로 데이터가 반환되지 않으면 배치 크기가 0인 더미 배치가 작성되어 데이터세트의 실제 데이터와 함께 사용됩니다. 부분 배치의 경우 마지막 전역 배치 데이터에는 더미 배치 데이터와 함께 실제 데이터가 포함됩니다. 데이터 처리를 위한 중지 조건이 이제 복제본에 데이터가 있는지 확인합니다. 복제본에 데이터가 없으면 OutOfRange 오류가 발생합니다.

다중 작업자 사례의 경우, 각 작업자에서 데이터의 존재를 나타내는 boolean 값은 교차 복제본 통신을 사용하여 집계하며, 이는 모든 작업자가 분산 데이터세트의 처리를 완료했는지 식별하는 데 사용됩니다. 여기에는 작업자 간 의사소통이 포함되므로 성능에 약간의 불이익이 따릅니다.

경고 사항

  • 다중 작업자 설정에서 tf.distribute.Strategy.experimental_distribute_dataset API를 사용할 때, 사용자는 파일로부터 읽은 tf.data.Dataset을 전달합니다. tf.data.experimental.AutoShardPolicyAUTO 또는 FILE로 설정된 경우, 스텝당 실제 배치 사이즈는 사용자가 정의한 전역 배치 크기보다 더 작을 수 있습니다. 파일에 남아 있는 요소가 전역 배치 크기보다 더 적을 때 이런 경우가 발생할 수 있습니다. 사용자는 실행 스텝의 수에 의존하지 않고 데이터세트를 소진하거나 tf.data.experimental.AutoShardPolicyDATA로 설정하여 문제를 해결할 수 있습니다.

  • 상태 저장 데이터세트 변환은 현재 tf.distribute에서 지원되지 않으며 데이터세트에 있을 수 있는 상태 저장 연산은 현재 무시됩니다. 예를 들어, 데이터세트에 map_fn을 사용하여 이미지를 회전시키는 tf.random.uniform이 있는 경우, Python 프로세스가 실행되는 로컬 머신의 상태(예: 임의 시드)에 의존하는 데이터세트 그래프가 있습니다.

  • tf.distribute와 함께 사용될 때와 같이, 특정 컨텍스트에서 기본적으로 비활성화되어 있는 실험적인 tf.data.experimental.OptimizationOptions은 성능 저하를 유발합니다. 배포 설정에서 워크로드의 성능 향상이 확인된 후에만 이 옵션을 활성하세요.

  • tf.distribute.experimental_distribute_dataset 또는 tf.distribute.experimental_distribute_datasets_from_function을 사용할 때 작업자에 의해 처리되는 데이터의 순서는 보장되지 않습니다. 일반적으로 tf.distribute을 사용하여 예측을 조정하는 경우 요구됩니다. 하지만 배치의 각 요소에 인덱스를 삽입하고 그에 맞게 출력을 정렬하면 됩니다. 다음은 출력을 정렬하는 방법에 대한 예제 코드 조각입니다.

참고: 편의상 tf.distribute.MirroredStrategy()를 사용합니다. 여러 작업자를 사용하고 tf.distribute.MirroredStrategy를 사용하여 단일 작업자에게 훈련을 배포하는 경우에만 입력 순서를 다시 지정하면 됩니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

# 표준 tf.data.Dataset 인스턴스를 사용하지 않는 경우 데이터를 어떻게 배포하나요?

때때로 사용자가 tf.data.Dataset을 사용하여 입력을 나타내고 이후에 언급한 API를 사용하여 데이터 세트를 여러 기기에 분배할 수 없습니다. 이런 경우 생성기의 원시 텐서 또는 입력을 사용할 수 있습니다.

임의의 텐서 입력에 experiment_distribute_values_from_function 사용하기

strategy.runnext(iterator)의 출력인 tf.distribute.DistributedValues를 허용합니다. 텐서 값을 전달하려면 experimental_distribute_values_from_function을 사용하여 원시 텐서에서 tf.distribute.DistributedValues를 구성합니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

생성기에서 입력한 경우 tf.data.Dataset.from_generator 사용하기

사용하려는 생성기 함수가 있는 경우, from_generator API를 사용하여 tf.data.Dataset 인스턴스를 생성할 수 있습니다.

참고: 현재 tf.distribute.TPUStrategy에서는 지원하지 않습니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)