이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

분산 입력

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

tf.distribute API는 사용자가 교육을 단일 머신에서 여러 머신으로 쉽게 확장 할 수있는 방법을 제공합니다. 모델을 확장 할 때 사용자는 입력을 여러 장치에 분산해야합니다. tf.distribute 는 장치간에 입력을 자동으로 배포 할 수있는 API를 제공합니다.

이 가이드는 tf.distribute API를 사용하여 분산 데이터 세트 및 반복기를 생성 할 수있는 다양한 방법을 보여줍니다. 또한 다음 주제를 다룹니다.

이 가이드에서는 Keras API를 사용한 분산 입력 사용에 대해서는 다루지 않습니다.

분산 데이터 세트

tf.distribute API를 사용하여 확장하려면 사용자가tf.data.Dataset 을 사용하여 입력을 나타내는 것이 좋습니다. tf.distributetf.data.Dataset (예 : 각 가속기 장치에 대한 데이터 자동 프리 페치)과 효율적으로 작동하도록 만들어졌으며 성능 최적화가 정기적으로 구현에 통합되었습니다. 당신이 아닌 다른 사용에 대한 사용 사례가있는 경우tf.data.Dataset 나중에 참조하시기 바랍니다 섹션 이 설명서에. 비 분산 학습 루프에서 사용자는 먼저tf.data.Dataset 인스턴스를 만든 다음 요소를 반복합니다. 예를 들면 :

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

사용자가 사용자의 기존 코드를 최소한으로 변경하여 tf.distribute 전략을 사용할 수 있도록tf.data.Dataset 인스턴스를 배포하고 분산 된 데이터 세트 객체를 반환하는 두 개의 API가 도입되었습니다. 그런 다음 사용자는이 분산 된 데이터 세트 인스턴스를 반복하고 이전과 같이 모델을 훈련 할 수 있습니다. 이제 tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.experimental_distribute_datasets_from_function 의 두 API를 자세히 살펴 보겠습니다.

tf.distribute.Strategy.experimental_distribute_dataset

용법

이 API는 소요tf.data.Dataset 입력으로 인스턴스를하고 돌려 tf.distribute.DistributedDataset 인스턴스를. 전역 배치 크기와 동일한 값으로 입력 데이터 세트를 배치해야합니다. 이 전역 배치 크기는 모든 장치에서 1 단계로 처리하려는 샘플 수입니다. 이 분산 데이터 세트를 Pythonic 방식으로 반복하거나 iter 사용하여 반복기를 만들 수 있습니다. 반환 된 객체는tf.data.Dataset 인스턴스가 아니며 어떤 방식 으로든 데이터 세트를 변환하거나 검사하는 다른 API를 지원하지 않습니다. 다른 복제본을 통해 입력을 분할하려는 특정 방법이없는 경우 권장되는 API입니다.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

속성

일괄 처리

tf.distribute 는 동기화 된 복제본 수로 나눈 전역 배치 크기와 동일한 새 배치 크기로 입력tf.data.Dataset 인스턴스를 다시 배치합니다. 동기화 된 복제본의 수는 훈련 중에 Gradient AllReduce에 참여하는 디바이스의 수와 같습니다. 사용자가 분산 반복기에서 next 을 호출하면 각 복제본에서 복제 본당 일괄 처리 데이터 크기가 반환됩니다. 다시 배치 된 데이터 세트 카디널리티는 항상 복제본 수의 배수입니다. 다음은 몇 가지 예입니다.

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • 배포하지 않고 :
    • 배치 1 : [0, 1, 2, 3]
    • 배치 2 : [4, 5]
    • 2 개의 복제본을 통해 배포합니다. 마지막 배치 ([4, 5])는 2 개의 복제본으로 분할됩니다.

    • 배치 1 :

      • 복제본 1 : [0, 1]
      • 복제본 2 : [2, 3]
    • 배치 2 :

      • 복제 2 : [4]
      • 복제 2 : [5]
  • tf.data.Dataset.range(4).batch(4)

    • 배포하지 않고 :
    • 배치 1 : [[0], [1], [2], [3]]
    • 5 개의 복제본을 통한 배포 :
    • 배치 1 :
      • 복제본 1 : [0]
      • 복제본 2 : [1]
      • 복제본 3 : [2]
      • 복제 4 : [3]
      • 복제본 5 : []
  • tf.data.Dataset.range(8).batch(4)

    • 배포하지 않고 :
    • 배치 1 : [0, 1, 2, 3]
    • 배치 2 : [4, 5, 6, 7]
    • 3 개의 복제본을 통한 배포 :
    • 배치 1 :
      • 복제본 1 : [0, 1]
      • 복제본 2 : [2, 3]
      • 복제본 3 : []
    • 배치 2 :
      • 복제본 1 : [4, 5]
      • 복제 2 : [6, 7]
      • 복제본 3 : []

