このページは Cloud Translation API によって翻訳されました。
Switch to English

分散入力

TensorFlow.orgで見る Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

tf.distribute APIは、ユーザーがトレーニングを単一のマシンから複数のマシンにスケーリングする簡単な方法を提供します。モデルをスケーリングする場合、ユーザーは入力を複数のデバイスに分散させる必要もあります。 tf.distributeは、デバイス間で入力を自動的に分散できるAPIを提供します。

このガイドでは、 tf.distribute APIを使用して分散データセットとイテレータを作成するさまざまな方法をtf.distributeます。さらに、次のトピックについても説明します。

このガイドでは、Keras APIでの分散入力の使用については説明していません。

分散データセット

tf.distribute APIを使用してtf.distributeするには、ユーザーがtf.data.Datasetを使用して入力を表すことをお勧めします。 tf.distributeは、パフォーマンスの最適化が実装に定期的に組み込まれているtf.data.Dataset (たとえば、各アクセラレータデバイスへのデータの自動プリフェッチ)で効率的にtf.data.Datasetするように作成されています。 tf.data.Dataset以外の使用例がある場合は、このガイドの後半のセクションを参照してください。非分散トレーニングループでは、ユーザーはまずtf.data.Datasetインスタンスを作成してから、要素を反復処理します。例えば:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ユーザーが既存のコードに最小限の変更を加えるtf.distribute戦略を使用できるようにするために、 tf.data.Datasetインスタンスを分散して分散データセットオブジェクトを返す2つのAPIが導入されました。次に、ユーザーはこの分散データセットインスタンスを反復処理し、以前のようにモデルをトレーニングできます。 2つのAPI- tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.experimental_distribute_datasets_from_functionをさらに詳しく見てみましょう。

tf.distribute.Strategy.experimental_distribute_dataset

使用法

このAPIは、かかるtf.data.Dataset入力としてインスタンスを返すとtf.distribute.DistributedDatasetインスタンスを。グローバルバッチサイズと等しい値で入力データセットをバッチ処理する必要があります。このグローバルバッチサイズは、すべてのデバイスで1つのステップで処理するサンプルの数です。 Pythonic方式でこの分散データセットを反復処理するか、 iterを使用してイテレータを作成できます。返されたオブジェクトはtf.data.Datasetインスタンスではなく、データセットを変換または検査する他のAPIをサポートしていません。これは、入力をさまざまなレプリカに分割する特定の方法がない場合に推奨されるAPIです。

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

プロパティ

バッチ処理

tf.distributeは、グローバルバッチサイズを同期しているレプリカの数で割った値に等しい新しいバッチサイズで、入力tf.data.Datasetインスタンスをtf.data.Datasetバッチします。同期しているレプリカの数は、トレーニング中に勾配allreduceに参加しているデバイスの数と同じです。ユーザーがnext分散イテレータを呼び出すと、レプリカごとのバッチサイズのデータ​​が各レプリカに返されます。再バッチされたデータセットのカーディナリティは、常にレプリカ数の倍数になります。次に例をいくつか示します。

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • 配布なし:
    • バッチ1:[0、1、2、3]
    • バッチ2:[4、5]
    • 2つのレプリカに分散します。最後のバッチ([4、5])は2つのレプリカに分割されます。

    • バッチ1:

      • レプリカ1:[0、1]
      • レプリカ2:[2、3]
    • バッチ2:

      • レプリカ2:[4]
      • レプリカ2:[5]
  • tf.data.Dataset.range(4).batch(4)

    • 配布なし:
    • バッチ1:[[0]、[1]、[2]、[3]]
    • 5つのレプリカに分散する場合:
    • バッチ1:
      • レプリカ1:[0]
      • レプリカ2:[1]
      • レプリカ3:[2]
      • レプリカ4:[3]
      • レプリカ5:[]
  • tf.data.Dataset.range(8).batch(4)

    • 配布なし:
    • バッチ1:[0、1、2、3]
    • バッチ2:[4、5、6、7]
    • 3つのレプリカに分散する場合:
    • バッチ1:
      • レプリカ1:[0、1]
      • レプリカ2:[2、3]
      • レプリカ3:[]
    • バッチ2:
      • レプリカ1:[4、5]
      • レプリカ2:[6、7]
      • レプリカ3:[]

データセットを再バッチ処理すると、スペースの複雑さが増し、レプリカの数とともに直線的に増加します。つまり、マルチワーカートレーニングのユースケースでは、入力パイプラインでOOMエラーが発生する可能性があります。

シャーディング

