![]() | ![]() | ![]() | ![]() |
tf.distribute APIは、ユーザーがトレーニングを単一のマシンから複数のマシンに拡張するための簡単な方法を提供します。モデルをスケーリングする場合、ユーザーは入力を複数のデバイスに分散する必要もあります。 tf.distribute
は、入力をデバイス間で自動的に分散できるAPIを提供します。
このガイドでは、 tf.distribute
を使用して分散データセットとイテレーターを作成するさまざまな方法をtf.distribute
ます。さらに、次のトピックについても説明します。
-
tf.distribute.Strategy.experimental_distribute_dataset
およびtf.distribute.Strategy.distribute_datasets_from_function
を使用する場合の使用法、シャーディング、およびバッチ処理のオプション。 - 分散データセットを反復処理するさまざまな方法。
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
とtf.data
違い、およびユーザーが使用時に遭遇する可能性のある制限。
このガイドでは、KerasAPIでの分散入力の使用については説明していません。
分散データセット
tf.distribute
APIを使用してtf.distribute
するには、ユーザーがtf.data.Dataset
を使用して入力を表すことをお勧めします。 tf.distribute
は、tf.data.Dataset
(たとえば、各アクセラレータデバイスへのデータの自動プリフェッチ)と効率的にtf.data.Dataset
するように作成されており、パフォーマンスの最適化が定期的に実装に組み込まれています。tf.data.Dataset
以外のものを使用するユースケースがある場合は、このガイドの後のセクションを参照してください。非分散トレーニングループでは、ユーザーは最初にtf.data.Dataset
インスタンスを作成し、次に要素を反復処理します。例えば:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
ユーザーがユーザーの既存のコードに最小限の変更を加えてtf.distribute
戦略を使用できるようにするために、tf.data.Dataset
インスタンスを配布し、配布されたデータセットオブジェクトを返す2つのAPIが導入されました。次に、ユーザーはこの分散データセットインスタンスを繰り返し処理し、以前と同じようにモデルをトレーニングできます。ここで、2つのAPI( tf.distribute.Strategy.experimental_distribute_dataset
とtf.distribute.Strategy.distribute_datasets_from_function
)を詳しく見てみましょう。
tf.distribute.Strategy.experimental_distribute_dataset
使用法
このAPIは、かかるtf.data.Dataset
入力としてインスタンスを返すとtf.distribute.DistributedDataset
インスタンスを。グローバルバッチサイズと等しい値で入力データセットをバッチ処理する必要があります。このグローバルバッチサイズは、1つのステップですべてのデバイスにわたって処理するサンプルの数です。この分散データセットをPythonのように反復するか、 iter
を使用してイテレーターを作成できます。返されるオブジェクトはtf.data.Dataset
インスタンスではなく、データセットを変換または検査する他のAPIをサポートしていません。これは、さまざまなレプリカに入力をシャーディングする特定の方法がない場合に推奨されるAPIです。
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
プロパティ
バッチ処理
tf.distribute
は、入力tf.data.Dataset
インスタンスを、グローバルバッチサイズを同期中のレプリカの数で割った値に等しい新しいバッチサイズで再バッチtf.data.Dataset
ます。同期しているレプリカの数は、トレーニング中に勾配allreduceに参加しているデバイスの数と同じです。ユーザーが分散イテレーターでnext
を呼び出すと、レプリカごとのバッチサイズのデータが各レプリカで返されます。再バッチ処理されたデータセットのカーディナリティは、常にレプリカの数の倍数になります。次にいくつかの例を示します。
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- 配布なし:
- バッチ1:[0、1、2、3]
- バッチ2:[4、5]
2つのレプリカに分散しています。最後のバッチ([4、5])は、2つのレプリカに分割されます。
バッチ1:
- レプリカ1:[0、1]
- レプリカ2:[2、3]
バッチ2:
- レプリカ2:[4]
- レプリカ2:[5]
tf.data.Dataset.range(4).batch(4)
- 配布なし:
- バッチ1:[[0]、[1]、[2]、[3]]
- 5つのレプリカに分散する場合:
- バッチ1:
- レプリカ1:[0]
- レプリカ2:[1]
- レプリカ3:[2]
- レプリカ4:[3]
- レプリカ5:[]
tf.data.Dataset.range(8).batch(4)
- 配布なし:
- バッチ1:[0、1、2、3]
- バッチ2:[4、5、6、7]
- 3つのレプリカに分散する場合:
- バッチ1:
- レプリカ1:[0、1]
- レプリカ2:[2、3]
- レプリカ3:[]
- バッチ2:
- レプリカ1:[4、5]
- レプリカ2:[6、7]
- レプリカ3:[]
データセットの再バッチ処理には、レプリカの数に比例して増加するスペースの複雑さがあります。これは、マルチワーカートレーニングのユースケースでは、入力パイプラインでOOMエラーが発生する可能性があることを意味します。
シャーディング
tf.distribute
は、 MultiWorkerMirroredStrategy
およびTPUStrategy
をMultiWorkerMirroredStrategy
したマルチワーカートレーニングで入力データセットを自動TPUStrategy
ます。各データセットは、ワーカーのCPUデバイス上に作成されます。一連のワーカーに対してデータセットを自動シャーディングするということは、各ワーカーにデータセット全体のサブセットが割り当てられることを意味します(適切なtf.data.experimental.AutoShardPolicy
が設定されている場合)。これは、各ステップで、重複しないデータセット要素のグローバルバッチサイズが各ワーカーによって処理されるようにするためです。 Autoshardingを使用して指定することができ、さまざまなオプションをいくつ持っているtf.data.experimental.DistributeOptions
。 ParameterServerStrategy
したマルチワーカートレーニングには自動シャーディングがないことに注意してください。この戦略を使用したデータセット作成の詳細については、 Parameter ServerStrategyチュートリアルを参照してください。
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
tf.data.experimental.AutoShardPolicy
設定できる3つの異なるオプションがあります。
- AUTO:これはデフォルトのオプションであり、FILEによってシャードが試行されることを意味します。ファイルベースのデータセットが検出されない場合、FILEによるシャーディングの試行は失敗します。その後、
tf.distribute
はDATAによるシャーディングにフォールバックします。入力データセットがファイルベースであるが、ファイルの数がワーカーの数より少ない場合、InvalidArgumentError
が発生することに注意してください。これが発生した場合は、ポリシーを明示的にAutoShardPolicy.DATA
設定するか、ファイルの数がワーカーの数よりも多くなるように、入力ソースを小さなファイルに分割してください。 ファイル:これは、入力ファイルをすべてのワーカーに分割する場合のオプションです。入力ファイルの数がワーカーの数よりもはるかに多く、ファイル内のデータが均等に分散されている場合は、このオプションを使用する必要があります。このオプションの欠点は、ファイル内のデータが均等に分散されていない場合にアイドル状態のワーカーがいることです。ファイルの数がワーカーの数より少ない場合、
InvalidArgumentError
が発生します。これが発生した場合は、ポリシーを明示的にAutoShardPolicy.DATA
設定してAutoShardPolicy.DATA
。たとえば、それぞれ1つのレプリカを持つ2つのワーカーに2つのファイルを配布するとします。ファイル1には[0、1、2、3、4、5]が含まれ、ファイル2には[6、7、8、9、10、11]が含まれます。同期しているレプリカの総数を2、グローバルバッチサイズを4とします。- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[2、3]
- バッチ3 =レプリカ1:[4]
- バッチ4 =レプリカ1:[5]
- 労働者1:
- バッチ1 =レプリカ2:[6、7]
- バッチ2 =レプリカ2:[8、9]
- バッチ3 =レプリカ2:[10]
- バッチ4 =レプリカ2:[11]
データ:これにより、すべてのワーカー間で要素が自動シャーディングされます。各ワーカーはデータセット全体を読み取り、割り当てられたシャードのみを処理します。他のすべてのシャードは破棄されます。これは通常、入力ファイルの数がワーカーの数より少なく、すべてのワーカー間でデータをより適切にシャーディングする場合に使用されます。欠点は、データセット全体が各ワーカーで読み取られることです。たとえば、1つのファイルを2つのワーカーに配布するとします。ファイル1には、[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とします。
- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[4、5]
- バッチ3 =レプリカ1:[8、9]
- 労働者1:
- バッチ1 =レプリカ2:[2、3]
- バッチ2 =レプリカ2:[6、7]
- バッチ3 =レプリカ2:[10、11]
オフ:自動シャーディングをオフにすると、各ワーカーがすべてのデータを処理します。たとえば、1つのファイルを2つのワーカーに配布するとします。ファイル1には、[0、1、2、3、4、5、6、7、8、9、10、11]が含まれています。同期しているレプリカの総数を2とすると、各ワーカーには次の分布が表示されます。
- ワーカー0:
- バッチ1 =レプリカ1:[0、1]
- バッチ2 =レプリカ1:[2、3]
- バッチ3 =レプリカ1:[4、5]
- バッチ4 =レプリカ1:[6、7]
- バッチ5 =レプリカ1:[8、9]
バッチ6 =レプリカ1:[10、11]
労働者1:
バッチ1 =レプリカ2:[0、1]
バッチ2 =レプリカ2:[2、3]
バッチ3 =レプリカ2:[4、5]
バッチ4 =レプリカ2:[6、7]
バッチ5 =レプリカ2:[8、9]
バッチ6 =レプリカ2:[10、11]
プリフェッチ
デフォルトでは、 tf.distribute
は、ユーザーがtf.data.Dataset
インスタンスの最後にプリフェッチ変換を追加します。 buffer_size
あるプリフェッチ変換の引数は、同期しているレプリカの数と同じbuffer_size
。
tf.distribute.Strategy.distribute_datasets_from_function
使用法
このAPIは入力関数をtf.distribute.DistributedDataset
、 tf.distribute.DistributedDataset
インスタンスを返します。ユーザーが渡す入力関数にはtf.distribute.InputContext
引数があり、tf.data.Dataset
インスタンスを返す必要がtf.data.Dataset
ます。このAPIを使用すると、 tf.distribute
は、入力関数から返されたユーザーのtf.data.Dataset
インスタンスにそれ以上の変更を加えません。データセットをバッチ処理してシャーディングするのはユーザーの責任です。 tf.distribute
は、各ワーカーのCPUデバイスで入力関数を呼び出します。ユーザーが独自のバッチ処理およびシャーディングロジックを指定できるようにするだけでなく、このAPIは、マルチワーカートレーニングに使用した場合、 tf.distribute.Strategy.experimental_distribute_dataset
と比較して優れたスケーラビリティとパフォーマンスを示します。
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
プロパティ
バッチ処理
入力関数の戻り値であるtf.data.Dataset
インスタンスは、レプリカごとのバッチサイズを使用してバッチ処理する必要があります。レプリカごとのバッチサイズは、グローバルバッチサイズを同期トレーニングに参加しているレプリカの数で割ったものです。これは、 tf.distribute
が各ワーカーのCPUデバイスで入力関数を呼び出すためtf.distribute
。特定のワーカーで作成されたデータセットは、そのワーカーのすべてのレプリカで使用できるようになっている必要があります。
シャーディング
ユーザーの入力関数に引数として暗黙的に渡されるtf.distribute.InputContext
オブジェクトは、 tf.distribute
でtf.distribute
によって作成されます。ワーカーの数、現在のワーカーIDなどに関する情報があります。この入力関数は、 tf.distribute.InputContext
オブジェクトの一部であるこれらのプロパティを使用して、ユーザーが設定したポリシーに従ってシャーディングを処理できます。
プリフェッチ
tf.distribute
は、ユーザー提供の入力関数によって返されるtf.data.Dataset
の最後にプリフェッチ変換を追加しません。
分散イテレーター
非分散tf.data.Dataset
インスタンスと同様に、 tf.distribute.DistributedDataset
インスタンスにイテレーターを作成して、それを反復処理し、 tf.distribute.DistributedDataset
要素にアクセスする必要があります。以下は、 tf.distribute.DistributedIterator
を作成し、 tf.distribute.DistributedIterator
を使用してモデルをトレーニングする方法です。
使用法
Pythonicforループ構造を使用する
ユーザーフレンドリーなPythonicループを使用して、 tf.distribute.DistributedDataset
を反復処理できます。返された要素tf.distribute.DistributedIterator
単一であってもよいtf.Tensor
又はtf.distribute.DistributedValues
レプリカあたり値を含みます。 tf.function
内にループを配置すると、パフォーマンスが向上します。しかし、 break
やreturn
現在、以上のループのためにサポートされていないtf.distribute.DistributedDataset
の内側に配置されてtf.function
。
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
iter
を使用して明示的なイテレータを作成します
内の要素を反復処理するにはtf.distribute.DistributedDataset
インスタンスを、あなたが作成することができtf.distribute.DistributedIterator
使用してiter
それにAPIを。明示的な反復子を使用すると、固定数のステップで反復できます。 tf.distribute.DistributedIterator
インスタンスdist_iterator
から次の要素を取得するには、next next(dist_iterator)
、 dist_iterator.get_next()
、またはdist_iterator.get_next_as_optional()
を呼び出すことができます。前の2つは本質的に同じです:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next()
またはtf.distribute.DistributedIterator.get_next()
で、 tf.distribute.DistributedIterator
が終了に達した場合、OutOfRangeエラーがスローされます。クライアントは、python側でエラーをキャッチし、チェックポイントや評価などの他の作業を続行できます。ただし、次のようなホストトレーニングループを使用している場合(つまり、 tf.function
ごとに複数のステップを実行している場合)は機能しません。
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
は、ステップ本体をtf.range
内にラップすることにより、複数のステップが含まれています。この場合、依存関係のないループ内の異なる反復が並行して開始される可能性があるため、前の反復の計算が終了する前に、後の反復でOutOfRangeエラーがトリガーされる可能性があります。 OutOfRangeエラーがスローされると、関数内のすべての操作がすぐに終了します。これを避けたい場合は、OutOfRangeエラーをスローしない代替手段はtf.distribute.DistributedIterator.get_next_as_optional()
です。 get_next_as_optional
は、 tf.experimental.Optional
を返します。これには、次の要素が含まれるか、 tf.distribute.DistributedIterator
が終了した場合は値が含まれません。
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
element_spec
プロパティの使用
あなたがに配布データセットの要素を渡すとtf.function
としたいtf.TypeSpec
保証を、次のように指定することができますinput_signature
引数tf.function
。分散データセットの出力はtf.distribute.DistributedValues
あり、単一のデバイスまたは複数のデバイスへの入力を表すことができます。この分散値に対応するtf.TypeSpec
を取得するには、分散データセットまたは分散イテレーターオブジェクトのelement_spec
プロパティを使用できます。
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
部分バッチ
部分的なバッチは、ユーザーが作成するtf.data.Dataset
インスタンスに、レプリカの数で均等に分割できないバッチサイズが含まれている場合、またはデータセットインスタンスのカーディナリティがバッチサイズで分割できない場合に発生します。これは、データセットが複数のレプリカに分散されている場合、一部のイテレーターをnext
呼び出すとOutOfRangeErrorが発生することを意味します。このユースケースを処理するために、 tf.distribute
は、処理するデータがこれ以上ないレプリカで、バッチサイズ0のダミーバッチを返します。
シングルワーカーの場合、イテレーターでのnext
呼び出しによってデータが返されない場合、0バッチサイズのダミーバッチが作成され、データセット内の実際のデータとともに使用されます。部分バッチの場合、データの最後のグローバルバッチには、データのダミーバッチとともに実際のデータが含まれます。データ処理の停止条件で、レプリカにデータがあるかどうかがチェックされるようになりました。どのレプリカにもデータがない場合、OutOfRangeエラーがスローされます。
マルチワーカーの場合、各ワーカーのデータの存在を表すブール値は、クロスレプリカ通信を使用して集計され、これを使用して、すべてのワーカーが分散データセットの処理を終了したかどうかを識別します。これにはクロスワーカー通信が含まれるため、パフォーマンスが低下します。
警告
複数のワーカー設定で
tf.distribute.Strategy.experimental_distribute_dataset
使用する場合、ユーザーはファイルから読み取るtf.data.Dataset
を渡します。tf.data.experimental.AutoShardPolicy
がAUTO
またはFILE
設定されている場合、実際のステップごとのバッチサイズは、ユーザー定義のグローバルバッチサイズよりも小さい場合があります。これは、ファイル内の残りの要素がグローバルバッチサイズよりも小さい場合に発生する可能性があります。ユーザーは、実行するステップ数に依存せずにデータセットを使い果たすか、tf.data.experimental.AutoShardPolicy
をDATA
に設定してtf.data.experimental.AutoShardPolicy
することができます。ステートフルデータセット変換は現在
tf.distribute
でサポートされておらず、データセットが持つ可能性のあるステートフルopsは現在無視されます。たとえば、データセットにmap_fn
を使用して画像を回転させるtf.random.uniform
がある場合、pythonプロセスが実行されているローカルマシンの状態(つまりランダムシード)に依存するデータセットグラフがあります。デフォルトで無効になっている実験的な
tf.data.experimental.OptimizationOptions
は、特定のコンテキスト(tf.distribute
一緒に使用する場合など)でパフォーマンスの低下を引き起こす可能性があります。それらが分散設定でワークロードのパフォーマンスに役立つことを検証した後でのみ、それらを有効にする必要があります。一般に
tf.data
して入力パイプラインを最適化する方法については、このガイドを参照してください。いくつかの追加のヒント:複数のワーカーがあり、
tf.data.Dataset.list_files
を使用して1つ以上のglobパターンに一致するすべてのファイルからデータセットを作成している場合は、seed
引数を設定するか、shuffle=False
設定して、各ワーカーがファイルを一貫してtf.data.Dataset.list_files
するようにしてください。入力パイプラインにレコードレベルでのデータのシャッフルとデータの解析の両方が含まれている場合、未解析のデータが解析済みのデータよりも大幅に大きい場合を除き(通常はそうではありません)、次の例に示すように、最初にシャッフルしてから解析します。これにより、メモリ使用量とパフォーマンスが向上する可能性があります。
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
は、buffer_size
要素の内部バッファーを維持するため、buffer_size
を減らすとOOMの問題が軽減される可能性があります。tf.distribute.experimental_distribute_dataset
またはtf.distribute.distribute_datasets_from_function
を使用するときに、ワーカーがデータを処理するtf.distribute.distribute_datasets_from_function
は保証されません。これは通常、tf.distribute
を使用してスケール予測をtf.distribute
場合に必要です。ただし、バッチ内の各要素にインデックスを挿入し、それに応じて出力を並べ替えることができます。次のスニペットは、出力を注文する方法の例です。
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
正規のtf.data.Datasetインスタンスを使用していない場合、データを配布するにはどうすればよいですか?
ユーザーがtf.data.Dataset
を使用して入力を表し、続いて上記のAPIを使用してデータセットを複数のデバイスに配布できない場合があります。このような場合、生のテンサーまたはジェネレーターからの入力を使用できます。
任意のテンソル入力にexperimental_distribute_values_from_functionを使用します
strategy.run
は、 next(iterator)
出力であるtf.distribute.DistributedValues
を受け入れnext(iterator)
。テンソル値を渡すには、 experimental_distribute_values_from_function
を使用して、生のtf.distribute.DistributedValues
からtf.distribute.DistributedValues
します。
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
入力がジェネレーターからのものである場合は、tf.data.Dataset.from_generatorを使用します
使用するジェネレーター関数がある場合は、tf.data.Dataset
を使用してfrom_generator
インスタンスを作成できます。
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)