Bu sayfa, Cloud Translation API ile çevrilmiştir.
Switch to English

Dağıtılmış Girdi

TensorFlow.org'da görüntüleyin Google Colab'de çalıştırın Kaynağı GitHub'da görüntüleyin Defteri indirin

Tf.distribute API'leri, kullanıcıların eğitimlerini tek bir makineden birden çok makineye ölçeklendirmeleri için kolay bir yol sağlar. Modellerini ölçeklendirirken, kullanıcıların da girdilerini birden çok cihaza dağıtması gerekir. tf.distribute , tf.distribute cihazlar arasında otomatik olarak dağıtabileceğiniz API'ler sağlar.

Bu kılavuz size tf.distribute API'lerini kullanarak dağıtılmış veri kümesi ve yineleyiciler oluşturmanın farklı yollarını gösterecektir. Ek olarak, aşağıdaki konular ele alınacaktır:

Bu kılavuz, Keras API'leri ile dağıtılmış girdilerin kullanımını kapsamaz.

Dağıtılmış Veri Kümeleri

tf.distribute için tf.distribute API'lerini kullanmak için, kullanıcıların girdilerini temsil etmek üzeretf.data.Dataset kullanmalarıtf.data.Dataset . tf.distribute ,tf.data.Dataset (örneğin, verilerin her hızlandırıcı cihaza otomatik olarak önceden getirilmesi) ile verimli bir şekilde çalışması için yapılmıştır ve performans optimizasyonları düzenli olarak uygulamaya dahil edilmiştir.tf.data.Dataset dışında bir şey kullanmak için bir kullanım durumunuztf.data.Dataset , lütfen bu kılavuzun sonraki bölümlerine bakın. Dağıtılmamış bir eğitim döngüsünde, kullanıcılar önce birtf.data.Dataset örneği oluşturur ve ardından öğeler üzerindetf.data.Dataset . Örneğin:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Kullanıcıların tf.distribute stratejisini bir kullanıcının mevcut kodunda minimum değişikliklerle kullanmasına izin vermek için, birtf.data.Dataset örneğinitf.data.Dataset ve dağıtılmış bir veri kümesi nesnesi döndürecek iki API tanıtıldı. Bir kullanıcı daha sonra bu dağıtılmış veri kümesi örneği üzerinde yineleme yapabilir ve modelini daha önce olduğu gibi eğitebilir. Şimdi iki tf.distribute.Strategy.experimental_distribute_dataset - tf.distribute.Strategy.experimental_distribute_dataset ve tf.distribute.Strategy.distribute_datasets_from_function - daha ayrıntılı olarak tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

Kullanım

Bu API, girdi olarak birtf.data.Dataset örneğini alır ve bir tf.distribute.DistributedDataset örneği döndürür. Girdi veri kümesini genel toplu iş boyutuna eşit bir değerle toplu işlemelisiniz. Bu global parti boyutu, tüm cihazlarda 1 adımda işlemek istediğiniz örnek sayısıdır. Bir Pythonic moda bu dağıtılmış veri kümesini üzerinde yineleme veya kullanan bir yineleyici oluşturabilir iter . Döndürülen nesne birtf.data.Dataset örneği değildir ve veri kümesini herhangi bir şekilde dönüştüren veya inceleyen diğer API'leri desteklemez. Girişinizi farklı kopyalar üzerinden parçalamak istediğiniz belirli yöntemlere sahip değilseniz, önerilen API budur.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Özellikleri

Harmanlama

