Google I/O bir tamamlamadır! TensorFlow oturumlarını takip edin Oturumları görüntüleyin

Dağıtılmış Giriş

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

tf.distribute API'leri, kullanıcıların eğitimlerini tek bir makineden birden çok makineye ölçeklendirmeleri için kolay bir yol sağlar. Modellerini ölçeklendirirken, kullanıcıların girdilerini birden fazla cihaza dağıtmaları da gerekir. tf.distribute , girdilerinizi cihazlar arasında otomatik olarak dağıtabileceğiniz API'ler sağlar.

Bu kılavuz size, tf.distribute API'lerini kullanarak dağıtılmış veri kümesi ve yineleyiciler oluşturmanın farklı yollarını gösterecektir. Ek olarak, aşağıdaki konular ele alınacaktır:

Bu kılavuz, Keras API'leri ile dağıtılmış girdi kullanımını kapsamaz.

Dağıtılmış Veri Kümeleri

Ölçeklendirmek üzere tf.distribute API'lerini kullanmak için, kullanıcıların girdilerini temsil etmek üzere tf.data.Dataset kullanmaları önerilir. tf.distribute , tf.data.Dataset ile verimli bir şekilde çalışacak şekilde yapılmıştır (örneğin, verilerin her hızlandırıcı cihaza otomatik olarak önceden getirilmesi) ve performans optimizasyonları uygulamaya düzenli olarak dahil edilmiştir. tf.data.Dataset dışında bir şey kullanmak için bir kullanım durumunuz varsa, lütfen bu kılavuzun sonraki bölümüne bakın. Dağıtılmış olmayan bir eğitim döngüsünde, kullanıcılar önce bir tf.data.Dataset örneği oluşturur ve ardından öğeler üzerinde yinelenir. Örneğin:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
yer tutucu2 l10n-yer
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Kullanıcıların, bir kullanıcının mevcut kodunda minimum değişiklikle tf.distribute stratejisini kullanmasına izin vermek için, bir tf.data.Dataset örneğini dağıtacak ve dağıtılmış bir veri kümesi nesnesi döndürecek iki API tanıtıldı. Bir kullanıcı daha sonra bu dağıtılmış veri kümesi örneğini yineleyebilir ve modelini daha önce olduğu gibi eğitebilir. Şimdi iki API'ye bakalım - tf.distribute.Strategy.experimental_distribute_dataset ve tf.distribute.Strategy.distribute_datasets_from_function daha ayrıntılı olarak:

tf.distribute.Strategy.experimental_distribute_dataset

kullanım

Bu API, girdi olarak bir tf.data.Dataset örneği alır ve bir tf.distribute.DistributedDataset örneği döndürür. Girdi veri kümesini, genel toplu iş boyutuna eşit bir değerle toplulaştırmanız gerekir. Bu global parti boyutu, tüm cihazlarda 1 adımda işlemek istediğiniz numune sayısıdır. Bu dağıtılmış veri kümesi üzerinde Pythonic bir şekilde yinelenebilir veya iter kullanarak bir yineleyici oluşturabilirsiniz. Döndürülen nesne bir tf.data.Dataset örneği değildir ve veri kümesini herhangi bir şekilde dönüştüren veya inceleyen diğer API'leri desteklemez. Girişinizi farklı replikalar üzerinden parçalamak istediğiniz belirli yöntemleriniz yoksa önerilen API budur.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
tutucu5 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

Özellikler

yığınlama

