Yardım Kaggle üzerinde TensorFlow ile Büyük Bariyer Resifi korumak Meydan Üyelik

Dağıtılmış Giriş

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

Tf.distribute API'ler kullanıcıların birden makinelere tek makineden eğitimlerini ölçeklemek için kolay bir yol sağlar. Modellerini ölçeklendirirken, kullanıcıların girdilerini birden fazla cihaza dağıtmaları da gerekir. tf.distribute API'leri otomatik cihazlar arasında girişinizi dağıtabildikleri kullanarak sağlar.

Bu kılavuz size dağıtılan veri kümesi kullanarak yineleyicinızı oluşturabileceğiniz farklı yolları gösterecektir tf.distribute API'leri. Ek olarak, aşağıdaki konular ele alınacaktır:

Bu kılavuz, Keras API'leri ile dağıtılmış girdi kullanımını kapsamaz.

Dağıtılmış Veri Kümeleri

Kullanmak için tf.distribute ölçeğine API'leri, kullanıcıların kullanmanız önerilir tf.data.Dataset onların girişini temsil etmek. tf.distribute ile verimli bir şekilde çalışması için yapılmış tf.data.Dataset (örneğin, her bir gaz cihazı üzerine verilerin otomatik önceden getirme) performans optimizasyonları düzenli uygulamaya dahil edilmiştir. Eğer başka bir şey kullanmak için bir kullanım söz konusuysa tf.data.Dataset , daha sonraki bakınız bölüm bu kılavuzda. Olmayan bir dağıtılan eğitim döngüde kullanıcılar ilk bir oluşturmak tf.data.Dataset örneği ve ardından yinelerler elemanları üzerinde. Örneğin:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Kullanıcıların kullanmasına izin vermek için tf.distribute bir dağıtacağı, iki API tanıtıldı bir kullanıcının mevcut koduna minimal değişikliklerle stratejisini tf.data.Dataset örneği ve bir veri kümesi nesnesi dağıtılan döner. Bir kullanıcı daha sonra bu dağıtılmış veri kümesi örneğini yineleyebilir ve modelini daha önce olduğu gibi eğitebilir. Şimdi iki API'ler bakmak edelim - tf.distribute.Strategy.experimental_distribute_dataset ve tf.distribute.Strategy.distribute_datasets_from_function daha ayrıntılı olarak:

tf.distribute.Strategy.experimental_distribute_dataset

kullanım

Bu API bir sürer tf.data.Dataset girdi olarak örneği ve bir döner tf.distribute.DistributedDataset örneği. Giriş veri kümesini, genel toplu iş boyutuna eşit bir değerle toplulaştırmanız gerekir. Bu global parti boyutu, tüm cihazlarda 1 adımda işlemek istediğiniz numune sayısıdır. Bir Pythonic moda bu dağıtılmış veri kümesini üzerinde yineleme veya kullanan bir yineleyici oluşturabilir iter . Geri amacı değildir tf.data.Dataset örneği ve dönüşümü ya da herhangi bir şekilde bir veri kümesi kontrol başka API'leri desteklemez. Girişinizi farklı replikalar üzerinden parçalamak istediğiniz belirli yöntemleriniz yoksa önerilen API budur.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Özellikler

yığınlama

tf.distribute giriş rebatches tf.data.Dataset senkronize kopyaları sayısına bölünmesiyle elde edilen küresel parti boyutuna eşittir yeni bir parti boyutu örneği. Eşitlemedeki kopyaların sayısı, eğitim sırasında gradyan allreduce'da yer alan cihazların sayısına eşittir. Bir kullanıcı çağırdığında next yineleyici bir veri kopyası başına parti boyutu, her yineleme döndürülür dağıtılmış ile. Yeniden toplu veri kümesi kardinalitesi her zaman kopya sayısının bir katı olacaktır. Burada bir çift örnek var:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Dağıtım olmadan:
    • Grup 1: [0, 1, 2, 3]
    • Grup 2: [4, 5]
    • 2 kopya üzerinden dağıtım ile. Son toplu iş ([4, 5]) 2 kopya arasında bölünür.

    • Parti 1:

      • Kopya 1:[0, 1]
      • Kopya 2:[2, 3]
    • 2. parti:

      • Kopya 2: [4]
      • Kopya 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Dağıtım olmadan:
    • Grup 1: [[0], [1], [2], [3]]
    • 5 kopyadan fazla dağıtım ile:
    • Parti 1:
      • Kopya 1: [0]
      • Kopya 2: [1]
      • Kopya 3: [2]
      • 4 kopya: [3]
      • Kopya 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Dağıtım olmadan:
    • Grup 1: [0, 1, 2, 3]
    • 2. Grup: [4, 5, 6, 7]
    • 3 kopya üzerinden dağıtım ile:
    • Parti 1:
      • Kopya 1: [0, 1]
      • Kopya 2: [2, 3]
      • Kopya 3: []
    • 2. parti:
      • Kopya 1: [4, 5]
      • Kopya 2: [6, 7]
      • Kopya 3: []