tf.distribute giriş rebatchestf.data.Dataset senkronize kopyaları sayısına bölünmesiyle elde edilen küresel parti boyutuna eşittir yeni bir parti boyutu örneği. Senkronize edilmiş çoğaltma sayısı, eğitim sırasında gradyan azalmasında yer alan cihazların sayısına eşittir. Bir kullanıcı dağıtılmış yineleyicide bir next çağırdığında, her eşlemede çoğaltma başına toplu veri boyutu döndürülür. Yeniden bağlanan veri kümesinin önem düzeyi, her zaman kopya sayısının katı olacaktır. Burada bir çift örnek var:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Dağıtım olmadan:
    • 1. Parti: [0, 1, 2, 3]
    • 2. Parti: [4, 5]
    • 2 kopya üzerinde dağıtım ile. Son grup ([4, 5]) 2 kopya arasında bölünür.

    • Parti 1:

      • Kopya 1: [0, 1]
      • Kopya 2: [2, 3]
    • Parti 2:

      • Kopya 2: [4]
      • Kopya 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Dağıtım olmadan:
    • 1. Parti: [[0], [1], [2], [3]]
    • 5 kopyadan fazla dağıtım ile:
    • Parti 1:
      • Kopya 1: [0]
      • Kopya 2: [1]
      • Kopya 3: [2]
      • Kopya 4: [3]
      • Kopya 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Dağıtım olmadan:
    • 1. Parti: [0, 1, 2, 3]
    • 2. Parti: [4, 5, 6, 7]
    • 3 kopya üzerinde dağıtım ile:
    • Parti 1:
      • Kopya 1: [0, 1]
      • Kopya 2: [2, 3]
      • Kopya 3: []
    • Parti 2:
      • Kopya 1: [4, 5]
      • Kopya 2: [6, 7]
      • Kopya 3: []

Veri kümesinin yeniden işlenmesi, yineleme sayısıyla doğrusal olarak artan bir alan karmaşıklığına sahiptir. Bu, çok çalışanlı eğitim kullanım durumu için, giriş hattının OOM hatalarıyla karşılaşabileceği anlamına gelir.

Parçalama