tf.distribute , giriş tf.data.Dataset örneğini, küresel toplu iş boyutunun eşitlenen çoğaltma sayısına bölünmesine eşit olan yeni bir toplu iş boyutuyla yeniden toplulaştırır. Eşitlemedeki replikaların sayısı, eğitim sırasında gradyan allreduce'da yer alan cihazların sayısına eşittir. Bir kullanıcı dağıtılmış yineleyiciyi çağırdığında, next yinelemede yineleme başına toplu veri boyutu döndürülür. Yeniden toplu veri kümesi kardinalitesi her zaman kopya sayısının bir katı olacaktır. Burada bir çift örnek var:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Dağıtım olmadan:
    • Grup 1: [0, 1, 2, 3]
    • Grup 2: [4, 5]
    • 2 kopya üzerinden dağıtım ile. Son toplu iş ([4, 5]) 2 kopya arasında bölünür.

    • Parti 1:

      • Kopya 1:[0, 1]
      • Kopya 2:[2, 3]
    • 2. parti:

      • Kopya 2: [4]
      • Kopya 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Dağıtım olmadan:
    • Grup 1: [[0], [1], [2], [3]]
    • 5 kopyadan fazla dağıtım ile:
    • Parti 1:
      • Kopya 1: [0]
      • Kopya 2: [1]
      • Kopya 3: [2]
      • 4 kopya: [3]
      • Kopya 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Dağıtım olmadan:
    • Grup 1: [0, 1, 2, 3]
    • 2. Grup: [4, 5, 6, 7]
    • 3 kopya üzerinden dağıtım ile:
    • Parti 1:
      • Kopya 1: [0, 1]
      • Kopya 2: [2, 3]
      • Kopya 3: []
    • 2. parti:
      • Kopya 1: [4, 5]
      • Kopya 2: [6, 7]
      • Kopya 3: []

Veri kümesini yeniden gruplandırma, çoğaltma sayısıyla doğrusal olarak artan bir alan karmaşıklığına sahiptir. Bu, çok çalışanlı eğitim kullanım durumu için giriş hattının OOM hatalarıyla karşılaşabileceği anlamına gelir.

parçalama