Veri kümesini yeniden gruplandırma, çoğaltma sayısıyla doğrusal olarak artan bir alan karmaşıklığına sahiptir. Bu, çok çalışanlı eğitim kullanım durumu için giriş hattının OOM hatalarıyla karşılaşabileceği anlamına gelir.

parçalama

tf.distribute da çoklu işçi eğitiminde autoshards girdi veri kümesini MultiWorkerMirroredStrategy ve TPUStrategy . Her veri kümesi, çalışanın CPU cihazında oluşturulur. (Sağ eğer her işçinin tüm veri kümesinin bir alt kümesi atandığını işçilerin araçlarının bir dizi üzerinde bir veri kümesi Autosharding tf.data.experimental.AutoShardPolicy ayarlanır). Bu, her adımda, her çalışan tarafından örtüşmeyen veri kümesi öğelerinin global bir toplu iş boyutunun işlenmesini sağlamak içindir. Autosharding kullanılarak belirtilebilir farklı birkaç seçenek vardır tf.data.experimental.DistributeOptions . Not multi işçi eğitiminde hiçbir autosharding olduğunu ParameterServerStrategy bulunabilir ve bu strateji ile veri kümesi oluşturma hakkında daha fazla bilgi Parametre Sunucu Strateji öğretici .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

İçin ayarlayabileceğini üç farklı seçenek vardır tf.data.experimental.AutoShardPolicy :

  • OTOMATİK: Bu, DOSYA tarafından parçalanmaya çalışılacağı anlamına gelen varsayılan seçenektir. Dosya tabanlı bir veri kümesi algılanmazsa, FILE tarafından parçalama girişimi başarısız olur. tf.distribute sonra DATA tarafından Sharding işlemine geçer. Girdi veri kümesi dosya tabanlı ancak dosya sayısı işçilerin sayısından az olması durumunda, bir o Not InvalidArgumentError arttırılacaktır. Böyle bir durumda, açıkça için politika belirlemek AutoShardPolicy.DATA veya dosya sayısı işçilerin sayısından daha büyük olacak şekilde küçük dosyalar halinde giriş kaynağını bölün.
  • DOSYA: Giriş dosyalarını tüm çalışanlar üzerinde parçalamak istiyorsanız bu seçenek budur. Girdi dosyalarının sayısı çalışan sayısından çok fazlaysa ve dosyalardaki veriler eşit olarak dağıtılmışsa bu seçeneği kullanmalısınız. Bu seçeneğin dezavantajı, dosyalardaki veriler eşit olarak dağıtılmamışsa boşta çalışanlara sahip olmaktır. Dosya sayısı işçilerin sayısından daha az ise, bir InvalidArgumentError arttırılacaktır. Böyle bir durumda, açıkça için politika belirlemek AutoShardPolicy.DATA . Örneğin 2 dosyayı 2 işçiye her biri 1 replika olacak şekilde dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5] içerir ve Dosya 2 [6, 7, 8, 9, 10, 11] içerir. Eşitlenen toplam çoğaltma sayısı 2 ve genel toplu iş boyutu 4 olsun.

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [2, 3]
    • Parti 3 = Kopya 1: [4]
    • Parti 4 = Kopya 1: [5]
    • İşçi 1:
    • Parti 1 = Kopya 2: [6, 7]
    • Parti 2 = Kopya 2: [8, 9]
    • Parti 3 = Kopya 2: [10]
    • Parti 4 = Kopya 2: [11]
  • VERİ: Bu, öğeleri tüm çalışanlar arasında otomatik olarak paylaşacaktır. Çalışanların her biri tüm veri kümesini okuyacak ve yalnızca kendisine atanan parçayı işleyecektir. Diğer tüm parçalar atılacak. Bu genellikle, girdi dosyalarının sayısı çalışan sayısından azsa ve verilerin tüm çalışanlar arasında daha iyi paylaşılmasını istiyorsanız kullanılır. Dezavantajı, tüm veri kümesinin her çalışanda okunacak olmasıdır. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Senkronize edilen toplam replika sayısı 2 olsun.

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [4, 5]
    • Parti 3 = Kopya 1: [8, 9]
    • İşçi 1:
    • Parti 1 = Kopya 2: [2, 3]
    • Parti 2 = Kopya 2: [6, 7]
    • Parti 3 = Kopya 2: [10, 11]
  • KAPALI: Otomatik parçalamayı kapatırsanız, her çalışan tüm verileri işler. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Eşitlenen toplam kopya sayısı 2 olsun. Ardından her çalışan aşağıdaki dağılımı görecektir:

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [2, 3]
    • Parti 3 = Kopya 1: [4, 5]
    • Parti 4 = Kopya 1: [6, 7]
    • Parti 5 = Kopya 1: [8, 9]
    • Parti 6 = Kopya 1: [10, 11]

    • İşçi 1:

    • Parti 1 = Kopya 2: [0, 1]

    • Parti 2 = Kopya 2: [2, 3]

    • Parti 3 = Kopya 2: [4, 5]

    • Parti 4 = Kopya 2: [6, 7]

    • Parti 5 = Kopya 2: [8, 9]

    • Parti 6 = Kopya 2: [10, 11]