tf.distribute ayrıca MultiWorkerMirroredStrategy ve TPUStrategy ile çoklu çalışan eğitiminde girdi veri kümesini MultiWorkerMirroredStrategy TPUStrategy . Her veri kümesi, çalışanın CPU cihazında oluşturulur. Veri kümesini bir çalışan kümesi üzerinde otomatik olarak paylaşmak, her çalışana tüm veri kümesinin bir alt kümesinin atanması anlamına gelir (doğru tf.data.experimental.AutoShardPolicy ayarlandıysa). Bu, her adımda, çakışmayan veri kümesi öğelerinin genel toplu boyutunun her işçi tarafından işlenmesini sağlamak içindir. Autosharding kullanılarak belirtilebilir farklı birkaç seçenek vardır tf.data.experimental.DistributeOptions . ParameterServerStrategy ile çoklu çalışan eğitiminde otomatik paylaşım olmadığını ve bu strateji ile veri kümesi oluşturma hakkında daha fazla bilgi Parametre Sunucusu Stratejisi eğitiminde bulunabilir .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy için ayarlayabileceğiniz üç farklı seçenek vardır:

  • OTOMATİK: Bu varsayılan seçenektir, yani FILE tarafından parçalama girişiminde bulunulacaktır. Dosya tabanlı bir veri kümesi algılanmazsa, FILE ile parçalama girişimi başarısız olur. tf.distribute daha sonra DATA ile parçalanmaya geri döner. Girdi veri kümesi dosya tabanlıysa, ancak dosya sayısı çalışan sayısından azsa, bir InvalidArgumentError çıkacaktır. Böyle bir durumda, politikayı açıkça AutoShardPolicy.DATA ayarlayın veya giriş kaynağınızı, dosya sayısı çalışan sayısından fazla olacak şekilde daha küçük dosyalara bölün.
  • DOSYA: Girdi dosyalarını tüm çalışanlar üzerinde parçalamak istiyorsanız bu seçenektir. Girdi dosyalarının sayısı çalışan sayısından çok daha fazlaysa ve dosyalardaki veriler eşit olarak dağıtılmışsa bu seçeneği kullanmalısınız. Bu seçeneğin dezavantajı, dosyalardaki veriler eşit olarak dağıtılmadığında boşta çalışanlara sahip olmasıdır. Dosya sayısı çalışan sayısından azsa, bir InvalidArgumentError ortaya çıkar. Böyle bir durumda, politikayı açıkça AutoShardPolicy.DATA . Örneğin, 2 dosyayı her birinde 1 kopya olacak şekilde 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5] içerir ve Dosya 2 [6, 7, 8, 9, 10, 11] içerir. Eşzamanlı olarak toplam kopya sayısı 2 ve genel toplu iş boyutu 4 olsun.

    • İşçi 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Grup 2 = Kopya 1: [2, 3]
    • Grup 3 = Kopya 1: [4]
    • Parti 4 = Kopya 1: [5]
    • İşçi 1:
    • Grup 1 = Kopya 2: [6, 7]
    • Parti 2 = Kopya 2: [8, 9]
    • Grup 3 = Kopya 2: [10]
    • Parti 4 = Kopya 2: [11]
  • VERİ: Bu, öğeleri tüm çalışanlar arasında otomatik olarak paylaşacaktır. Çalışanların her biri veri kümesinin tamamını okuyacak ve yalnızca kendisine atanan parçayı işleyecektir. Diğer tüm parçalar atılacak. Bu genellikle, girdi dosyalarının sayısı çalışan sayısından azsa ve tüm çalışanlar arasında verilerin daha iyi parçalanmasını istiyorsanız kullanılır. Olumsuz yanı, veri kümesinin tamamının her işçi üzerinde okunacak olmasıdır. Örneğin, 1 dosyayı 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Eşitlemedeki toplam kopya sayısı 2 olsun.

    • İşçi 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [4, 5]
    • Grup 3 = Kopya 1: [8, 9]
    • İşçi 1:
    • Parti 1 = Kopya 2: [2, 3]
    • Parti 2 = Kopya 2: [6, 7]
    • Grup 3 = Kopya 2: [10, 11]
  • KAPALI: Otomatik paylaşmayı kapatırsanız, her çalışan tüm verileri işleyecektir. Örneğin, 1 dosyayı 2 çalışana dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Eşzamanlı toplam kopya sayısı 2 olsun. Ardından her çalışan aşağıdaki dağılımı görecek:

    • İşçi 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Grup 2 = Kopya 1: [2, 3]
    • Grup 3 = Kopya 1: [4, 5]
    • Grup 4 = Kopya 1: [6, 7]
    • Parti 5 = Kopya 1: [8, 9]
    • Parti 6 = Kopya 1: [10, 11]

    • İşçi 1:

    • Grup 1 = Kopya 2: [0, 1]

    • Grup 2 = Kopya 2: [2, 3]

    • Grup 3 = Kopya 2: [4, 5]

    • Parti 4 = Kopya 2: [6, 7]

    • Parti 5 = Kopya 2: [8, 9]

    • Toplu 6 = Kopya 2: [10, 11]

Önceden getiriliyor

Varsayılan olarak, tf.distribute , kullanıcı tarafından sağlanantf.data.Dataset örneğinin sonuna bir önceden getirme dönüşümü ekler. buffer_size olan önceden getirme dönüşümünün argümanı, senkronizasyondaki replikaların sayısına eşittir.

tf.distribute.Strategy.distribute_datasets_from_function

Kullanım

Bu API bir girdi işlevini alır ve bir tf.distribute.DistributedDataset örneği döndürür. Kullanıcıların tf.distribute.InputContext girdi işlevi bir tf.distribute.InputContext bağımsız değişkenine sahiptir ve birtf.data.Dataset örneği döndürmelidir. Bu API ile tf.distribute , kullanıcının girdi işlevinden döndürülentf.data.Dataset örneğinde başka bir değişiklik yapmaz. Veri kümesini toplu işlemek ve parçalamak kullanıcının sorumluluğundadır. tf.distribute , her bir çalışanın CPU aygıtındaki giriş işlevini çağırır. Kullanıcıların kendi gruplama ve parçalama mantığını belirlemelerine izin vermenin yanı sıra, bu API, çok çalışanlı eğitim için kullanıldığında tf.distribute.Strategy.experimental_distribute_dataset karşılaştırıldığında daha iyi ölçeklenebilirlik ve performans gösterir.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Özellikleri