tf.distributeは、マルチワーカートレーニングの入力データセットも自動tf.distributeます。各データセットは、ワーカーのCPUデバイスで作成されます。一連のワーカーでデータセットを自動シャーディングすると、各ワーカーにはデータセット全体のサブセットが割り当てられます(適切なtf.data.experimental.AutoShardPolicyが設定されている場合)。これは、各ステップで、重複しないデータセット要素のグローバルバッチサイズが各ワーカーによって確実に処理されるようにするためです。 Autoshardingを使用して指定することができ、さまざまなオプションをいくつ持っているtf.data.experimental.DistributeOptions

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy設定できる3つの異なるオプションがあります。

  • AUTO:これはデフォルトのオプションであり、FILEによってシャーディングが試行されることを意味します。ファイルベースのデータセットが検出されない場合、FILEによるシャーディングの試行は失敗します。その後、 tf.distributeはDATAによるシャーディングにフォールバックします。入力データセットがファイルベースであり、ファイルの数がワーカーの数より少ない場合、エラーが発生することに注意してください。
  • FILE:これは、入力ファイルをすべてのワーカーにシャーディングする場合のオプションです。ファイルの数がワーカーの数より少ない場合、エラーが発生します。入力ファイルの数がワーカーの数よりはるかに多く、ファイル内のデータが均等に分散されている場合は、このオプションを使用する必要があります。このオプションの欠点は、ファイル内のデータが均等に分散されていない場合に、ワーカーがアイドル状態になることです。たとえば、2つのファイルを2つのワーカーに分散させ、それぞれに1つのレプリカを割り当てます。ファイル1には[0、1、2、3、4、5]が含まれ、ファイル2には[6、7、8、9、10、11]が含まれます。同期しているレプリカの総数を2、グローバルバッチサイズを4とします。

    • ワーカー0:
    • バッチ1 =レプリカ1:[0、1]
    • バッチ2 =レプリカ1:[2、3]
    • バッチ3 =レプリカ1:[4]
    • バッチ4 =レプリカ1:[5]
    • 労働者1:
    • バッチ1 =レプリカ2:[6、7]
    • バッチ2 =レプリカ2:[8、9]
    • バッチ3 =レプリカ2:[10]
    • バッチ4 =レプリカ2:[11]
  • データ:これにより、すべてのワーカー間で要素が自動分割されます。各ワーカーはデータセット全体を読み取り、それに割り当てられたシャードのみを処理します。他のすべてのシャードは破棄されます。これは一般に、入力ファイルの数がワーカーの数より少なく、すべてのワーカー間でより良いデータのシャーディングが必要な場合に使用されます。欠点は、データセット全体が各ワーカーで読み取られることです。たとえば、1つのファイルを2つのワーカーに分散させます。ファイル1には[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とします。

    • ワーカー0:
    • バッチ1 =レプリカ1:[0、1]
    • バッチ2 =レプリカ1:[4、5]
    • バッチ3 =レプリカ1:[8、9]
    • 労働者1:
    • バッチ1 =レプリカ2:[2、3]
    • バッチ2 =レプリカ2:[6、7]
    • バッチ3 =レプリカ2:[10、11]
  • オフ:自動シャーディングをオフにすると、各ワーカーがすべてのデータを処理します。たとえば、2つのワーカーに1つのファイルを配布するとします。ファイル1には[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とすると、各ワーカーには次の分布が表示されます。

    • ワーカー0:
    • バッチ1 =レプリカ1:[0、1]
    • バッチ2 =レプリカ1:[2、3]
    • バッチ3 =レプリカ1:[4、5]
    • バッチ4 =レプリカ1:[6、7]
    • バッチ5 =レプリカ1:[8、9]
    • バッチ6 =レプリカ1:[10、11]

    • 労働者1:

    • バッチ1 =レプリカ2:[0、1]

    • バッチ2 =レプリカ2:[2、3]

    • バッチ3 =レプリカ2:[4、5]

    • バッチ4 =レプリカ2:[6、7]

    • バッチ5 =レプリカ2:[8、9]

    • バッチ6 =レプリカ2:[10、11]

プリフェッチ

デフォルトでは、 tf.distributeは、ユーザー提供のtf.data.Datasetインスタンスの最後にプリフェッチ変換を追加します。 buffer_sizeあるプリフェッチ変換の引数は、同期しているレプリカの数と同じbuffer_size

tf.distribute.Strategy.experimental_distribute_datasets_from_function

使用法

このAPIは入力関数を取り、 tf.distribute.DistributedDatasetインスタンスを返します。ユーザーが渡す入力関数にはtf.distribute.InputContext引数があり、 tf.data.Datasetインスタンスを返す必要がtf.data.Datasetます。このAPIを使用すると、 tf.distributeは、入力関数から返されたユーザーのtf.data.Datasetインスタンスにそれ以上変更を加えません。データセットをバッチ処理してシャーディングするのは、ユーザーの責任です。 tf.distributeは、各ワーカーのCPUデバイスの入力関数を呼び出します。ユーザーが独自のバッチ処理とシャーディングのロジックを指定できるようにするだけでなく、このAPIは、マルチワーカートレーニングに使用した場合、 tf.distribute.Strategy.experimental_distribute_datasetと比較して、より優れたスケーラビリティとパフォーマンスも示します。

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

プロパティ

バッチ処理

入力関数の戻り値であるtf.data.Datasetインスタンスは、レプリカごとのバッチサイズを使用してバッチ処理する必要があります。レプリカごとのバッチサイズは、グローバルバッチサイズを、同期トレーニングに参加しているレプリカの数で割ったものです。これは、 tf.distributeが各ワーカーのCPUデバイスの入力関数を呼び出すためtf.distribute 。特定のワーカーで作成されたデータセットは、そのワーカーのすべてのレプリカで使用できる状態になっている必要があります。

シャーディング

暗黙的に引数としてユーザーの入力関数に渡されるtf.distribute.InputContextオブジェクトは、 tf.distributetf.distributeによって作成されます。ワーカーの数、現在のワーカーIDなどに関する情報があります。この入力関数は、 tf.distribute.InputContextオブジェクトの一部であるこれらのプロパティを使用して、ユーザーが設定したポリシーに従ってシャーディングを処理できます。

プリフェッチ

tf.distributeは、ユーザー提供の入力関数によって返されたtf.data.Datasetの最後にプリフェッチ変換を追加しません。

分散イテレータ

非分散tf.data.Datasetインスタンスと同様に、 tf.distribute.DistributedDatasetインスタンスで反復tf.distribute.DistributedDatasetを作成し、それを反復してtf.distribute.DistributedDataset要素にアクセスする必要があります。以下は、 tf.distribute.DistributedIteratorを作成し、 tf.distribute.DistributedIteratorを使用してモデルをトレーニングする方法です。

使い方

Pythonic forループ構造を使用する

ユーザーフレンドリーなPythonicループを使用して、 tf.distribute.DistributedDatasetを反復できます。 tf.distribute.DistributedIteratorから返される要素は、単一のtf.Tensorまたはtf.distribute.DistributedValuesであり、レプリカごとの値を含みます。ループをtf.function内に配置すると、パフォーマンスが向上します。しかし、 breakreturn現在、以上のループのためにサポートされていないtf.distribute.DistributedDatasetの内側に配置されてtf.function 。また、 tf.distribute.experimental.MultiWorkerMirroredStrategytf.distribute.TPUStrategyなどのマルチワーカー戦略を使用する場合、 tf.function内にループを配置することもサポートしていません。 tf.function内にループを配置すると、単一のワーカーtf.distribute.TPUStrategyが、TPUポッドを使用する場合は機能しません。

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

iterを使用して明示的なイテレーターを作成する

tf.distribute.DistributedDatasetインスタンスの要素を反復処理するには、その上でiter APIを使用してtf.distribute.DistributedIteratorを作成できます。明示的なイテレータを使用すると、一定数のステップを反復できます。 tf.distribute.DistributedIteratorインスタンスdist_iteratorから次の要素を取得するために、next next(dist_iterator)dist_iterator.get_next() 、またはdist_iterator.get_next_as_optional()を呼び出すことができます。前者の2つは基本的に同じです。

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

next()またはtf.distribute.DistributedIterator.get_next()を使用すると、 tf.distribute.DistributedIteratorが最後に到達すると、OutOfRangeエラーがスローされます。クライアントはpython側でエラーをキャッチし、チェックポイントや評価などの他の作業を続行できます。ただし、次のようなホストトレーニングループ(つまり、 tf.functionごとに複数のステップを実行)を使用している場合、これは機能しません。

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fnは、ステップ本体をtf.range内でラップすることにより、複数のステップが含まれています。この場合、依存関係のないループの異なる反復が並行して開始される可能性があるため、前の反復の計算が終了する前に、後の反復でOutOfRangeエラーがトリガーされる可能性があります。 OutOfRangeエラーがスローされると、関数内のすべての操作がすぐに終了します。これが避けたいケースである場合、OutOfRangeエラーをスローしない代替手段はtf.distribute.DistributedIterator.get_next_as_optional()です。 get_next_as_optional戻りtf.experimental.Optional場合は次の要素又は全く値が含まtf.distribute.DistributedIterator終わりに達しました。

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_specプロパティの使用

あなたがに配布データセットの要素を渡すとtf.functionとしたいtf.TypeSpec保証を、次のように指定することができますinput_signature引数tf.function 。分散データセットの出力はtf.distribute.DistributedValuesで、単一のデバイスまたは複数のデバイスへの入力を表すことができます。この分散値に対応するtf.TypeSpecを取得するには、分散データセットまたは分散イテレーターオブジェクトのelement_specプロパティを使用できます。

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

部分バッチ

ユーザーが作成するtf.data.Datasetインスタンスに、レプリカの数で割り切れないバッチサイズが含まれている場合や、データセットインスタンスのカーディナリティがバッチサイズで割り切れない場合、部分的なバッチが発生します。これは、データセットが複数のレプリカに分散されている場合、一部のイテレータのnext呼び出しでOutOfRangeErrorが発生することを意味します。この使用例を処理するために、 tf.distributeは、処理するデータがそれ以上ないレプリカでバッチサイズ0のダミーバッチを返します。

シングルワーカーの場合、イテレータのnext呼び出しでデータが返されない場合は、バッチサイズ0のダミーバッチが作成され、データセット内の実際のデータとともに使用されます。部分的なバッチの場合、データの最後のグローバルバッチには、ダミーのデータバッチとともに実際のデータが含まれます。データ処理の停止条件により、レプリカにデータがあるかどうかが確認されるようになりました。どのレプリカにもデータがない場合、OutOfRangeエラーがスローされます。

マルチワーカーの場合、各ワーカーのデータの存在を表すブール値はクロスレプリカ通信を使用して集約され、これはすべてのワーカーが分散データセットの処理を完了したかどうかを識別するために使用されます。これには、クロスワーカーの通信が含まれるため、パフォーマンスが低下する場合があります。

注意事項

  • 複数のワーカーセットアップでtf.distribute.Strategy.experimental_distribute_dataset APIを使用する場合、ユーザーはファイルから読み取るtf.data.Datasetを渡します。 tf.data.experimental.AutoShardPolicyAUTOまたはFILE設定されている場合、実際のステップごとのバッチサイズは、ユーザー定義のグローバルバッチサイズよりも小さい場合があります。これは、ファイル内の残りの要素がグローバルバッチサイズより小さい場合に発生する可能性があります。ユーザーは、実行するステップの数に依存せずにデータセットを使い果たすことも、 tf.data.experimental.AutoShardPolicyDATAに設定してDATAセットをtf.data.experimental.AutoShardPolicyすることもできます。

  • ステートフルデータセット変換は現在tf.distributeではサポートされておらず、データセットが持つ可能性のあるステートフル操作は現在無視されています。たとえば、データセットにmap_fnを使用して画像を回転させるtf.random.uniformがある場合、Pythonプロセスが実行されているローカルマシンの状態(つまり、ランダムシード)に依存するデータセットグラフがあります。

  • デフォルトで無効になっている実験的なtf.data.experimental.OptimizationOptionsは、特定のコンテキスト( tf.distribute一緒に使用される場合など)で、パフォーマンスの低下を引き起こす可能性があります。分散設定でワークロードのパフォーマンスにメリットがあることを検証した後でのみ、それらを有効にする必要があります。

  • tf.distribute.experimental_distribute_datasetまたはtf.distribute.experimental_distribute_datasets_from_functionを使用するときにワーカーがデータを処理するtf.distribute.experimental_distribute_datasets_from_functionは保証されていません。これは通常、 tf.distributeを使用して予測をtf.distributeする場合に必要です。ただし、バッチの各要素にインデックスを挿入し、それに応じて出力を並べ替えることができます。次のスニペットは、出力を注文する方法の例です。

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

正規のtf.data.Datasetインスタンスを使用していない場合、データをどのように配布しますか?

時々、ユーザーはtf.data.Datasetを使用して入力を表すことができず、続いて上記のAPIを使用してデータセットを複数のデバイスに配布できません。このような場合、生テンソルまたはジェネレーターからの入力を使用できます。

任意のテンソル入力にexperimental_distribute_values_from_functionを使用します

strategy.runは、 next(iterator)出力であるtf.distribute.DistributedValuesを受け入れnext(iterator) 。テンソル値を渡すには、 experimental_distribute_values_from_functionを使用して、生テンソルからtf.distribute.DistributedValuesします。

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

入力がジェネレーターからのものである場合は、tf.data.Dataset.from_generatorを使用します。

あなたが使用することジェネレータ機能を持っている場合は、作成することができtf.data.Dataset使用してインスタンスをfrom_generator APIを。

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)