tf.distribute ayrıca MultiWorkerMirroredStrategy ve TPUStrategy ile çoklu çalışan eğitiminde girdi veri kümesini otomatik olarak parçalar. Her veri kümesi, çalışanın CPU cihazında oluşturulur. Bir veri kümesini bir çalışan kümesi üzerinde otomatik olarak paylaşmak, her çalışana tüm veri kümesinin bir alt kümesinin atandığı anlamına gelir (doğru tf.data.experimental.AutoShardPolicy ayarlanmışsa). Bu, her adımda, her çalışan tarafından örtüşmeyen veri kümesi öğelerinin global bir toplu iş boyutunun işlenmesini sağlamak içindir. Otomatik parçalama, tf.data.experimental.DistributeOptions kullanılarak belirtilebilecek birkaç farklı seçeneğe sahiptir. ParameterServerStrategy ile çok çalışanlı eğitimde otomatik parçalama olmadığını ve bu stratejiyle veri kümesi oluşturma hakkında daha fazla bilgiyi Parametre Sunucusu Stratejisi eğitiminde bulunabileceğini unutmayın.

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy için ayarlayabileceğiniz üç farklı seçenek vardır:

  • OTOMATİK: Bu, DOSYA tarafından parçalanmaya çalışılacağı anlamına gelen varsayılan seçenektir. Dosya tabanlı bir veri kümesi algılanmazsa, FILE tarafından parçalama girişimi başarısız olur. tf.distribute daha sonra DATA tarafından parçalanmaya geri dönecektir. Girdi veri kümesi dosya tabanlıysa ancak dosya sayısı çalışan sayısından azsa, bir InvalidArgumentError oluşturulacağını unutmayın. Böyle bir durumda, politikayı açıkça AutoShardPolicy.DATA olarak ayarlayın veya girdi kaynağınızı, dosya sayısı çalışan sayısından fazla olacak şekilde daha küçük dosyalara bölün.
  • DOSYA: Girdi dosyalarını tüm çalışanlar üzerinde parçalamak istiyorsanız bu seçenek budur. Girdi dosyalarının sayısı çalışan sayısından çok fazlaysa ve dosyalardaki veriler eşit olarak dağıtılmışsa bu seçeneği kullanmalısınız. Bu seçeneğin dezavantajı, dosyalardaki veriler eşit olarak dağıtılmamışsa boşta çalışanlara sahip olmaktır. Dosya sayısı çalışan sayısından azsa, bir InvalidArgumentError ortaya çıkar. Bu olursa, politikayı açıkça AutoShardPolicy.DATA olarak ayarlayın. Örneğin 2 dosyayı 2 işçiye her biri 1 replika olacak şekilde dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5] içerir ve Dosya 2 [6, 7, 8, 9, 10, 11] içerir. Eşitlenen toplam çoğaltma sayısı 2 ve genel toplu iş boyutu 4 olsun.

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [2, 3]
    • Parti 3 = Kopya 1: [4]
    • Parti 4 = Kopya 1: [5]
    • işçi 1:
    • Parti 1 = Kopya 2: [6, 7]
    • Parti 2 = Kopya 2: [8, 9]
    • Parti 3 = Kopya 2: [10]
    • Parti 4 = Kopya 2: [11]
  • VERİ: Bu, öğeleri tüm çalışanlar arasında otomatik olarak paylaşacaktır. Çalışanların her biri tüm veri kümesini okuyacak ve yalnızca kendisine atanan parçayı işleyecektir. Diğer tüm parçalar atılacak. Bu genellikle, girdi dosyalarının sayısı çalışan sayısından azsa ve verilerin tüm çalışanlar arasında daha iyi paylaşılmasını istiyorsanız kullanılır. Dezavantajı, tüm veri kümesinin her çalışanda okunacak olmasıdır. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Senkronize edilen toplam replika sayısı 2 olsun.

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [4, 5]
    • Parti 3 = Kopya 1: [8, 9]
    • işçi 1:
    • Parti 1 = Kopya 2: [2, 3]
    • Parti 2 = Kopya 2: [6, 7]
    • Parti 3 = Kopya 2: [10, 11]
  • KAPALI: Otomatik parçalamayı kapatırsanız, her çalışan tüm verileri işler. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Senkronize edilen toplam replika sayısı 2 olsun. Ardından her çalışan aşağıdaki dağılımı görecektir:

    • Çalışan 0:
    • Parti 1 = Kopya 1: [0, 1]
    • Parti 2 = Kopya 1: [2, 3]
    • Parti 3 = Kopya 1: [4, 5]
    • Parti 4 = Kopya 1: [6, 7]
    • Parti 5 = Kopya 1: [8, 9]
    • Parti 6 = Kopya 1: [10, 11]

    • işçi 1:

    • Parti 1 = Kopya 2: [0, 1]

    • Parti 2 = Kopya 2: [2, 3]

    • Parti 3 = Kopya 2: [4, 5]

    • Parti 4 = Kopya 2: [6, 7]

    • Parti 5 = Kopya 2: [8, 9]

    • Parti 6 = Kopya 2: [10, 11]

ön yükleme

Varsayılan olarak, tf.distribute , kullanıcı tarafından sağlanan tf.data.Dataset örneğinin sonuna bir önceden getirme dönüşümü ekler. buffer_size olan önceden getirme dönüştürmesinin bağımsız değişkeni, eşitlemedeki kopyaların sayısına eşittir.

tf.distribute.Strategy.distribute_datasets_from_function

kullanım

Bu API, bir girdi işlevi alır ve bir tf.distribute.DistributedDataset örneği döndürür. Kullanıcıların ilettiği giriş işlevi bir tf.distribute.InputContext bağımsız değişkenine sahiptir ve bir tf.data.Dataset örneği döndürmelidir. Bu API ile tf.distribute , kullanıcının giriş işlevinden döndürülen tf.data.Dataset örneğinde başka bir değişiklik yapmaz. Veri kümesini toplu hale getirmek ve parçalamak kullanıcının sorumluluğundadır. tf.distribute , çalışanların her birinin CPU aygıtındaki giriş işlevini çağırır. Bu API, kullanıcıların kendi toplu işleme ve parçalama mantığını belirlemelerine izin vermenin yanı sıra, çok çalışanlı eğitim için kullanıldığında tf.distribute.Strategy.experimental_distribute_dataset ile karşılaştırıldığında daha iyi ölçeklenebilirlik ve performans gösterir.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
tutucu8 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Özellikler