Harmanlama

Girdi işlevinin dönüş değeri olantf.data.Dataset örneği, çoğaltma başına toplu iş boyutu kullanılarak toplu hale getirilmelidir. Çoğaltma başına toplu iş boyutu, genel toplu iş boyutunun eşitleme eğitiminde yer alan eşleme sayısına bölünmesiyle elde edilir. Bunun nedeni, tf.distribute işlevinin her bir çalışanın CPU aygıtındaki girdi işlevini tf.distribute . Belirli bir çalışan üzerinde oluşturulan veri kümesi, o çalışan üzerindeki tüm kopyalar tarafından kullanıma hazır olmalıdır.

Parçalama

Kullanıcının girdi işlevine bağımsız değişken olarak örtük olarak iletilen tf.distribute.InputContext nesnesi, başlık altında tf.distribute tarafından oluşturulur. Çalışan sayısı, geçerli çalışan kimliği vb. Hakkında bilgi içerir. Bu girdi işlevi, tf.distribute.InputContext nesnesinin parçası olan bu özellikleri kullanarak kullanıcı tarafından belirlenen tf.distribute.InputContext .

Önceden getiriliyor

tf.distribute , kullanıcı tarafından sağlanan girdi işlevi tarafından döndürülentf.data.Dataset sonuna bir ön getirme dönüşümütf.data.Dataset .

Dağıtılmış Yineleyiciler

Dağıtılmamıştf.data.Dataset örneklerine benzer şekilde, üzerinde yineleme yapmak ve tf.distribute.DistributedDataset içindeki öğelere erişmek için tf.distribute.DistributedDataset örneklerinde bir yineleyici oluşturmanız gerekir. Aşağıda, bir tf.distribute.DistributedIterator oluşturmanın ve modelinizi eğitmek için kullanmanın yolları şunlardır:

Kullanımlar

Pythonic for loop yapısı kullanın

tf.distribute.DistributedDataset üzerinde yineleme yapmak için kullanıcı dostu bir Pythonic döngüsü kullanabilirsiniz. Dönen elemanlar tf.distribute.DistributedIterator tek olabilir tf.Tensor veya tf.distribute.DistributedValues yineleme başına bir değer içerir. Döngüyü bir tf.function içine yerleştirmek performans artışı sağlayacaktır. Ancak, break ve return henüz üzerinden bir döngü için desteklenmez tf.distribute.DistributedDataset bir iç yerleştirilir tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Açık bir yineleyici oluşturmak için iter kullanın

Yineleme yapmak için bir elemanların üzerinde tf.distribute.DistributedDataset Örneğin, bir oluşturabilir tf.distribute.DistributedIterator kullanarak iter üzerinde API. Açık bir yineleyici ile, sabit sayıda adım için yineleme yapabilirsiniz. Bir tf.distribute.DistributedIterator örnek dist_iterator sonraki elemanı almak için, next(dist_iterator) , dist_iterator.get_next() veya dist_iterator.get_next_as_optional() . İlk ikisi esasen aynıdır:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