ön yükleme

Varsayılan olarak, tf.distribute kullanıcının sonunda bir önceden getirme dönüşümü sağlanan ekler tf.data.Dataset örneği. Bir önceden getirme dönüşüme bağımsız değişken buffer_size senkronize yinelemeler sayısına eşittir.

tf.distribute.Strategy.distribute_datasets_from_function

kullanım

Bu API bir giriş işlevi alır ve bir döner tf.distribute.DistributedDataset örneği. Kullanıcıların geçmek olduğunu girdi işlevi vardır tf.distribute.InputContext argüman ve dönmelidir tf.data.Dataset örneği. Bu API ile tf.distribute kullanıcının üzerinde başka değişiklik yapmaz tf.data.Dataset giriş işlevinden döndürülen örneği. Veri kümesini toplu hale getirmek ve parçalamak kullanıcının sorumluluğundadır. tf.distribute işçi her işlemci cihazında giriş işlevi çağırır. Apart kullanıcıların kendi beton ve Kırma işlemi mantığını belirtmek için izin dışında, bu API ayrıca oranla daha iyi ölçeklenebilirlik ve performans gösteriyor tf.distribute.Strategy.experimental_distribute_dataset çok işçi eğitimi için kullanıldığında.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Özellikler

yığınlama

tf.data.Dataset giriş fonksiyonunun dönüş değeri olan örnek başına çoğaltma toplu boyutu kullanılarak harmanlanmış edilmelidir. Çoğaltma başına toplu iş boyutu, genel toplu iş boyutunun eşitleme eğitiminde yer alan çoğaltma sayısına bölümüdür. Bunun nedeni, tf.distribute işçi her işlemci cihazında giriş işlevi çağırır. Belirli bir çalışan üzerinde oluşturulan veri kümesi, o çalışandaki tüm kopyalar tarafından kullanıma hazır olmalıdır.

parçalama

tf.distribute.InputContext örtülü kullanıcının giriş işlevine argüman olarak geçirilen nesne tarafından oluşturulan tf.distribute kaputun altında. Bu işçilerin sayısı, geçerli işçi id vs. parçası olan bu özellikleri kullanarak kullanıcı tarafından ayarlanan politikalar gereği Sharding işleyebilir Bu giriş fonksiyonu hakkında bilgi sahibi tf.distribute.InputContext nesne.

ön yükleme

tf.distribute sonunda önceden getirme dönüşüm eklemez tf.data.Dataset kullanıcı tarafından sağlanan giriş fonksiyonu tarafından geri döndü.

Dağıtılmış Yineleyiciler

Olmayan dağıtılmış benzer tf.data.Dataset durumlarda, size bir yineleyici oluşturmanız gerekecektir tf.distribute.DistributedDataset o ve erişim üzerinde öğeleri yineleme yapmak örneklerini tf.distribute.DistributedDataset . Aşağıdaki bir oluşturabileceğiniz yollardır tf.distribute.DistributedIterator modelinizi yetiştirmek ve kullanmak:

Kullanımlar

Döngü yapısı için bir Pythonic kullanın

