![]() | ![]() | ![]() | ![]() |
tf.distribute APIは、ユーザーがトレーニングを単一のマシンから複数のマシンに拡張するための簡単な方法を提供します。モデルをスケーリングする場合、ユーザーは入力を複数のデバイスに分散する必要もあります。 tf.distribute
は、入力をデバイス間で自動的に分散できるAPIを提供します。
このガイドでは、 tf.distribute
を使用して分散データセットとイテレータを作成するさまざまな方法を紹介します。さらに、次のトピックについても説明します。
-
tf.distribute.Strategy.experimental_distribute_dataset
およびtf.distribute.Strategy.distribute_datasets_from_function
を使用する場合の使用法、シャーディング、およびバッチ処理のオプション。 - 分散データセットを反復処理するさまざまな方法。
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
とtf.data
の違い、およびユーザーが使用時に遭遇する可能性のある制限。
このガイドでは、KerasAPIでの分散入力の使用については説明していません。
分散データセット
tf.distribute
APIを使用してスケーリングするには、ユーザーがtf.data.Dataset
を使用して入力を表すことをお勧めします。 tf.distribute
は、 tf.data.Dataset
(たとえば、各アクセラレータデバイスへのデータの自動プリフェッチ)と効率的に連携するように作成されており、パフォーマンスの最適化が定期的に実装に組み込まれています。 tf.data.Dataset
以外のものを使用するユースケースがある場合は、このガイドの後のセクションを参照してください。非分散トレーニングループでは、ユーザーは最初にtf.data.Dataset
インスタンスを作成し、次に要素を反復処理します。例えば:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
ユーザーがユーザーの既存のコードに最小限の変更を加えてtf.distribute
戦略を使用できるようにするために、 tf.data.Dataset
インスタンスを配布し、配布されたデータセットオブジェクトを返す2つのAPIが導入されました。次に、ユーザーはこの分散データセットインスタンスを反復処理し、以前と同じようにモデルをトレーニングできます。次に、2つのAPI( tf.distribute.Strategy.experimental_distribute_dataset
とtf.distribute.Strategy.distribute_datasets_from_function
)を詳しく見てみましょう。
tf.distribute.Strategy.experimental_distribute_dataset
使用法
このAPIは、 tf.data.Dataset
インスタンスを入力として受け取り、 tf.distribute.DistributedDataset
インスタンスを返します。グローバルバッチサイズと等しい値で入力データセットをバッチ処理する必要があります。このグローバルバッチサイズは、1つのステップですべてのデバイスにわたって処理するサンプルの数です。この分散データセットをPythonのように反復するか、 iter
を使用してイテレーターを作成できます。返されるオブジェクトはtf.data.Dataset
インスタンスではなく、データセットを変換または検査する他のAPIをサポートしていません。これは、さまざまなレプリカに入力をシャーディングする特定の方法がない場合に推奨されるAPIです。
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
プロパティ
バッチ処理
tf.distribute
は、入力tf.data.Dataset
インスタンスを、グローバルバッチサイズを同期中のレプリカの数で割った値に等しい新しいバッチサイズで再バッチ処理します。同期しているレプリカの数は、トレーニング中に勾配allreduceに参加しているデバイスの数と同じです。ユーザーが分散イテレーターでnext
呼び出すと、レプリカごとのバッチサイズのデータが各レプリカで返されます。再バッチ処理されたデータセットのカーディナリティは、常にレプリカの数の倍数になります。次にいくつかの例を示します。
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- 配布なし:
- バッチ1:[0、1、2、3]
- バッチ2:[4、5]
2つのレプリカに分散しています。最後のバッチ([4、5])は2つのレプリカに分割されます。
バッチ1:
- レプリカ1:[0、1]
- レプリカ2:[2、3]
バッチ2:
- レプリカ2:[4]
- レプリカ2:[5]
tf.data.Dataset.range(4).batch(4)
- 配布なし:
- バッチ1:[[0]、[1]、[2]、[3]]
- 5つのレプリカに分散する場合:
- バッチ1:
- レプリカ1:[0]
- レプリカ2:[1]
- レプリカ3:[2]
- レプリカ4:[3]
- レプリカ5:[]
tf.data.Dataset.range(8).batch(4)
- 配布なし:
- バッチ1:[0、1、2、3]
- バッチ2:[4、5、6、7]
- 3つのレプリカに分散する場合:
- バッチ1:
- レプリカ1:[0、1]
- レプリカ2:[2、3]
- レプリカ3:[]
- バッチ2:
- レプリカ1:[4、5]
- レプリカ2:[6、7]
- レプリカ3:[]
データセットの再バッチ処理には、レプリカの数に比例して増加するスペースの複雑さがあります。これは、マルチワーカートレーニングのユースケースでは、入力パイプラインでOOMエラーが発生する可能性があることを意味します。
シャーディング
tf.distribute
は、 MultiWorkerMirroredStrategy
およびTPUStrategy
を使用したマルチワーカートレーニングの入力データセットも自動シャーディングします。各データセットは、ワーカーのCPUデバイス上に作成されます。一連のワーカーに対してデータセットを自動シャーディングするということは、各ワーカーにデータセット全体のサブセットが割り当てられることを意味します(適切なtf.data.experimental.AutoShardPolicy
が設定されている場合)。これは、各ステップで、重複しないデータセット要素のグローバルバッチサイズが各ワーカーによって処理されるようにするためです。自動シャーディングには、 tf.data.experimental.DistributeOptions
を使用して指定できるいくつかの異なるオプションがあります。 ParameterServerStrategy
を使用したマルチワーカートレーニングには自動シャーディングがないことに注意してください。この戦略を使用したデータセット作成の詳細については、 Parameter ServerStrategyチュートリアルを参照してください。
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
tf.data.experimental.AutoShardPolicy
に設定できる3つの異なるオプションがあります。
- AUTO:これはデフォルトのオプションであり、FILEによってシャードが試行されることを意味します。ファイルベースのデータセットが検出されない場合、FILEによるシャーディングの試行は失敗します。その後、
tf.distribute
はDATAによるシャーディングにフォールバックします。入力データセットがファイルベースであるが、ファイルの数がワーカーの数より少ない場合、InvalidArgumentError
が発生することに注意してください。これが発生した場合は、ポリシーを明示的にAutoShardPolicy.DATA
に設定するか、ファイルの数がワーカーの数よりも多くなるように、入力ソースを小さなファイルに分割します。 ファイル:これは、すべてのワーカーで入力ファイルをシャーディングする場合のオプションです。入力ファイルの数がワーカーの数よりもはるかに多く、ファイル内のデータが均等に分散されている場合は、このオプションを使用する必要があります。このオプションの欠点は、ファイル内のデータが均等に分散されていない場合にアイドル状態のワーカーが存在することです。ファイルの数がワーカーの数より少ない場合、
InvalidArgumentError
が発生します。これが発生した場合は、ポリシーをAutoShardPolicy.DATA
に明示的に設定してください。たとえば、それぞれ1つのレプリカを持つ2つのワーカーに2つのファイルを配布するとします。ファイル1には[0、1、2、3、4、5]が含まれ、ファイル2には[6、7、8、9、10、11]が含まれます。同期しているレプリカの総数を2、グローバルバッチサイズを4とします。- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[2、3]
- バッチ3 =レプリカ1:[4]
- バッチ4 =レプリカ1:[5]
- 労働者1:
- バッチ1 =レプリカ2:[6、7]
- バッチ2 =レプリカ2:[8、9]
- バッチ3 =レプリカ2:[10]
- バッチ4 =レプリカ2:[11]
データ:これにより、すべてのワーカー間で要素が自動シャーディングされます。各ワーカーはデータセット全体を読み取り、割り当てられたシャードのみを処理します。他のすべてのシャードは破棄されます。これは通常、入力ファイルの数がワーカーの数より少なく、すべてのワーカー間でデータをより適切にシャーディングする場合に使用されます。欠点は、データセット全体が各ワーカーで読み取られることです。たとえば、1つのファイルを2人のワーカーに配布するとします。ファイル1には、[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とします。
- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[4、5]
- バッチ3 =レプリカ1:[8、9]
- 労働者1:
- バッチ1 =レプリカ2:[2、3]
- バッチ2 =レプリカ2:[6、7]
- バッチ3 =レプリカ2:[10、11]
オフ:自動シャーディングをオフにすると、各ワーカーがすべてのデータを処理します。たとえば、1つのファイルを2人のワーカーに配布するとします。ファイル1には、[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とすると、各ワーカーには次の分布が表示されます。
- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[2、3]
- バッチ3 =レプリカ1:[4、5]
- バッチ4 =レプリカ1:[6、7]
- バッチ5 =レプリカ1:[8、9]
バッチ6 =レプリカ1:[10、11]
労働者1:
バッチ1 =レプリカ2:[0、1]
バッチ2 =レプリカ2:[2、3]
バッチ3 =レプリカ2:[4、5]
バッチ4 =レプリカ2:[6、7]
バッチ5 =レプリカ2:[8、9]
バッチ6 =レプリカ2:[10、11]
プリフェッチ
デフォルトでは、 tf.distribute
は、ユーザーが指定したtf.data.Dataset
インスタンスの最後にプリフェッチ変換を追加します。 buffer_size
であるプリフェッチ変換の引数は、同期しているレプリカの数と同じです。
tf.distribute.Strategy.distribute_datasets_from_function
使用法
このAPIは入力関数を受け取り、 tf.distribute.DistributedDataset
インスタンスを返します。ユーザーが渡す入力関数にはtf.distribute.InputContext
引数があり、 tf.data.Dataset
インスタンスを返す必要があります。このAPIを使用すると、 tf.distribute
は、入力関数から返されたユーザーのtf.data.Dataset
インスタンスにそれ以上の変更を加えません。データセットをバッチ処理してシャーディングするのはユーザーの責任です。 tf.distribute
は、各ワーカーのCPUデバイスで入力関数を呼び出します。ユーザーが独自のバッチ処理およびシャーディングロジックを指定できるようにするだけでなく、このAPIは、マルチワーカートレーニングに使用した場合、 tf.distribute.Strategy.experimental_distribute_dataset
と比較して優れたスケーラビリティとパフォーマンスを示します。
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
プロパティ
バッチ処理
input関数の戻り値であるtf.data.Dataset
インスタンスは、レプリカごとのバッチサイズを使用してバッチ処理する必要があります。レプリカごとのバッチサイズは、グローバルバッチサイズを同期トレーニングに参加しているレプリカの数で割ったものです。これは、 tf.distribute
が各ワーカーのCPUデバイスで入力関数を呼び出すためです。特定のワーカーで作成されたデータセットは、そのワーカーのすべてのレプリカで使用できるようになっている必要があります。
シャーディング
ユーザーの入力関数への引数として暗黙的に渡されるtf.distribute.InputContext
オブジェクトは、内部でtf.distribute
によって作成されます。ワーカーの数、現在のワーカーIDなどに関する情報があります。この入力関数は、 tf.distribute.InputContext
オブジェクトの一部であるこれらのプロパティを使用して、ユーザーが設定したポリシーに従ってシャーディングを処理できます。
プリフェッチ
tf.distribute
は、ユーザー提供の入力関数によって返されるtf.data.Dataset
の最後にプリフェッチ変換を追加しません。
分散イテレータ
非分散tf.data.Dataset
インスタンスと同様に、 tf.distribute.DistributedDataset
インスタンスに反復子を作成して、反復処理し、 tf.distribute.DistributedDataset
の要素にアクセスする必要があります。以下は、 tf.distribute.DistributedIterator
を作成し、それを使用してモデルをトレーニングする方法です。
使用法
Pythonicforループ構造を使用する
ユーザーフレンドリーなPythonicループを使用して、 tf.distribute.DistributedDataset
を反復処理できます。 tf.distribute.DistributedIterator
から返される要素は、単一のtf.Tensor
またはレプリカごとの値を含むtf.distribute.DistributedValues
にすることができます。 tf.function
内にループを配置すると、パフォーマンスが向上します。ただし、現在、 break
とreturn
は、 tf.function
内に配置されているtf.distribute.DistributedDataset
のループではサポートされていません。
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
iter
を使用して明示的なイテレータを作成します
tf.distribute.DistributedDataset
インスタンスの要素を反復処理するには、 iter
を使用してtf.distribute.DistributedIterator
を作成します。明示的なイテレータを使用すると、固定数のステップで反復できます。 tf.distribute.DistributedIterator
インスタンスdist_iterator
から次の要素を取得するには、 next(dist_iterator)
、 dist_iterator.get_next()
、またはdist_iterator.get_next_as_optional()
を呼び出すことができます。前の2つは本質的に同じです:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next()
またはtf.distribute.DistributedIterator.get_next()
を使用して、 tf.distribute.DistributedIterator
が終了に達した場合、OutOfRangeエラーがスローされます。クライアントはPython側でエラーをキャッチし、チェックポインティングや評価などの他の作業を続行できます。ただし、これは、次のようなホストトレーニングループを使用している場合(つまり、 tf.function
ごとに複数のステップを実行している場合)は機能しません。
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
には、ステップ本体をtf.range
内にラップすることにより、複数のステップが含まれています。この場合、依存関係のないループ内のさまざまな反復が並行して開始される可能性があるため、前の反復の計算が終了する前に、後の反復でOutOfRangeエラーがトリガーされる可能性があります。 OutOfRangeエラーがスローされると、関数内のすべての操作がすぐに終了します。これを避けたい場合は、OutOfRangeエラーをスローしない代替手段はtf.distribute.DistributedIterator.get_next_as_optional()
です。 get_next_as_optional
は、 tf.experimental.Optional
を返します。これには、次の要素が含まれるか、 tf.distribute.DistributedIterator
が終了した場合は値が含まれません。
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])プレースホルダー16
element_spec
プロパティの使用
分散データセットの要素をtf.function
に渡し、 tf.TypeSpec
保証が必要な場合は、 tf.function
のinput_signature
引数を指定できます。分散データセットの出力はtf.distribute.DistributedValues
であり、単一のデバイスまたは複数のデバイスへの入力を表すことができます。この分散値に対応するtf.TypeSpec
を取得するには、分散データセットまたは分散イテレータオブジェクトのelement_spec
プロパティを使用できます。
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])プレースホルダー18
部分バッチ
部分的なバッチは、ユーザーが作成するtf.data.Dataset
インスタンスに、レプリカの数で均等に割り切れないバッチサイズが含まれている場合、またはデータセットインスタンスのカーディナリティがバッチサイズで割り切れない場合に発生します。これは、データセットが複数のレプリカに分散されている場合、一部のイテレーターをnext
呼び出すとOutOfRangeErrorが発生することを意味します。このユースケースを処理するために、 tf.distribute
は、処理するデータがこれ以上ないレプリカで、バッチサイズ0のダミーバッチを返します。
シングルワーカーの場合、イテレーターでのnext
呼び出しによってデータが返されない場合、0バッチサイズのダミーバッチが作成され、データセット内の実際のデータとともに使用されます。部分的なバッチの場合、データの最後のグローバルバッチには、データのダミーバッチとともに実際のデータが含まれます。データ処理の停止条件は、レプリカのいずれかにデータがあるかどうかをチェックするようになりました。どのレプリカにもデータがない場合、OutOfRangeエラーがスローされます。
マルチワーカーの場合、各ワーカーのデータの存在を表すブール値は、クロスレプリカ通信を使用して集計されます。これは、すべてのワーカーが分散データセットの処理を終了したかどうかを識別するために使用されます。これにはクロスワーカー通信が含まれるため、パフォーマンスが低下します。
警告
複数のワーカー設定で
tf.distribute.Strategy.experimental_distribute_dataset
を使用する場合、ユーザーはファイルから読み取るtf.data.Dataset
を渡します。tf.data.experimental.AutoShardPolicy
がAUTO
またはFILE
に設定されている場合、実際のステップごとのバッチサイズは、ユーザー定義のグローバルバッチサイズよりも小さい場合があります。これは、ファイルの残りの要素がグローバルバッチサイズよりも小さい場合に発生する可能性があります。ユーザーは、実行するステップ数に依存せずにデータセットを使い果たすか、tf.data.experimental.AutoShardPolicy
をDATA
に設定して回避することができます。ステートフルデータセット変換は現在
tf.distribute
でサポートされておらず、データセットが持つ可能性のあるステートフル操作は現在無視されます。たとえば、データセットにmap_fn
を使用して画像を回転させるtf.random.uniform
がある場合、Pythonプロセスが実行されているローカルマシンの状態(つまりランダムシード)に依存するデータセットグラフがあります。デフォルトで無効になっている実験的
tf.data.experimental.OptimizationOptions
は、特定のコンテキスト(tf.distribute
と一緒に使用する場合など)でパフォーマンスの低下を引き起こす可能性があります。それらが分散設定でワークロードのパフォーマンスに役立つことを検証した後でのみ、それらを有効にする必要があります。一般に
tf.data
を使用して入力パイプラインを最適化する方法については、このガイドを参照してください。いくつかの追加のヒント:複数のワーカーがあり、
tf.data.Dataset.list_files
を使用して1つ以上のグロブパターンに一致するすべてのファイルからデータセットを作成している場合は、seed
引数を設定するか、shuffle=False
を設定して、各ワーカーがファイルを一貫してシャーディングするようにしてください。次の例に示すように、入力パイプラインにレコードレベルでのデータのシャッフルとデータの解析の両方が含まれている場合、未解析のデータが解析済みのデータよりも大幅に大きい場合を除き(通常はそうではありません)、最初にシャッフルしてから解析します。これにより、メモリ使用量とパフォーマンスが向上する可能性があります。
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
は、buffer_size
要素の内部バッファーを維持するため、buffer_size
を減らすとOOMの問題が軽減される可能性があります。tf.distribute.experimental_distribute_dataset
またはtf.distribute.distribute_datasets_from_function
を使用するときに、データがワーカーによって処理される順序は保証されません。これは通常、tf.distribute
を使用してスケール予測を行う場合に必要です。ただし、バッチ内の各要素にインデックスを挿入し、それに応じて出力を並べ替えることができます。次のスニペットは、出力を並べ替える方法の例です。
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
正規のtf.data.Datasetインスタンスを使用していない場合、データを配布するにはどうすればよいですか?
ユーザーがtf.data.Dataset
を使用して入力を表現したり、上記のAPIを使用してデータセットを複数のデバイスに配布したりできない場合があります。このような場合、生のテンソルまたはジェネレーターからの入力を使用できます。
任意のテンソル入力にexperimental_distribute_values_from_functionを使用します
strategy.run
は、 next(iterator)
の出力であるtf.distribute.DistributedValues
を受け入れます。テンソル値を渡すには、 experimental_distribute_values_from_function
を使用して、生のテンソルからtf.distribute.DistributedValues
を作成します。
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
入力がジェネレーターからのものである場合は、tf.data.Dataset.from_generatorを使用します
使用するジェネレーター関数がある場合は、 from_generator
を使用してtf.data.Dataset
インスタンスを作成できます。
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.