tf.distribute.DistributedIterator.get_next() next() veya tf.distribute.DistributedIterator.get_next() , tf.distribute.DistributedIterator sonuna ulaştıysa, bir OutOfRange hatası atılır. İstemci, python tarafında hatayı yakalayabilir ve kontrol noktası belirleme ve değerlendirme gibi diğer işleri yapmaya devam edebilir. Ancak, aşağıdaki gibi görünen bir ana bilgisayar eğitim döngüsü kullanıyorsanız (yani, tf.function başına birden fazla adım çalıştırın) bu tf.function :

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn , adım gövdesini bir tf.range içerisine sararak birden çok adım içerir. Bu durumda, döngüdeki bağımlılık olmadan farklı yinelemeler paralel olarak başlayabilir, bu nedenle bir OutOfRange hatası, önceki yinelemelerin hesaplanması bitmeden önce sonraki yinelemelerde tetiklenebilir. Bir OutOfRange hatası atıldığında, işlevdeki tüm işlemler hemen sonlandırılacaktır. Bu, kaçınmak isteyeceğiniz bir durumsa, tf.distribute.DistributedIterator.get_next_as_optional() hatası vermeyen bir alternatif tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional döner bir tf.experimental.Optional bir sonraki elemanı ya da herhangi bir değeri içeren tf.distribute.DistributedIterator sona ulaşmıştır.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_spec özelliğini kullanma

Bir dağıtılmış bir veri kümesi unsurlarını geçerseniz tf.function ve istediğiniz tf.TypeSpec garantisi, Belirtebileceğiniz input_signature savını tf.function . Dağıtılmış bir veri kümesinin çıktısı, girdiyi tek bir cihaza veya birden çok cihaza temsil tf.distribute.DistributedValues . Bu dağıtılmış değere karşılık gelen tf.TypeSpec elde etmek için, dağıtılmış veri kümesinin veya dağıtılmış yineleyici nesnesinin element_spec özelliğini kullanabilirsiniz.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Kısmi Partiler

Kullanıcıların oluşturduğutf.data.Dataset örnekleri, çoğaltma sayısına eşit olarak bölünemeyen toplu iş boyutları içerebildiğinde veya veri kümesi örneğinin önem düzeyi toplu iş boyutuna bölünemediğinde kısmi partilerle karşılaşılır. Bu, veri kümesi birden fazla çoğaltmaya dağıtıldığında, bazı yineleyicilerdeki bir next çağrının bir OutOfRangeError ile sonuçlanacağı anlamına gelir. Bu kullanım örneğini işlemek için tf.distribute , tf.distribute başka veriye sahip olmayan eşlemelerde 0 toplu iş boyutunda sahte gruplar döndürür.

Tek işçi durumu için, veriler yineleyicide bir next çağrı tarafından döndürülmezse, 0 toplu iş boyutundaki sahte gruplar oluşturulur ve veri kümesindeki gerçek verilerle birlikte kullanılır. Kısmi partiler söz konusu olduğunda, son küresel veri grubu, sahte veri yığınlarının yanı sıra gerçek verileri içerecektir. Verilerin işlenmesi için durdurma koşulu artık kopyalardan herhangi birinin veriye sahip olup olmadığını kontrol eder. Yinelemelerin hiçbirinde veri yoksa, OutOfRange hatası atılır.

Çoklu çalışan durumu için, her bir çalışanla ilgili verilerin varlığını temsil eden boole değeri, çapraz çoğaltma iletişimi kullanılarak toplanır ve bu, tüm çalışanların dağıtılmış veri kümesini işlemeyi bitirip bitirmediğini belirlemek için kullanılır. Bu, çalışanlar arası iletişimi içerdiğinden, bazı performans cezaları söz konusudur.