Sen yineleme yapmak için bir kullanıcı dostu Pythonic döngü kullanabilirsiniz tf.distribute.DistributedDataset . Dönen elemanlar tf.distribute.DistributedIterator tek olabilir tf.Tensor veya tf.distribute.DistributedValues yineleme başına bir değer içerir. Bir iç döngü Yerleştirme tf.function bir performans artışı sağlayacaktır. Ancak, break ve return henüz üzerinden bir döngü için desteklenmez tf.distribute.DistributedDataset bir iç yerleştirilir tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Kullanım iter açık bir yineleyici oluşturmak için

Yineleme yapmak için bir elemanların üzerinde tf.distribute.DistributedDataset Örneğin, bir oluşturabilir tf.distribute.DistributedIterator kullanarak iter üzerinde API. Açık bir yineleyiciyle, sabit sayıda adım için yineleme yapabilirsiniz. Bir sonraki elemanı almak için tf.distribute.DistributedIterator örneği dist_iterator , Arayabileceğin next(dist_iterator) , dist_iterator.get_next() veya dist_iterator.get_next_as_optional() . İlk ikisi temelde aynıdır:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

İle next() veya tf.distribute.DistributedIterator.get_next() eğer tf.distribute.DistributedIterator sonuna geldi, bir outofrange hatası atılır. İstemci python tarafında hatayı yakalayabilir ve kontrol noktası ve değerlendirme gibi diğer işleri yapmaya devam edebilir. Ancak, bir ana eğitim döngü kullanıyorsanız, bu olmaz eser (yani birden fazla adım koşmak tf.function gibi hangi görünüyor,):

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn bir adım içinde vücudu sarma birden çok adımları içerir tf.range . Bu durumda, döngüde bağımlılık olmadan farklı yinelemeler paralel olarak başlayabilir, bu nedenle önceki yinelemelerin hesaplanması bitmeden sonraki yinelemelerde bir OutOfRange hatası tetiklenebilir. Bir OutOfRange hatası atıldığında, fonksiyondaki tüm işlemler hemen sonlandırılacaktır. Bu kaçınmak istiyorum bazı durumda, bir outofrange hata atmak değil bir alternatiftir tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional döner bir tf.experimental.Optional bir sonraki elemanı ya da herhangi bir değeri içeren tf.distribute.DistributedIterator sona ulaşmıştır.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Kullanılması element_spec özelliği

Bir dağıtılmış bir veri kümesi unsurlarını geçerseniz tf.function ve istediğiniz tf.TypeSpec garantisi, Belirtebileceğiniz input_signature savını tf.function . Bir dağıtılmış veri kümesi çıktısı tf.distribute.DistributedValues tek bir cihaz ya da birden fazla cihaz giriş temsil edebilir. Almak için tf.TypeSpec kullanabilirsiniz bu dağıtılmış değerine karşılık gelen element_spec dağıtılan veri kümesi özelliğini veya dağıtılmış yineleyici nesne.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Kısmi Partiler

Zaman kısmi toplu karşılaşılan tf.data.Dataset örnekleri kullanıcı kopyaları sayısı veya veri kümesi örneğinin önem düzeyi parti büyüklüğü ile bölünebilir değilken bölünebilir değildir ve miktar içerebilir yarattığını. Veri kümesi birden kopyaları dağılmış olduğunda bu araçlar, bu next bazı Yineleyicilerin üzerinde çağrısı OutOfRangeError sonuçlanacaktır. Bu kullanım durumda işlemek için tf.distribute dönüş süreci için herhangi bir daha fazla veri yok kopyaları üzerinde toplu boyutu 0 toplu Sersem.

Veri ile iade edilmemesi durumunda tek işçi durumda, next iterasyon çağrıya, 0 seri boyutundaki kukla toplu oluşturulur ve veri kümesindeki gerçek veriler ile birlikte kullanılır. Kısmi gruplar durumunda, son global veri grubu, sahte veri gruplarının yanı sıra gerçek verileri içerecektir. Verileri işlemek için durma koşulu artık kopyalardan herhangi birinin veriye sahip olup olmadığını kontrol eder. Replikaların hiçbirinde veri yoksa OutOfRange hatası verilir.

Çoklu çalışan durumu için, çalışanların her birine ilişkin verilerin varlığını temsil eden boole değeri, çapraz çoğaltma iletişimi kullanılarak toplanır ve bu, tüm çalışanların dağıtılmış veri kümesini işlemeyi bitirip bitirmediğini belirlemek için kullanılır. Bu, çalışanlar arası iletişimi içerdiğinden, ilgili bazı performans cezaları vardır.