데이터 세트를 다시 배치하면 복제본 수에 따라 선형 적으로 증가하는 공간 복잡성이 있습니다. 즉, 다중 작업자 교육 사용 사례의 경우 입력 파이프 라인에 OOM 오류가 발생할 수 있습니다.

샤딩

tf.distribute 는 또한 다중 작업자 훈련에서 입력 데이터 세트를 자동 샤딩합니다. 각 데이터 세트는 작업자의 CPU 장치에 생성됩니다. 작업자 집합에서 데이터 세트를 tf.data.experimental.AutoShardPolicy 한다는 것은 각 작업자에게 전체 데이터 세트의 하위 집합이 할당된다는 것을 의미합니다 (올바른 tf.data.experimental.AutoShardPolicy 가 설정된 경우). 이는 각 단계에서 겹치지 않는 데이터 세트 요소의 전역 배치 크기가 각 작업자에 의해 처리되도록하기위한 것입니다. tf.data.experimental.DistributeOptions 사용하여 지정할 수있는 몇 가지 다른 옵션이 있습니다.

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy 대해 설정할 수있는 세 가지 옵션이 있습니다.

  • AUTO : 기본 옵션으로 FILE에 의해 샤딩을 시도합니다. 파일 기반 데이터 세트가 감지되지 않으면 FILE로 분할하려는 시도가 실패합니다. 그런 다음 tf.distribute 는 DATA로 tf.distribute 됩니다. 입력 데이터 세트가 파일 기반이지만 파일 수가 작업자 수보다 적 으면 InvalidArgumentError 가 발생합니다. 이 경우 명시 적으로 정책을 AutoShardPolicy.DATA 설정하거나 파일 수가 작업자 수보다 많도록 입력 소스를 더 작은 파일로 분할합니다.
  • FILE : 모든 작업자에 대해 입력 파일을 분할하려는 경우 옵션입니다. 입력 파일 수가 작업자 수보다 훨씬 많고 파일의 데이터가 균등하게 분산 된 경우이 옵션을 사용해야합니다. 이 옵션의 단점은 파일의 데이터가 균등하게 분산되지 않은 경우 유휴 작업자가 있다는 것입니다. 파일 수가 작업자 수보다 적 으면 InvalidArgumentError 가 발생합니다. 이 경우 정책을 명시 적으로 AutoShardPolicy.DATA 설정하십시오. 예를 들어 각각 1 개의 복제본이있는 2 명의 작업자에게 2 개의 파일을 배포 해 보겠습니다. 파일 1에는 [0, 1, 2, 3, 4, 5]가 포함되고 파일 2에는 [6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화 된 총 복제본 수를 2 개로, 전역 배치 크기를 4 개로 설정합니다.

    • 작업자 0 :
    • 배치 1 = 복제본 1 : [0, 1]
    • 배치 2 = 복제본 1 : [2, 3]
    • 배치 3 = 복제본 1 : [4]
    • 배치 4 = 복제본 1 : [5]
    • 작업자 1 :
    • 배치 1 = 복제본 2 : [6, 7]
    • 배치 2 = 복제본 2 : [8, 9]
    • 배치 3 = 복제본 2 : [10]
    • 배치 4 = 복제본 2 : [11]
  • 데이터 : 이것은 모든 작업자에 걸쳐 요소를 자동 샤딩합니다. 각 작업자는 전체 데이터 세트를 읽고 할당 된 샤드 만 처리합니다. 다른 모든 샤드는 삭제됩니다. 이는 일반적으로 입력 파일 수가 작업자 수보다 적고 모든 작업자에서 데이터를 더 잘 분할하려는 경우에 사용됩니다. 단점은 각 작업자에서 전체 데이터 세트를 읽는다는 것입니다. 예를 들어 2 명의 작업자에게 1 개의 파일을 배포 해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 있습니다. 동기화 된 총 복제본 수를 2로 둡니다.

    • 작업자 0 :
    • 배치 1 = 복제본 1 : [0, 1]
    • 배치 2 = 복제본 1 : [4, 5]
    • 배치 3 = 복제본 1 : [8, 9]
    • 작업자 1 :
    • 배치 1 = 복제본 2 : [2, 3]
    • 배치 2 = 복제본 2 : [6, 7]
    • 배치 3 = 복제본 2 : [10, 11]
  • 끄기 : 자동 샤딩을 끄면 각 작업자가 모든 데이터를 처리합니다. 예를 들어 2 명의 작업자에게 1 개의 파일을 배포 해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 있습니다. 동기화 된 총 복제본 수를 2로 둡니다. 그러면 각 작업자는 다음 배포를 볼 수 있습니다.

    • 작업자 0 :
    • 배치 1 = 복제본 1 : [0, 1]
    • 배치 2 = 복제본 1 : [2, 3]
    • 배치 3 = 복제본 1 : [4, 5]
    • 배치 4 = 복제본 1 : [6, 7]
    • 배치 5 = 복제본 1 : [8, 9]
    • 배치 6 = 복제본 1 : [10, 11]

    • 작업자 1 :

    • 배치 1 = 복제본 2 : [0, 1]

    • 배치 2 = 복제본 2 : [2, 3]

    • 배치 3 = 복제본 2 : [4, 5]

    • 배치 4 = 복제본 2 : [6, 7]

    • 배치 5 = 복제본 2 : [8, 9]

    • 배치 6 = 복제본 2 : [10, 11]

프리 페치

기본적으로 tf.distribute 는 사용자가 제공 한tf.data.Dataset 인스턴스의 끝에 프리 페치 변환을 추가합니다. buffer_size 프리 페치 변환에 대한 인수는 동기화 된 복제본 수와 같습니다.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

용법

이 API는 입력 함수를 받아 tf.distribute.DistributedDataset 인스턴스를 반환합니다. 사용자가 전달하는 입력 함수에는 tf.distribute.InputContext 인수가 있으며tf.data.Dataset 인스턴스를 반환해야합니다. 이 API를 사용하면 tf.distribute 는 입력 함수에서 반환 된 사용자의tf.data.Dataset 인스턴스를 더 이상 변경하지 않습니다. 데이터 세트를 일괄 처리하고 분할하는 것은 사용자의 책임입니다. tf.distribute 는 각 작업자의 CPU 장치에서 입력 함수를 호출합니다. 사용자가 자신의 일괄 처리 및 분할 논리를 지정할 수 있도록 허용하는 것 외에도이 API는 다중 작업자 교육에 사용될 때 tf.distribute.Strategy.experimental_distribute_dataset 비해 더 나은 확장 성과 성능을 보여줍니다.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

속성

일괄 처리

입력 함수의 반환 값인tf.data.Dataset 인스턴스는tf.data.Dataset 배치 크기를 사용하여 배치되어야합니다. 복제 본당 배치 크기는 전역 배치 크기를 동기화 학습에 참여하는 복제본 수로 나눈 값입니다. 이는 tf.distribute 가 각 작업자의 CPU 장치에서 입력 함수를 호출하기 때문입니다. 지정된 작업자에서 생성 된 데이터 세트는 해당 작업자의 모든 복제본에서 사용할 준비가되어 있어야합니다.

샤딩

사용자의 입력 함수에 대한 인수로 암시 적으로 전달되는 tf.distribute.InputContext 개체는 내부적으로 tf.distribute 에 의해 생성됩니다. 작업자 수, 현재 작업자 ID 등에 대한 정보가 있습니다.이 입력 함수는 tf.distribute.InputContext 객체의 일부인 이러한 속성을 사용하여 사용자가 설정 한 정책에 따라 샤딩을 처리 할 수 ​​있습니다.

프리 페치

tf.distribute 는 사용자 제공 입력 함수에서 반환 한tf.data.Dataset 끝에 프리 페치 변환을 추가하지 않습니다.

분산 반복기

비 분산 유사tf.data.Dataset 경우, 당신은에 반복자 만들어야합니다 tf.distribute.DistributedDataset 그것과 접근을 통해의 요소 반복 인스턴스를 tf.distribute.DistributedDataset . 다음은 tf.distribute.DistributedIterator 생성하고 tf.distribute.DistributedIterator 사용하여 모델을 학습시키는 방법입니다.

사용법

Pythonic for 루프 구문 사용

사용자 친화적 인 Pythonic 루프를 사용하여 tf.distribute.DistributedDataset 을 반복 할 수 있습니다. tf.distribute.DistributedIterator 에서 반환 된 요소는 단일 tf.Tensor 또는 복제 tf.distribute.DistributedValues 값을 포함하는 tf.distribute.DistributedValues 수 있습니다. tf.function 안에 루프를 배치하면 성능이 향상됩니다. 그러나, breakreturn 현재 이상 루프 지원되지 않습니다 tf.distribute.DistributedDataset (A)의 내부에 배치 tf.function . 또한 tf.distribute.experimental.MultiWorkerMirroredStrategytf.distribute.TPUStrategy 와 같은 다중 작업자 전략을 사용할 때 tf.function 내부에 루프를 배치하는 것을 지원하지 않습니다. tf.function 내부에 루프를 배치하는 것은 단일 작업자 tf.distribute.TPUStrategy 대해 작동하지만 TPU pod를 사용할 때는 작동하지 않습니다.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

iter 를 사용하여 명시 적 반복자를 만듭니다.

tf.distribute.DistributedDataset 인스턴스의 요소를 반복하려면 iter API를 사용하여 tf.distribute.DistributedIterator 를 만들 수 있습니다. 명시 적 반복기를 사용하면 고정 된 수의 단계를 반복 할 수 있습니다. tf.distribute.DistributedIterator 인스턴스 dist_iterator 에서 다음 요소를 가져 오기 위해 next(dist_iterator) , dist_iterator.get_next() 또는 dist_iterator.get_next_as_optional() 호출 할 수 있습니다. 앞의 두 가지는 본질적으로 동일합니다.

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

next() 또는 tf.distribute.DistributedIterator.get_next() 를 사용하여 tf.distribute.DistributedIterator 가 끝에 도달하면 OutOfRange 오류가 발생합니다. 클라이언트는 파이썬 측에서 오류를 포착하고 체크 포인트 및 평가와 같은 다른 작업을 계속할 수 있습니다. 그러나 다음과 같은 호스트 학습 루프 (예 : tf.function 당 여러 단계 실행)를 사용하는 경우에는 작동하지 않습니다.

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn 돌며 단계 본체 배치하여 여러 단계를 포함 tf.range . 이 경우 종속성이없는 루프의 다른 반복이 병렬로 시작될 수 있으므로 이전 반복의 계산이 완료되기 전에 이후 반복에서 OutOfRange 오류가 트리거 될 수 있습니다. OutOfRange 오류가 발생하면 함수의 모든 작업이 즉시 종료됩니다. 이 경우를 피하고 싶은 경우 OutOfRange 오류를 발생시키지 않는 대안은 tf.distribute.DistributedIterator.get_next_as_optional() 입니다. get_next_as_optional 은 다음 요소를 포함하거나 tf.distribute.DistributedIterator 가 끝에 도달 한 경우 값이없는 tf.experimental.Optional 을 반환합니다.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_spec 속성 사용

당신이에 분산 된 데이터 세트의 요소를 전달하는 경우 tf.function 하고 싶은 tf.TypeSpec 보장을, 당신은 지정할 수 있습니다 input_signature 의 인수 tf.function . 분산 데이터 세트의 출력은 tf.distribute.DistributedValues 이며 단일 장치 또는 여러 장치에 대한 입력을 나타낼 수 있습니다. 이 분산 값에 해당하는 tf.TypeSpec 을 가져 오기 위해 분산 데이터 세트 또는 분산 반복기 객체의 element_spec 속성을 사용할 수 있습니다.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

부분 배치

사용자가tf.data.Dataset 인스턴스에 복제본 수로 균등하게 나눌 수없는 일괄 처리 크기가 포함되어 있거나 데이터 세트 인스턴스의 카디널리티가 일괄 처리 크기로 나눌 수없는 경우 부분 일괄 처리가 발생합니다. 즉, 데이터 세트가 여러 복제본에 분산 될 때 일부 반복기에 대한 next 호출로 인해 OutOfRangeError가 발생합니다. 이 사용 사례를 처리하기 위해 tf.distribute 는 처리 할 데이터가 더 이상없는 복제본에서 배치 크기 0의 더미 배치를 반환합니다.

단일 작업자의 경우 반복기의 next 호출에서 데이터가 반환되지 않으면 배치 크기가 0 인 더미 배치가 생성되어 데이터 세트의 실제 데이터와 함께 사용됩니다. 부분 배치의 경우 데이터의 마지막 전역 배치에는 더미 데이터 배치와 함께 실제 데이터가 포함됩니다. 데이터 처리를위한 중지 조건은 이제 복제본에 데이터가 있는지 확인합니다. 복제본에 데이터가 없으면 OutOfRange 오류가 발생합니다.

다중 작업자의 경우 각 작업자의 데이터 존재를 나타내는 부울 값은 교차 복제본 통신을 사용하여 집계되며 모든 작업자가 분산 된 데이터 세트 처리를 완료했는지 식별하는 데 사용됩니다. 여기에는 작업자 간 의사 소통이 포함되므로 성능이 저하됩니다.

주의 사항

  • 다중 작업자 설정으로 tf.distribute.Strategy.experimental_distribute_dataset API를 사용하는 경우 사용자는 파일에서 읽는tf.data.Dataset 를 전달합니다. tf.data.experimental.AutoShardPolicyAUTO 또는 FILE 로 설정된 경우 실제 단계별 배치 크기는 사용자가 정의한 전역 배치 크기보다 작을 수 있습니다. 이것은 파일의 나머지 요소가 전역 배치 크기보다 작을 때 발생할 수 있습니다. 사용자는 실행할 단계 수에 의존하지 않고 데이터 세트를 소진하거나 tf.data.experimental.AutoShardPolicyDATA 로 설정하여 tf.data.experimental.AutoShardPolicytf.data.experimental.AutoShardPolicy 수 있습니다.

  • 상태 저장 데이터 세트 변환은 현재 tf.distribute 지원되지 않으며 데이터 세트에있을 수있는 상태 저장 작업은 현재 무시됩니다. 예를 들어 데이터 세트에 map_fn 을 사용하여 이미지를 회전하는 tf.random.uniform 이있는 경우 Python 프로세스가 실행되는 로컬 머신의 상태 (즉, 임의 시드)에 의존하는 데이터 세트 그래프가 있습니다.

  • 기본적으로 비활성화 된 실험적 tf.data.experimental.OptimizationOptions 는 특정 컨텍스트 (예 : tf.distribute 와 함께 사용되는 경우)에서 성능 저하를 유발할 수 있습니다. 배포 설정에서 워크로드의 성능에 도움이되는지 확인한 후에 만 ​​활성화해야합니다.

  • tf.distribute.experimental_distribute_dataset 또는 tf.distribute.experimental_distribute_datasets_from_function 사용할 때 작업자가 데이터를 처리하는 tf.distribute.experimental_distribute_datasets_from_function 는 보장되지 않습니다. 이는 일반적으로 tf.distribute 를 사용하여 예측을 확장하는 경우 필요합니다. 그러나 배치의 각 요소에 대한 색인을 삽입하고 그에 따라 출력을 정렬 할 수 있습니다. 다음 스 니펫은 출력을 주문하는 방법의 예입니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

표준 tf.data.Dataset 인스턴스를 사용하지 않는 경우 데이터를 어떻게 배포합니까?

때때로 사용자는tf.data.Dataset 을 사용하여 입력을 나타내고 위에 언급 된 API를 사용하여 데이터 세트를 여러 장치에 배포 할 수 없습니다. 이러한 경우 원시 텐서 또는 생성기의 입력을 사용할 수 있습니다.

임의의 텐서 입력에 experiment_distribute_values_from_function 사용

strategy.runnext(iterator) 의 출력 인 tf.distribute.DistributedValues 를 허용 next(iterator) . 텐서 값을 전달하려면 experimental_distribute_values_from_function 을 사용하여 원시 텐서에서 tf.distribute.DistributedValues 를 생성합니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

생성기에서 입력 한 경우 tf.data.Dataset.from_generator 사용

사용하려는 생성기 함수가있는 경우 from_generator API를 사용하여tf.data.Dataset 인스턴스를 만들 수 있습니다.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)