Uyarılar

  • tf.distribute.Strategy.experimental_distribute_dataset API'lerini birden çok çalışan kurulumuyla kullanırken, kullanıcılar dosyalardan okuyan birtf.data.Dataset . tf.data.experimental.AutoShardPolicy , AUTO veya FILE olarak ayarlanmışsa, adım başına gerçek parti boyutu, kullanıcı tanımlı genel parti boyutundan daha küçük olabilir. Bu, dosyadaki kalan öğeler genel toplu iş boyutundan daha küçük olduğunda gerçekleşebilir. Kullanıcılar, çalıştırılacak adımların sayısına bağlı olmaksızın veri kümesini tf.data.experimental.AutoShardPolicy veya tf.data.experimental.AutoShardPolicy DATA olarak ayarlayarak tf.data.experimental.AutoShardPolicy etrafında çalışabilir.

  • Durum bilgisi olan veri kümesi dönüşümleri şu anda tf.distribute ile desteklenmemektedir ve veri kümesinin sahip olabileceği durum bilgisi olan tüm işlemler şu anda göz ardı edilmektedir. Örneğin, veri map_fn bir görüntüyü döndürmek için tf.random.uniform kullanan bir tf.random.uniform varsa, python işleminin yürütüldüğü yerel makinedeki duruma (yani rastgele çekirdeğe) bağlı olan bir veri kümesi grafiğiniz olur.

  • Varsayılan olarak devre dışı bırakılan deneysel tf.data.experimental.OptimizationOptions , belirli bağlamlarda - örneğin tf.distribute ile birlikte kullanıldığında - performans tf.distribute neden olabilir. Bunları yalnızca bir dağıtım ayarında iş yükünüzün performansına fayda sağladıklarını doğruladıktan sonra etkinleştirmelisiniz.

  • Genel olarak tf.data ile girdi ardışık tf.data nasıl optimize edeceğiniz konusunda lütfen bu kılavuza bakın. Birkaç ek ipucu:

    • Birden fazla çalışanınız varsa ve bir veya daha fazla glob tf.data.Dataset.list_files eşleşen tüm dosyalardan bir veri kümesi oluşturmak için tf.data.Dataset.list_files kullanıyorsanız, her çalışanın dosyayı tutarlı bir şekilde tf.data.Dataset.list_files için seed bağımsız değişkenini ayarlamayı veya shuffle=False ayarlamayı unutmayın.

    • Girdi ardışık düzeniniz hem verileri kayıt düzeyinde karıştırmayı hem de verileri ayrıştırmayı içeriyorsa, ayrıştırılmamış veriler ayrıştırılmış verilerden önemli ölçüde daha büyük olmadıkça (bu genellikle böyle değildir), aşağıdaki örnekte gösterildiği gibi önce karıştırın ve ardından ayrıştırın. Bu, bellek kullanımına ve performansına fayda sağlayabilir.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) buffer_size öğelerinin dahili bir tamponunu korur ve bu nedenle buffer_size azaltılması OOM sorununu buffer_size kaldırabilir.

  • tf.distribute.experimental_distribute_dataset veya tf.distribute.distribute_datasets_from_function kullanılırken verilerin çalışanlar tarafından tf.distribute.experimental_distribute_dataset tf.distribute.distribute_datasets_from_function garanti edilmez. Tahmini tf.distribute için tf.distribute kullanıyorsanız bu genellikle gereklidir. Bununla birlikte, toplu işteki her eleman için bir dizin ekleyebilir ve çıktıları buna göre sipariş edebilirsiniz. Aşağıdaki kod parçası, çıktıların nasıl sıralanacağına dair bir örnektir.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Kanonik bir tf.data.Dataset örneği kullanmıyorsam verilerimi nasıl dağıtabilirim?

Bazen kullanıcılar girişlerini temsil etmek için birtf.data.Dataset ve daha sonra veri kümesini birden çok cihaza dağıtmak için yukarıda belirtilen API'leri kullanamazlar. Bu gibi durumlarda ham tensörleri veya bir jeneratörden gelen girdileri kullanabilirsiniz.

Keyfi tensör girdileri için experimental_distribute_values_from_function kullanın

strategy.run , bir next(iterator) çıktısı olan tf.distribute.DistributedValues kabul eder. Tensör değerlerini geçirmek için, ham tensörlerden tf.distribute.DistributedValues oluşturmak için experimental_distribute_values_from_function işlevini kullanın.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Girişiniz bir jeneratörden geliyorsa tf.data.Dataset.from_generator kullanın

tf.data.Dataset istediğiniz bir oluşturucu işleviniz varsa, from_generator API'sini kullanarak birtf.data.Dataset örneği oluşturabilirsiniz.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)