uyarılar

  • Kullanırken tf.distribute.Strategy.experimental_distribute_dataset çoklu işçi kurulum ile API'leri, kullanıcılar geçmesi tf.data.Dataset dosyalarından okur. Eğer tf.data.experimental.AutoShardPolicy ayarlanır AUTO veya FILE , aşama parti büyüklüğü başına gerçek kullanıcı tanımlı küresel parti boyutu daha küçük olabilir. Bu, dosyadaki kalan öğeler genel toplu iş boyutundan küçük olduğunda gerçekleşebilir. Kullanıcılar ya çalıştırmak veya set adımlar sayısına bağlı olmaksızın veri kümesi tüketebileceği tf.data.experimental.AutoShardPolicy için DATA etrafında işe.

  • Duruma veri kümesi dönüşümleri şu anda desteklenmez tf.distribute ve veri kümesi şu anda yok sayılır sahip olabileceği herhangi bir durum bilgisi op. Senin Veri kümesi varsa Örneğin, map_fn kullandığı bu tf.random.uniform bir görüntüyü döndürmek için, o zaman piton işlemi yürütülmektedir yerel makinede (rastgele tohum yani) durumuna bağlı olan bir veri kümesi grafiğini var.

  • Deneysel tf.data.experimental.OptimizationOptions gibi ile birlikte kullanıldığı zaman olduğu gibi - bazı bağlamlarda varsayılan kutu tarafından devre dışı tf.distribute - bir performans düşüşüne neden. Bunları yalnızca bir dağıtma ayarında iş yükünüzün performansından yararlandıklarını doğruladıktan sonra etkinleştirmelisiniz.

  • Bakınız bu kılavuzun ile giriş boru hattı iyileştirme konusunda tf.data genelde. Birkaç ek ipucu:

    • Birden işçi var ve kullanıyorsanız tf.data.Dataset.list_files bir veya daha fazla glob desen eşleştirme tüm dosyalardan bir veri kümesi oluşturmak için, ayarlamayı unutmayın seed argüman veya set shuffle=False her işçinin sürekli dosyayı shard şekilde yerleştirin.

    • Giriş işlem hattınız hem verileri kayıt düzeyinde karıştırmayı hem de verileri ayrıştırmayı içeriyorsa, ayrıştırılmamış veriler ayrıştırılmış verilerden önemli ölçüde daha büyük değilse (ki bu genellikle böyle değildir), aşağıdaki örnekte gösterildiği gibi önce karıştırın ve ardından ayrıştırın. Bu, bellek kullanımına ve performansına fayda sağlayabilir.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) bir iç tampon muhafaza buffer_size elemanları ve böylece indirgeyici buffer_size OOM sorunu aleviate olabilir.

  • Kullanıldığında sipariş ettiği veri işçiler tarafından işlenir tf.distribute.experimental_distribute_dataset veya tf.distribute.distribute_datasets_from_function garanti edilmez. Eğer kullanıyorsanız bu tipik gereklidir tf.distribute ölçek öngörüsüne. Bununla birlikte, partideki her öğe için bir dizin ekleyebilir ve çıktıları buna göre sipariş edebilirsiniz. Aşağıdaki kod parçası, çıktıların nasıl sipariş edileceğine bir örnektir.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Kurallı bir tf.data.Dataset örneği kullanmıyorsam verilerimi nasıl dağıtırım?

Bazen kullanıcılar kullanamaz tf.data.Dataset onların girişini temsil etmek ve daha sonra yukarıda bahsedilen API'ler birden fazla cihaza veri kümesi dağıtmak. Bu gibi durumlarda ham tensörleri veya bir jeneratörden gelen girdileri kullanabilirsiniz.

Rastgele tensör girişleri için deneysel_distribute_values_from_function kullanın

strategy.run kabul tf.distribute.DistributedValues çıkışı olan next(iterator) . Tensör değerleri geçirmek için, kullanımı experimental_distribute_values_from_function yapı için tf.distribute.DistributedValues ham tansörleri den.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Girişiniz bir jeneratörden geliyorsa tf.data.Dataset.from_generator kullanın

Kullanmak istediğiniz bir jeneratör işlevi varsa, bir oluşturabilir tf.data.Dataset kullanarak örneğini from_generator API.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)