yığınlama

Giriş işlevinin dönüş değeri olan tf.data.Dataset örneği, çoğaltma başına toplu iş boyutu kullanılarak toplu hale getirilmelidir. Çoğaltma başına toplu iş boyutu, genel toplu iş boyutunun eşitleme eğitiminde yer alan çoğaltma sayısına bölümüdür. Bunun nedeni, tf.distribute her bir çalışanın CPU aygıtındaki giriş işlevini çağırmasıdır. Belirli bir çalışan üzerinde oluşturulan veri kümesi, o çalışandaki tüm kopyalar tarafından kullanıma hazır olmalıdır.

parçalama

Kullanıcının giriş işlevine dolaylı olarak argüman olarak iletilen tf.distribute.InputContext nesnesi, başlık altında tf.distribute tarafından oluşturulur. Çalışan sayısı, geçerli çalışan kimliği vb. hakkında bilgilere sahiptir. Bu giriş işlevi, tf.distribute.InputContext nesnesinin parçası olan bu özellikleri kullanarak kullanıcı tarafından belirlenen ilkelere göre parçalamayı işleyebilir.

ön yükleme

tf.distribute , kullanıcı tarafından sağlanan giriş işlevi tarafından döndürülen tf.data.Dataset sonuna bir önceden getirme dönüşümü eklemez.

Dağıtılmış Yineleyiciler

Dağıtılmamış tf.data.Dataset örneklerine benzer şekilde, üzerinde yineleme yapmak ve tf.distribute.DistributedDataset içindeki öğelere erişmek için tf.distribute.DistributedDataset örneklerinde bir yineleyici oluşturmanız gerekir. Aşağıdakiler, bir tf.distribute.DistributedIterator oluşturabileceğiniz ve modelinizi eğitmek için kullanabileceğiniz yöntemlerdir:

kullanımlar

Döngü yapısı için bir Pythonic kullanın

tf.distribute.DistributedDataset üzerinde yineleme yapmak için kullanıcı dostu bir Pythonic döngüsü kullanabilirsiniz. tf.distribute.DistributedIterator öğesinden döndürülen öğeler, tek bir tf.Tensor veya kopya başına bir değer içeren bir tf.distribute.DistributedValues olabilir. Döngüyü bir tf.function içine yerleştirmek bir performans artışı sağlayacaktır. Ancak, break ve return şu anda bir tf.function içine yerleştirilmiş bir tf.distribute.DistributedDataset üzerindeki bir döngü için desteklenmemektedir.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
tutucu10 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Açık bir iter oluşturmak için yineleyiciyi kullanın

Bir tf.distribute.DistributedDataset örneğindeki öğeleri yinelemek için, üzerindeki yineleme iter kullanarak bir tf.distribute.DistributedIterator oluşturabilirsiniz. Açık bir yineleyiciyle, sabit sayıda adım için yineleme yapabilirsiniz. Bir tf.distribute.DistributedIterator örneğinden bir sonraki öğeyi almak için dist_iterator , next(dist_iterator) , dist_iterator.get_next() veya dist_iterator.get_next_as_optional() . İlk ikisi temelde aynıdır:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
tutucu12 l10n-yer
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

