![]() | ![]() | ![]() | ![]() |
Tf.distribute API'leri, kullanıcıların eğitimlerini tek bir makineden birden çok makineye ölçeklendirmeleri için kolay bir yol sağlar. Modellerini ölçeklendirirken, kullanıcıların da girdilerini birden çok cihaza dağıtması gerekir. tf.distribute
, tf.distribute
cihazlar arasında otomatik olarak dağıtabileceğiniz API'ler sağlar.
Bu kılavuz size tf.distribute
API'lerini kullanarak dağıtılmış veri kümesi ve yineleyiciler oluşturmanın farklı yollarını gösterecektir. Ek olarak, aşağıdaki konular ele alınacaktır:
-
tf.distribute.Strategy.experimental_distribute_dataset
vetf.distribute.Strategy.distribute_datasets_from_function
kullanılırken kullanım, parçalama ve gruplama seçenekleri. - Dağıtılmış veri kümesi üzerinde yineleme yapabileceğiniz farklı yollar.
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API'leri vetf.data
API'leri arasındakitf.distribute.Strategy.experimental_distribute_dataset
yanı sıra kullanıcıların kullanımlarında karşılaşabilecekleri tüm sınırlamalar.
Bu kılavuz, Keras API'leri ile dağıtılmış girdilerin kullanımını kapsamaz.
Dağıtılmış Veri Kümeleri
tf.distribute
için tf.distribute
API'lerini kullanmak için, kullanıcıların girdilerini temsil etmek üzeretf.data.Dataset
kullanmalarıtf.data.Dataset
. tf.distribute
,tf.data.Dataset
(örneğin, verilerin her hızlandırıcı cihaza otomatik olarak önceden getirilmesi) ile verimli bir şekilde çalışması için yapılmıştır ve performans optimizasyonları düzenli olarak uygulamaya dahil edilmiştir.tf.data.Dataset
dışında bir şey kullanmak için bir kullanım durumunuztf.data.Dataset
, lütfen bu kılavuzun sonraki bölümlerine bakın. Dağıtılmamış bir eğitim döngüsünde, kullanıcılar önce birtf.data.Dataset
örneği oluşturur ve ardından öğeler üzerindetf.data.Dataset
. Örneğin:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Kullanıcıların tf.distribute
stratejisini bir kullanıcının mevcut kodunda minimum değişikliklerle kullanmasına izin vermek için, birtf.data.Dataset
örneğinitf.data.Dataset
ve dağıtılmış bir veri kümesi nesnesi döndürecek iki API tanıtıldı. Bir kullanıcı daha sonra bu dağıtılmış veri kümesi örneği üzerinde yineleme yapabilir ve modelini daha önce olduğu gibi eğitebilir. Şimdi iki tf.distribute.Strategy.experimental_distribute_dataset
- tf.distribute.Strategy.experimental_distribute_dataset
ve tf.distribute.Strategy.distribute_datasets_from_function
- daha ayrıntılı olarak tf.distribute.Strategy.distribute_datasets_from_function
:
tf.distribute.Strategy.experimental_distribute_dataset
Kullanım
Bu API, girdi olarak birtf.data.Dataset
örneğini alır ve bir tf.distribute.DistributedDataset
örneği döndürür. Girdi veri kümesini genel toplu iş boyutuna eşit bir değerle toplu işlemelisiniz. Bu global parti boyutu, tüm cihazlarda 1 adımda işlemek istediğiniz örnek sayısıdır. Bir Pythonic moda bu dağıtılmış veri kümesini üzerinde yineleme veya kullanan bir yineleyici oluşturabilir iter
. Döndürülen nesne birtf.data.Dataset
örneği değildir ve veri kümesini herhangi bir şekilde dönüştüren veya inceleyen diğer API'leri desteklemez. Girişinizi farklı kopyalar üzerinden parçalamak istediğiniz belirli yöntemlere sahip değilseniz, önerilen API budur.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
Özellikleri
Harmanlama
tf.distribute
giriş rebatchestf.data.Dataset
senkronize kopyaları sayısına bölünmesiyle elde edilen küresel parti boyutuna eşittir yeni bir parti boyutu örneği. Senkronize edilmiş çoğaltma sayısı, eğitim sırasında gradyan azalmasında yer alan cihazların sayısına eşittir. Bir kullanıcı dağıtılmış yineleyicide bir next
çağırdığında, her eşlemede çoğaltma başına toplu veri boyutu döndürülür. Yeniden bağlanan veri kümesinin önem düzeyi, her zaman kopya sayısının katı olacaktır. Burada bir çift örnek var:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Dağıtım olmadan:
- 1. Parti: [0, 1, 2, 3]
- 2. Parti: [4, 5]
2 kopya üzerinde dağıtım ile. Son grup ([4, 5]) 2 kopya arasında bölünür.
Parti 1:
- Kopya 1: [0, 1]
- Kopya 2: [2, 3]
Parti 2:
- Kopya 2: [4]
- Kopya 2: [5]
tf.data.Dataset.range(4).batch(4)
- Dağıtım olmadan:
- 1. Parti: [[0], [1], [2], [3]]
- 5 kopyadan fazla dağıtım ile:
- Parti 1:
- Kopya 1: [0]
- Kopya 2: [1]
- Kopya 3: [2]
- Kopya 4: [3]
- Kopya 5: []
tf.data.Dataset.range(8).batch(4)
- Dağıtım olmadan:
- 1. Parti: [0, 1, 2, 3]
- 2. Parti: [4, 5, 6, 7]
- 3 kopya üzerinde dağıtım ile:
- Parti 1:
- Kopya 1: [0, 1]
- Kopya 2: [2, 3]
- Kopya 3: []
- Parti 2:
- Kopya 1: [4, 5]
- Kopya 2: [6, 7]
- Kopya 3: []
Veri kümesinin yeniden işlenmesi, yineleme sayısıyla doğrusal olarak artan bir alan karmaşıklığına sahiptir. Bu, çok çalışanlı eğitim kullanım durumu için, giriş hattının OOM hatalarıyla karşılaşabileceği anlamına gelir.
Parçalama
tf.distribute
ayrıca MultiWorkerMirroredStrategy ve TPUStrategy
ile çoklu çalışan eğitiminde girdi veri kümesini MultiWorkerMirroredStrategy
TPUStrategy
. Her veri kümesi, çalışanın CPU cihazında oluşturulur. Veri kümesini bir çalışan kümesi üzerinde otomatik olarak paylaşmak, her çalışana tüm veri kümesinin bir alt kümesinin atanması anlamına gelir (doğru tf.data.experimental.AutoShardPolicy
ayarlandıysa). Bu, her adımda, çakışmayan veri kümesi öğelerinin genel toplu boyutunun her işçi tarafından işlenmesini sağlamak içindir. Autosharding kullanılarak belirtilebilir farklı birkaç seçenek vardır tf.data.experimental.DistributeOptions
. ParameterServerStrategy
ile çoklu çalışan eğitiminde otomatik paylaşım olmadığını ve bu strateji ile veri kümesi oluşturma hakkında daha fazla bilgi Parametre Sunucusu Stratejisi eğitiminde bulunabilir .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
tf.data.experimental.AutoShardPolicy
için ayarlayabileceğiniz üç farklı seçenek vardır:
- OTOMATİK: Bu varsayılan seçenektir, yani FILE tarafından parçalama girişiminde bulunulacaktır. Dosya tabanlı bir veri kümesi algılanmazsa, FILE ile parçalama girişimi başarısız olur.
tf.distribute
daha sonra DATA ile parçalanmaya geri döner. Girdi veri kümesi dosya tabanlıysa, ancak dosya sayısı çalışan sayısından azsa, birInvalidArgumentError
çıkacaktır. Böyle bir durumda, politikayı açıkçaAutoShardPolicy.DATA
ayarlayın veya giriş kaynağınızı, dosya sayısı çalışan sayısından fazla olacak şekilde daha küçük dosyalara bölün. DOSYA: Girdi dosyalarını tüm çalışanlar üzerinde parçalamak istiyorsanız bu seçenektir. Girdi dosyalarının sayısı çalışan sayısından çok daha fazlaysa ve dosyalardaki veriler eşit olarak dağıtılmışsa bu seçeneği kullanmalısınız. Bu seçeneğin dezavantajı, dosyalardaki veriler eşit olarak dağıtılmadığında boşta çalışanlara sahip olmasıdır. Dosya sayısı çalışan sayısından azsa, bir
InvalidArgumentError
ortaya çıkar. Böyle bir durumda, politikayı açıkçaAutoShardPolicy.DATA
. Örneğin, 2 dosyayı her birinde 1 kopya olacak şekilde 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5] içerir ve Dosya 2 [6, 7, 8, 9, 10, 11] içerir. Eşzamanlı olarak toplam kopya sayısı 2 ve genel toplu iş boyutu 4 olsun.- İşçi 0:
- Parti 1 = Kopya 1: [0, 1]
- Grup 2 = Kopya 1: [2, 3]
- Grup 3 = Kopya 1: [4]
- Parti 4 = Kopya 1: [5]
- İşçi 1:
- Grup 1 = Kopya 2: [6, 7]
- Parti 2 = Kopya 2: [8, 9]
- Grup 3 = Kopya 2: [10]
- Parti 4 = Kopya 2: [11]
VERİ: Bu, öğeleri tüm çalışanlar arasında otomatik olarak paylaşacaktır. Çalışanların her biri veri kümesinin tamamını okuyacak ve yalnızca kendisine atanan parçayı işleyecektir. Diğer tüm parçalar atılacak. Bu genellikle, girdi dosyalarının sayısı çalışan sayısından azsa ve tüm çalışanlar arasında verilerin daha iyi parçalanmasını istiyorsanız kullanılır. Olumsuz yanı, veri kümesinin tamamının her işçi üzerinde okunacak olmasıdır. Örneğin, 1 dosyayı 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Eşitlemedeki toplam kopya sayısı 2 olsun.
- İşçi 0:
- Parti 1 = Kopya 1: [0, 1]
- Parti 2 = Kopya 1: [4, 5]
- Grup 3 = Kopya 1: [8, 9]
- İşçi 1:
- Parti 1 = Kopya 2: [2, 3]
- Parti 2 = Kopya 2: [6, 7]
- Grup 3 = Kopya 2: [10, 11]
KAPALI: Otomatik paylaşmayı kapatırsanız, her çalışan tüm verileri işleyecektir. Örneğin, 1 dosyayı 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Eşzamanlı toplam kopya sayısı 2 olsun. Ardından her çalışan aşağıdaki dağılımı görecek:
- İşçi 0:
- Parti 1 = Kopya 1: [0, 1]
- Grup 2 = Kopya 1: [2, 3]
- Grup 3 = Kopya 1: [4, 5]
- Grup 4 = Kopya 1: [6, 7]
- Parti 5 = Kopya 1: [8, 9]
Parti 6 = Kopya 1: [10, 11]
İşçi 1:
Grup 1 = Kopya 2: [0, 1]
Grup 2 = Kopya 2: [2, 3]
Grup 3 = Kopya 2: [4, 5]
Parti 4 = Kopya 2: [6, 7]
Parti 5 = Kopya 2: [8, 9]
Toplu 6 = Kopya 2: [10, 11]
Önceden getiriliyor
Varsayılan olarak, tf.distribute
, kullanıcı tarafından sağlanantf.data.Dataset
örneğinin sonuna bir önceden getirme dönüşümü ekler. buffer_size
olan önceden getirme dönüşümünün argümanı, senkronizasyondaki replikaların sayısına eşittir.
tf.distribute.Strategy.distribute_datasets_from_function
Kullanım
Bu API bir girdi işlevini alır ve bir tf.distribute.DistributedDataset
örneği döndürür. Kullanıcıların tf.distribute.InputContext
girdi işlevi bir tf.distribute.InputContext
bağımsız değişkenine sahiptir ve birtf.data.Dataset
örneği döndürmelidir. Bu API ile tf.distribute
, kullanıcının girdi işlevinden döndürülentf.data.Dataset
örneğinde başka bir değişiklik yapmaz. Veri kümesini toplu işlemek ve parçalamak kullanıcının sorumluluğundadır. tf.distribute
, her bir çalışanın CPU aygıtındaki giriş işlevini çağırır. Kullanıcıların kendi gruplama ve parçalama mantığını belirlemelerine izin vermenin yanı sıra, bu API, çok çalışanlı eğitim için kullanıldığında tf.distribute.Strategy.experimental_distribute_dataset
karşılaştırıldığında daha iyi ölçeklenebilirlik ve performans gösterir.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Özellikleri
Harmanlama
Girdi işlevinin dönüş değeri olantf.data.Dataset
örneği, çoğaltma başına toplu iş boyutu kullanılarak toplu hale getirilmelidir. Çoğaltma başına toplu iş boyutu, genel toplu iş boyutunun eşitleme eğitiminde yer alan eşleme sayısına bölünmesiyle elde edilir. Bunun nedeni, tf.distribute
işlevinin her bir çalışanın CPU aygıtındaki girdi işlevini tf.distribute
. Belirli bir çalışan üzerinde oluşturulan veri kümesi, o çalışan üzerindeki tüm kopyalar tarafından kullanıma hazır olmalıdır.
Parçalama
Kullanıcının girdi işlevine bağımsız değişken olarak örtük olarak iletilen tf.distribute.InputContext
nesnesi, başlık altında tf.distribute
tarafından oluşturulur. Çalışan sayısı, geçerli çalışan kimliği vb. Hakkında bilgi içerir. Bu girdi işlevi, tf.distribute.InputContext
nesnesinin parçası olan bu özellikleri kullanarak kullanıcı tarafından belirlenen tf.distribute.InputContext
.
Önceden getiriliyor
tf.distribute
, kullanıcı tarafından sağlanan girdi işlevi tarafından döndürülentf.data.Dataset
sonuna bir ön getirme dönüşümütf.data.Dataset
.
Dağıtılmış Yineleyiciler
Dağıtılmamıştf.data.Dataset
örneklerine benzer şekilde, üzerinde yineleme yapmak ve tf.distribute.DistributedDataset
içindeki öğelere erişmek için tf.distribute.DistributedDataset
örneklerinde bir yineleyici oluşturmanız gerekir. Aşağıda, bir tf.distribute.DistributedIterator
oluşturmanın ve modelinizi eğitmek için kullanmanın yolları şunlardır:
Kullanımlar
Pythonic for loop yapısı kullanın
tf.distribute.DistributedDataset
üzerinde yineleme yapmak için kullanıcı dostu bir Pythonic döngüsü kullanabilirsiniz. Dönen elemanlar tf.distribute.DistributedIterator
tek olabilir tf.Tensor
veya tf.distribute.DistributedValues
yineleme başına bir değer içerir. Döngüyü bir tf.function
içine yerleştirmek performans artışı sağlayacaktır. Ancak, break
ve return
henüz üzerinden bir döngü için desteklenmez tf.distribute.DistributedDataset
bir iç yerleştirilir tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Açık bir yineleyici oluşturmak için iter
kullanın
Yineleme yapmak için bir elemanların üzerinde tf.distribute.DistributedDataset
Örneğin, bir oluşturabilir tf.distribute.DistributedIterator
kullanarak iter
üzerinde API. Açık bir yineleyici ile, sabit sayıda adım için yineleme yapabilirsiniz. Bir tf.distribute.DistributedIterator
örnek dist_iterator
sonraki elemanı almak için, next(dist_iterator)
, dist_iterator.get_next()
veya dist_iterator.get_next_as_optional()
. İlk ikisi esasen aynıdır:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
tf.distribute.DistributedIterator.get_next()
next()
veya tf.distribute.DistributedIterator.get_next()
, tf.distribute.DistributedIterator
sonuna ulaştıysa, bir OutOfRange hatası atılır. İstemci, python tarafında hatayı yakalayabilir ve kontrol noktası belirleme ve değerlendirme gibi diğer işleri yapmaya devam edebilir. Ancak, aşağıdaki gibi görünen bir ana bilgisayar eğitim döngüsü kullanıyorsanız (yani, tf.function
başına birden fazla adım çalıştırın) bu tf.function
:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
, adım gövdesini bir tf.range
içerisine sararak birden çok adım içerir. Bu durumda, döngüdeki bağımlılık olmadan farklı yinelemeler paralel olarak başlayabilir, bu nedenle bir OutOfRange hatası, önceki yinelemelerin hesaplanması bitmeden önce sonraki yinelemelerde tetiklenebilir. Bir OutOfRange hatası atıldığında, işlevdeki tüm işlemler hemen sonlandırılacaktır. Bu, kaçınmak isteyeceğiniz bir durumsa, tf.distribute.DistributedIterator.get_next_as_optional()
hatası vermeyen bir alternatif tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
döner bir tf.experimental.Optional
bir sonraki elemanı ya da herhangi bir değeri içeren tf.distribute.DistributedIterator
sona ulaşmıştır.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
element_spec
özelliğini kullanma
Bir dağıtılmış bir veri kümesi unsurlarını geçerseniz tf.function
ve istediğiniz tf.TypeSpec
garantisi, Belirtebileceğiniz input_signature
savını tf.function
. Dağıtılmış bir veri kümesinin çıktısı, girdiyi tek bir cihaza veya birden çok cihaza temsil tf.distribute.DistributedValues
. Bu dağıtılmış değere karşılık gelen tf.TypeSpec
elde etmek için, dağıtılmış veri kümesinin veya dağıtılmış yineleyici nesnesinin element_spec
özelliğini kullanabilirsiniz.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Kısmi Partiler
Kullanıcıların oluşturduğutf.data.Dataset
örnekleri, çoğaltma sayısına eşit olarak bölünemeyen toplu iş boyutları içerebildiğinde veya veri kümesi örneğinin önem düzeyi toplu iş boyutuna bölünemediğinde kısmi partilerle karşılaşılır. Bu, veri kümesi birden fazla çoğaltmaya dağıtıldığında, bazı yineleyicilerdeki bir next
çağrının bir OutOfRangeError ile sonuçlanacağı anlamına gelir. Bu kullanım örneğini işlemek için tf.distribute
, tf.distribute
başka veriye sahip olmayan eşlemelerde 0 toplu iş boyutunda sahte gruplar döndürür.
Tek işçi durumu için, veriler yineleyicide bir next
çağrı tarafından döndürülmezse, 0 toplu iş boyutundaki sahte gruplar oluşturulur ve veri kümesindeki gerçek verilerle birlikte kullanılır. Kısmi partiler söz konusu olduğunda, son küresel veri grubu, sahte veri yığınlarının yanı sıra gerçek verileri içerecektir. Verilerin işlenmesi için durdurma koşulu artık kopyalardan herhangi birinin veriye sahip olup olmadığını kontrol eder. Yinelemelerin hiçbirinde veri yoksa, OutOfRange hatası atılır.
Çoklu çalışan durumu için, her bir çalışanla ilgili verilerin varlığını temsil eden boole değeri, çapraz çoğaltma iletişimi kullanılarak toplanır ve bu, tüm çalışanların dağıtılmış veri kümesini işlemeyi bitirip bitirmediğini belirlemek için kullanılır. Bu, çalışanlar arası iletişimi içerdiğinden, bazı performans cezaları söz konusudur.
Uyarılar
tf.distribute.Strategy.experimental_distribute_dataset
API'lerini birden çok çalışan kurulumuyla kullanırken, kullanıcılar dosyalardan okuyan birtf.data.Dataset
.tf.data.experimental.AutoShardPolicy
,AUTO
veyaFILE
olarak ayarlanmışsa, adım başına gerçek parti boyutu, kullanıcı tanımlı genel parti boyutundan daha küçük olabilir. Bu, dosyadaki kalan öğeler genel toplu iş boyutundan daha küçük olduğunda gerçekleşebilir. Kullanıcılar, çalıştırılacak adımların sayısına bağlı olmaksızın veri kümesinitf.data.experimental.AutoShardPolicy
veyatf.data.experimental.AutoShardPolicy
DATA
olarak ayarlayaraktf.data.experimental.AutoShardPolicy
etrafında çalışabilir.Durum bilgisi olan veri kümesi dönüşümleri şu anda
tf.distribute
ile desteklenmemektedir ve veri kümesinin sahip olabileceği durum bilgisi olan tüm işlemler şu anda göz ardı edilmektedir. Örneğin, verimap_fn
bir görüntüyü döndürmek içintf.random.uniform
kullanan birtf.random.uniform
varsa, python işleminin yürütüldüğü yerel makinedeki duruma (yani rastgele çekirdeğe) bağlı olan bir veri kümesi grafiğiniz olur.Varsayılan olarak devre dışı bırakılan deneysel
tf.data.experimental.OptimizationOptions
, belirli bağlamlarda - örneğintf.distribute
ile birlikte kullanıldığında - performanstf.distribute
neden olabilir. Bunları yalnızca bir dağıtım ayarında iş yükünüzün performansına fayda sağladıklarını doğruladıktan sonra etkinleştirmelisiniz.Genel olarak
tf.data
ile girdi ardışıktf.data
nasıl optimize edeceğiniz konusunda lütfen bu kılavuza bakın. Birkaç ek ipucu:Birden fazla çalışanınız varsa ve bir veya daha fazla glob
tf.data.Dataset.list_files
eşleşen tüm dosyalardan bir veri kümesi oluşturmak içintf.data.Dataset.list_files
kullanıyorsanız, her çalışanın dosyayı tutarlı bir şekildetf.data.Dataset.list_files
içinseed
bağımsız değişkenini ayarlamayı veyashuffle=False
ayarlamayı unutmayın.Girdi ardışık düzeniniz hem verileri kayıt düzeyinde karıştırmayı hem de verileri ayrıştırmayı içeriyorsa, ayrıştırılmamış veriler ayrıştırılmış verilerden önemli ölçüde daha büyük olmadıkça (bu genellikle böyle değildir), aşağıdaki örnekte gösterildiği gibi önce karıştırın ve ardından ayrıştırın. Bu, bellek kullanımına ve performansına fayda sağlayabilir.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
buffer_size
öğelerinin dahili bir tamponunu korur ve bu nedenlebuffer_size
azaltılması OOM sorununubuffer_size
kaldırabilir.tf.distribute.experimental_distribute_dataset
veyatf.distribute.distribute_datasets_from_function
kullanılırken verilerin çalışanlar tarafındantf.distribute.experimental_distribute_dataset
tf.distribute.distribute_datasets_from_function
garanti edilmez. Tahminitf.distribute
içintf.distribute
kullanıyorsanız bu genellikle gereklidir. Bununla birlikte, toplu işteki her eleman için bir dizin ekleyebilir ve çıktıları buna göre sipariş edebilirsiniz. Aşağıdaki kod parçası, çıktıların nasıl sıralanacağına dair bir örnektir.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
Kanonik bir tf.data.Dataset örneği kullanmıyorsam verilerimi nasıl dağıtabilirim?
Bazen kullanıcılar girişlerini temsil etmek için birtf.data.Dataset
ve daha sonra veri kümesini birden çok cihaza dağıtmak için yukarıda belirtilen API'leri kullanamazlar. Bu gibi durumlarda ham tensörleri veya bir jeneratörden gelen girdileri kullanabilirsiniz.
Keyfi tensör girdileri için experimental_distribute_values_from_function kullanın
strategy.run
, bir next(iterator)
çıktısı olan tf.distribute.DistributedValues
kabul eder. Tensör değerlerini geçirmek için, ham tensörlerden tf.distribute.DistributedValues
oluşturmak için experimental_distribute_values_from_function
işlevini kullanın.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Girişiniz bir jeneratörden geliyorsa tf.data.Dataset.from_generator kullanın
tf.data.Dataset
istediğiniz bir oluşturucu işleviniz varsa, from_generator
API'sini kullanarak birtf.data.Dataset
örneği oluşturabilirsiniz.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)