next() veya tf.distribute.DistributedIterator.get_next() ile, tf.distribute.DistributedIterator sonuna ulaştıysa, bir OutOfRange hatası atılır. İstemci python tarafında hatayı yakalayabilir ve kontrol noktası ve değerlendirme gibi diğer işleri yapmaya devam edebilir. Ancak, şuna benzeyen bir ana bilgisayar eğitim döngüsü kullanıyorsanız (yani, tf.function başına birden çok adım çalıştırın) bu çalışmaz:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn , adım gövdesini bir tf.range içine sararak birden çok adım içerir. Bu durumda, döngüde bağımlılık olmadan farklı yinelemeler paralel olarak başlayabilir, bu nedenle önceki yinelemelerin hesaplanması bitmeden sonraki yinelemelerde bir OutOfRange hatası tetiklenebilir. Bir OutOfRange hatası atıldığında, fonksiyondaki tüm işlemler hemen sonlandırılacaktır. Bu, kaçınmak istediğiniz bir durumsa, OutOfRange hatası oluşturmayan bir alternatif tf.distribute.DistributedIterator.get_next_as_optional() 'dır. get_next_as_optional , sonraki öğeyi içeren veya tf.distribute.DistributedIterator sona ulaştıysa hiçbir değeri içermeyen bir tf.experimental.Optional döndürür.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
tutucu15 l10n-yer
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_spec özelliğini kullanma

Dağıtılmış bir veri kümesinin öğelerini bir tf.function ve bir tf.TypeSpec garantisi istiyorsanız, tf.function input_signature bağımsız değişkenini belirtebilirsiniz. Dağıtılmış bir veri kümesinin çıktısı tf.distribute.DistributedValues ve bu, girişi tek bir cihaza veya birden çok cihaza gösterebilir. Bu dağıtılmış değere karşılık gelen tf.TypeSpec almak için dağıtılmış veri kümesinin veya dağıtılmış yineleyici nesnesinin element_spec özelliğini kullanabilirsiniz.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
tutucu17 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Kısmi Partiler

Kullanıcıların oluşturduğu tf.data.Dataset örnekleri, çoğaltma sayısına eşit olarak bölünemeyen toplu iş boyutları içerebildiğinde veya veri kümesi örneğinin önemliliği toplu iş boyutuna bölünemediğinde kısmi toplu işlerle karşılaşılır. Bu, veri kümesi birden çok kopyaya dağıtıldığında, bazı yineleyicilerdeki next çağrının bir OutOfRangeError ile sonuçlanacağı anlamına gelir. Bu kullanım durumunu ele almak için tf.distribute , işlenecek daha fazla verisi olmayan replikalarda 0 toplu iş boyutundaki sahte toplu grupları döndürür.

Tek çalışan durumu için, yineleyicideki bir next çağrı tarafından veriler döndürülmezse, 0 toplu iş boyutunda sahte partiler oluşturulur ve veri kümesindeki gerçek verilerle birlikte kullanılır. Kısmi gruplar durumunda, son küresel veri grubu, sahte veri gruplarının yanı sıra gerçek verileri içerecektir. Verileri işlemek için durma koşulu artık kopyalardan herhangi birinin veriye sahip olup olmadığını kontrol eder. Replikaların hiçbirinde veri yoksa OutOfRange hatası verilir.

Çoklu çalışan durumu için, çalışanların her birine ilişkin verilerin varlığını temsil eden boole değeri, çapraz çoğaltma iletişimi kullanılarak toplanır ve bu, tüm çalışanların dağıtılmış veri kümesini işlemeyi bitirip bitirmediğini belirlemek için kullanılır. Bu, çalışanlar arası iletişimi içerdiğinden, ilgili bazı performans cezaları vardır.

uyarılar

  • Birden çok çalışan kurulumuyla tf.distribute.Strategy.experimental_distribute_dataset API'lerini kullanırken, kullanıcılar dosyalardan okuyan bir tf.data.Dataset . tf.data.experimental.AutoShardPolicy , AUTO veya FILE olarak ayarlanırsa, adım başına gerçek toplu iş boyutu, kullanıcı tanımlı genel toplu iş boyutundan daha küçük olabilir. Bu, dosyadaki kalan öğeler genel toplu iş boyutundan küçük olduğunda gerçekleşebilir. Kullanıcılar, çalıştırılacak adım sayısına bağlı olmadan veri kümesini tüketebilir veya tf.data.experimental.AutoShardPolicy , üzerinde çalışmak için DATA olarak ayarlayabilir.

  • Durum bilgisi olan veri kümesi dönüşümleri şu anda tf.distribute ile desteklenmemektedir ve veri kümesinin sahip olabileceği durum bilgisi olan işlemler şu anda yok sayılmaktadır. Örneğin, veri kümenizde bir görüntüyü döndürmek için map_fn kullanan bir tf.random.uniform varsa, python işleminin yürütüldüğü yerel makinedeki duruma (yani rastgele tohuma) bağlı bir veri kümesi grafiğiniz olur.

  • Varsayılan olarak devre dışı bırakılan deneysel tf.data.experimental.OptimizationOptions belirli bağlamlarda -- tf.distribute ile birlikte kullanıldığında olduğu gibi -- performans düşüşüne neden olabilir. Bunları yalnızca bir dağıtım ayarında iş yükünüzün performansından yararlandıklarını doğruladıktan sonra etkinleştirmelisiniz.

  • Giriş işlem hattınızı genel olarak tf.data ile nasıl optimize edeceğinizi öğrenmek için lütfen bu kılavuza bakın. Birkaç ek ipucu:

    • Birden fazla çalışanınız varsa ve bir veya daha fazla glob modeliyle eşleşen tüm dosyalardan bir veri kümesi oluşturmak için tf.data.Dataset.list_files kullanıyorsanız, her çalışanın dosyayı tutarlı bir şekilde parçalaması için seed bağımsız değişkenini veya shuffle=False değerini ayarlamayı unutmayın.

    • Giriş işlem hattınız hem verileri kayıt düzeyinde karıştırmayı hem de verileri ayrıştırmayı içeriyorsa, ayrıştırılmamış veriler ayrıştırılan verilerden önemli ölçüde büyük değilse (ki bu genellikle böyle değildir), aşağıdaki örnekte gösterildiği gibi önce karıştırın ve ardından ayrıştırın. Bu, bellek kullanımına ve performansına fayda sağlayabilir.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) buffer_size öğelerinin dahili bir arabelleğini korur ve bu nedenle buffer_size azaltmak OOM sorununu alevlendirebilir.

  • tf.distribute.experimental_distribute_dataset veya tf.distribute.distribute_datasets_from_function kullanılırken verilerin işçiler tarafından işlenme sırası garanti edilmez. Tahmini ölçeklendirmek için tf.distribute kullanıyorsanız bu genellikle gereklidir. Bununla birlikte, partideki her öğe için bir dizin ekleyebilir ve çıktıları buna göre sipariş edebilirsiniz. Aşağıdaki kod parçası, çıktıların nasıl sipariş edileceğine bir örnektir.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
tutucu20 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

Kurallı bir tf.data.Dataset örneği kullanmıyorsam verilerimi nasıl dağıtırım?

Bazen kullanıcılar girdilerini temsil etmek için bir tf.data.Dataset kullanamazlar ve ardından veri setini birden çok cihaza dağıtmak için yukarıda belirtilen API'leri kullanamazlar. Bu gibi durumlarda ham tensörleri veya bir jeneratörden gelen girdileri kullanabilirsiniz.

Rastgele tensör girişleri için deneysel_distribute_values_from_function kullanın

strategy.run next(iterator) çıktısı olan tf.distribute.DistributedValues kabul eder. Tensör değerlerini iletmek için, ham tensörlerden tf.distribute.DistributedValues oluşturmak için experimental_distribute_values_from_function kullanın.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
tutucu22 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Girişiniz bir jeneratörden geliyorsa tf.data.Dataset.from_generator kullanın

Kullanmak istediğiniz bir oluşturucu işleviniz varsa, from_generator API'sini kullanarak bir tf.data.Dataset örneği oluşturabilirsiniz.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
tutucu24 l10n-yer
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.