Google I/O bir tamamlamadır! TensorFlow oturumlarını takip edin Oturumları görüntüleyin

tf.data API ile daha iyi performans

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

GPU'lar ve TPU'lar, tek bir eğitim adımını yürütmek için gereken süreyi önemli ölçüde azaltabilir. En yüksek performansı elde etmek, mevcut adım bitmeden bir sonraki adım için veri sağlayan verimli bir girdi hattı gerektirir. tf.data API, esnek ve verimli girdi ardışık düzenleri oluşturmaya yardımcı olur. Bu belge, yüksek performanslı TensorFlow giriş işlem hatları oluşturmak için tf.data API'sinin nasıl kullanılacağını gösterir.

Devam etmeden önce, tf.data API'sinin nasıl kullanılacağını öğrenmek için TensorFlow giriş işlem hatları oluşturma kılavuzuna bakın.

Kaynaklar

Kurmak

import tensorflow as tf

import time

Bu kılavuz boyunca, bir veri kümesini yineleyecek ve performansı ölçeceksiniz. Tekrarlanabilir performans kıyaslamaları yapmak zor olabilir. Tekrarlanabilirliği etkileyen farklı faktörler şunları içerir:

  • Geçerli CPU yükü
  • ağ trafiği
  • Önbellek gibi karmaşık mekanizmalar

Tekrarlanabilir bir kıyaslama elde etmek için yapay bir örnek oluşturacaksınız.

veri kümesi

ArtificialDataset tf.data.Dataset adlı bir sınıf tanımlayarak başlayın. Bu veri kümesi:

  • num_samples örnekleri oluşturur (varsayılan 3'tür)
  • Bir dosyayı açmayı simüle etmek için ilk öğeden önce bir süre uyur
  • Bir dosyadan veri okumayı simüle etmek için her bir öğeyi üretmeden önce bir süre uyur
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Bu veri kümesi, her örneğin başına ve arasına sabit bir gecikme ekleyerek tf.data.Dataset.range benzer.

eğitim döngüsü

Ardından, bir veri kümesi üzerinde yinelemenin ne kadar sürdüğünü ölçen yapay bir eğitim döngüsü yazın. Eğitim süresi simüle edilir.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Performansı optimize edin

Performansın nasıl optimize edilebileceğini göstermek için ArtificialDataset performansını iyileştireceksiniz.

naif yaklaşım

Veri kümesini olduğu gibi yineleyerek, hile kullanmadan saf bir işlem hattıyla başlayın.

benchmark(ArtificialDataset())
tutucu4 l10n-yer
Execution time: 0.26497629899995445

Kaputun altında, yürütme süreniz şu şekilde harcandı:

Veri yürütme zaman grafiği - saf bir yöntem

Çizim, bir eğitim adımı gerçekleştirmenin şunları içerdiğini gösterir:

  • Henüz açılmamış bir dosyanın açılması
  • Dosyadan bir veri girişi getirme
  • Verilerin eğitim için kullanılması

Ancak, buradaki gibi saf bir eşzamanlı uygulamada, boru hattınız verileri alırken modeliniz boşta oturuyor. Tersine, modeliniz eğitim alırken, giriş boru hattı boşta oturuyor. Eğitim adım süresi böylece açılış, okuma ve eğitim sürelerinin toplamıdır.

Sonraki bölümler, bu girdi ardışık düzenini temel alarak, performanslı TensorFlow girdi ardışık düzenlerini tasarlamak için en iyi uygulamaları gösterir.

ön yükleme

Önceden getirme, bir eğitim adımının ön işlemesi ve model yürütmesiyle örtüşür. Model s eğitim adımını yürütürken, girdi ardışık düzeni s+1 adımı için verileri okuyor. Bunu yapmak, adım süresini (toplamın aksine) eğitimin maksimum değerine ve verilerin çıkarılması için gereken süreye azaltır.

tf.data API, tf.data.Dataset.prefetch dönüşümünü sağlar. Verinin üretildiği zamanı, verinin tüketildiği zamandan ayırmak için kullanılabilir. Özellikle, dönüşüm, girdi veri kümesindeki öğeleri talep edilmeden önce önceden getirmek için bir arka plan iş parçacığı ve bir dahili arabellek kullanır. Önceden getirilecek öğelerin sayısı, tek bir eğitim adımı tarafından tüketilen partilerin sayısına eşit (veya muhtemelen daha fazla) olmalıdır. Bu değeri manuel olarak ayarlayabilir veya tf.data.AUTOTUNE olarak ayarlayabilirsiniz; bu, tf.data çalışma zamanının değeri çalışma zamanında dinamik olarak ayarlamasını isteyecektir.

Önceden getirme dönüşümünün, bir "üreticinin" işiyle bir "tüketicinin" işinin örtüşmesi için bir fırsat olduğu her zaman fayda sağladığını unutmayın.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
tutucu6 l10n-yer
Execution time: 0.21731788600027357

Veri yürütme zaman grafiği - önceden getirme yöntemi

Şimdi, veri yürütme zaman grafiğinin gösterdiği gibi, eğitim adımı örnek 0 için çalışırken, giriş ardışık düzeni örnek 1 için verileri okuyor ve bu böyle devam ediyor.

Paralelleştirme veri çıkarma

Gerçek dünya ortamında, giriş verileri uzaktan depolanabilir (örneğin, Google Cloud Storage veya HDFS'de). Verileri yerel olarak okurken iyi çalışan bir veri kümesi ardışık düzeni, yerel ve uzak depolama arasındaki aşağıdaki farklar nedeniyle verileri uzaktan okurken G/Ç'de darboğaz oluşturabilir:

  • İlk bayt süresi : Bir dosyanın ilk baytını uzak depolamadan okumak, yerel depolamadan çok daha uzun siparişler alabilir.
  • Okuma verimi : Uzak depolama tipik olarak büyük bir toplam bant genişliği sunarken, tek bir dosyayı okumak bu bant genişliğinin yalnızca küçük bir kısmını kullanabilir.

Ek olarak, ham baytlar belleğe yüklendikten sonra, ek hesaplama gerektiren verilerin seri hale getirilmesi ve/veya şifresinin çözülmesi (örneğin protobuf ) gerekli olabilir. Bu ek yük, verilerin yerel olarak mı yoksa uzaktan mı depolandığına bakılmaksızın mevcuttur, ancak veriler etkin bir şekilde önceden getirilmezse uzak durumda daha kötü olabilir.

Çeşitli veri çıkarma genel giderlerinin etkisini azaltmak için, tf.data.Dataset.interleave dönüşümü, diğer veri kümelerinin (veri dosyası okuyucuları gibi) içeriklerini araya ekleyerek veri yükleme adımını paralel hale getirmek için kullanılabilir. Çakışacak veri kümelerinin sayısı cycle_length bağımsız değişkeni ile belirtilebilirken, paralellik düzeyi num_parallel_calls bağımsız değişkeni tarafından belirtilebilir. Önceden prefetch dönüşümüne benzer şekilde, interleave ekleme dönüşümü tf.data.AUTOTUNE destekler ve bu, tf.data çalışma zamanına hangi düzeyde paralellik kullanılacağına ilişkin kararı devreder.

sıralı serpiştirme

tf.data.Dataset.interleave dönüşümünün varsayılan argümanları, iki veri kümesinden tek örnekleri sırayla serpiştirmesini sağlar.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
tutucu8 l10n-yer
Execution time: 0.4987426460002098

Veri yürütme zaman grafiği - sıralı serpiştirme

Bu veri yürütme zaman grafiği, mevcut iki veri kümesinden alternatif olarak örnekler getirerek, interleave ekleme dönüşümünün davranışını sergilemeye izin verir. Ancak burada herhangi bir performans iyileştirmesi söz konusu değildir.

paralel serpiştirme

Şimdi, interleave ekleme dönüşümünün num_parallel_calls bağımsız değişkenini kullanın. Bu, birden çok veri kümesini paralel olarak yükleyerek dosyaların açılmasını bekleyen süreyi azaltır.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
tutucu10 l10n-yer
Execution time: 0.283668874000341

Veri yürütme zaman grafiği - paralel serpiştirme yöntemi

Bu kez, veri yürütme zaman grafiğinin gösterdiği gibi, iki veri kümesinin okunması paralel hale getirilerek küresel veri işleme süresi azaltılır.

Paralelleştirme veri dönüşümü

Veri hazırlanırken girdi öğelerinin önceden işlenmesi gerekebilir. Bu amaçla, tf.data API, giriş veri kümesinin her bir öğesine kullanıcı tanımlı bir işlev uygulayan tf.data.Dataset.map dönüşümünü sunar. Giriş öğeleri birbirinden bağımsız olduğundan, ön işleme birden çok CPU çekirdeği arasında paralel hale getirilebilir. Bunu mümkün kılmak için, prefetch ve interleave ekleme dönüşümlerine benzer şekilde, map dönüşümü paralellik düzeyini belirtmek için num_parallel_calls argümanını sağlar.

num_parallel_calls argümanı için en iyi değeri seçmek, donanımınıza, eğitim verilerinizin özelliklerine (boyutu ve şekli gibi), harita işlevinizin maliyetine ve aynı anda CPU'da başka hangi işlemlerin gerçekleştiğine bağlıdır. Basit bir buluşsal yöntem, mevcut CPU çekirdeği sayısını kullanmaktır. Ancak, prefetch ve interleave ekleme dönüşümüne gelince, map dönüşümü tf.data.AUTOTUNE destekler ve bu da tf.data çalışma zamanına hangi düzeyde paralellik kullanılacağına ilişkin kararı devreder.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

sıralı eşleme

Temel örnek olarak paralellik olmadan map dönüşümünü kullanarak başlayın.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
tutucu13 l10n-yer
Execution time: 0.4505277170001136

Veri yürütme zaman grafiği - sıralı eşleme yöntemi

Saf yaklaşıma gelince, burada, grafiğin gösterdiği gibi, açma, okuma, ön işleme (haritalama) ve eğitim adımları için harcanan süreler tek bir yineleme için toplanır.

paralel haritalama

Şimdi, aynı ön işleme işlevini kullanın, ancak bunu birden çok örneğe paralel olarak uygulayın.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
tutucu15 l10n-yer
Execution time: 0.2839677860001757

Veri yürütme süresi - paralel eşleme

Veri grafiğinin gösterdiği gibi, ön işleme adımları örtüşerek tek bir yineleme için toplam süreyi azaltır.

Önbelleğe almak

tf.data.Dataset.cache dönüşümü, bellekte veya yerel depolamada bir veri kümesini önbelleğe alabilir. Bu, bazı işlemlerin (dosya açma ve veri okuma gibi) her çağda yürütülmesini önleyecektir.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
tutucu17 l10n-yer
Execution time: 0.3848854380003104

Veri yürütme süresi - önbelleğe alınmış veri kümesi yöntemi

Burada, veri yürütme zaman grafiği, bir veri kümesini önbelleğe aldığınızda, cache önceki dönüşümlerin (dosya açma ve veri okuma gibi) yalnızca ilk dönem sırasında yürütüldüğünü gösterir. Sonraki dönemler, cache dönüşümü tarafından önbelleğe alınan verileri yeniden kullanacak.

map dönüşümüne geçirilen kullanıcı tanımlı işlev pahalıysa, elde edilen veri kümesi belleğe veya yerel depolamaya sığabildiği sürece map dönüşümünden sonra cache dönüşümünü uygulayın. Kullanıcı tanımlı işlev, veri kümesini önbellek kapasitesinin ötesinde depolamak için gereken alanı artırırsa, bunu cache dönüştürmesinden sonra uygulayın veya kaynak kullanımını azaltmak için eğitim işinizden önce verilerinizi önceden işlemeyi düşünün.

haritalamayı vektörleştirme

map dönüşümüne geçirilen kullanıcı tanımlı bir işlevi çağırmak, kullanıcı tanımlı işlevi zamanlama ve yürütmeyle ilgili ek yüke sahiptir. Kullanıcı tanımlı işlevi vektörleştirin (yani, aynı anda bir grup girdi üzerinde çalışmasını sağlayın) ve batch dönüştürmeyi map dönüştürmesinden önce uygulayın.

Bu iyi uygulamayı göstermek için yapay veri kümeniz uygun değildir. Programlama gecikmesi yaklaşık 10 mikrosaniyedir (10e-6 saniye), bu, ArtificialDataset Veri Kümesi'nde kullanılan onlarca milisaniyeden çok daha azdır ve bu nedenle etkisini görmek zordur.

Bu örnek için, temel tf.data.Dataset.range işlevini kullanın ve eğitim döngüsünü en basit biçimine basitleştirin.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

skaler haritalama

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
tutucu20 l10n-yer
Execution time: 0.2712608739998359

Veri yürütme süresi - skaler harita yöntemi

Yukarıdaki çizim, skaler haritalama yöntemini kullanarak (daha az örnekle) neler olduğunu göstermektedir. Her bir örnek için eşlenen işlevin uygulandığını gösterir. Bu işlev çok hızlı olsa da, zaman performansını etkileyen bazı ek yükleri vardır.

vektörleştirilmiş eşleme

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
tutucu22 l10n-yer
Execution time: 0.02737950600021577

Veri yürütme süresi - vektörleştirilmiş harita yöntemi

Bu sefer, eşlenen işlev bir kez çağrılır ve bir numune partisine uygulanır. Veri yürütme zaman grafiğinin gösterdiği gibi, işlevin yürütülmesi daha fazla zaman alabilirken, ek yük yalnızca bir kez görünerek genel zaman performansını iyileştirir.

Bellek ayak izini azaltmak

interleave , prefetch ve shuffle dahil olmak üzere bir dizi dönüşüm, öğelerin bir dahili arabelleğini korur. Harita dönüşümüne geçirilen kullanıcı tanımlı işlev, öğelerin boyutunu değiştirirse, map dönüşümünün sırası ve arabelleğe alınan öğelerin dönüşümleri bellek kullanımını etkiler. Genel olarak, performans için farklı bir sıralama istenmediği sürece, daha düşük bellek ayak izi ile sonuçlanan sırayı seçin.

Kısmi hesaplamaları önbelleğe alma

Bu dönüşümün verileri belleğe sığmayacak kadar büyük hale getirmesi dışında, map dönüşümünden sonra veri kümesinin önbelleğe alınması önerilir. Eşlenen işleviniz iki parçaya bölünebilirse, bir ödünleşim elde edilebilir: zaman alan parça ve bellek tüketen parça. Bu durumda dönüşümlerinizi aşağıdaki gibi zincirleyebilirsiniz:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Bu şekilde, zaman alıcı kısım yalnızca ilk epoch sırasında yürütülür ve çok fazla önbellek alanı kullanmaktan kaçınırsınız.

En iyi uygulama özeti

Performanslı TensorFlow giriş ardışık düzenlerini tasarlamaya yönelik en iyi uygulamaların bir özetini burada bulabilirsiniz:

Rakamların çoğaltılması

tf.data.Dataset API anlayışında daha derine inmek için kendi ardışık düzenlerinizle oynayabilirsiniz. Aşağıda, bu kılavuzdaki görüntüleri çizmek için kullanılan kod bulunmaktadır. Aşağıdakiler gibi yaygın zorluklar için bazı geçici çözümler göstererek iyi bir başlangıç ​​noktası olabilir:

  • Yürütme süresi tekrarlanabilirliği
  • Eşlenen işlevler istekli yürütme
  • interleave dönüşüm çağrılabilir
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

veri kümesi

ArtificialDataset Veri Kümesine benzer şekilde, her adımda harcanan süreyi döndüren bir veri kümesi oluşturabilirsiniz.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Bu veri kümesi, şekil [[2, 1], [2, 2], [2, 3]] ve [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Her örnek:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Neresi:

  • Open ve Read adım tanımlayıcılarıdır
  • t0 , ilgili adımın başladığı zaman damgasıdır
  • d , ilgili adımda harcanan zamandır
  • i örnek diziniyim
  • e , dönem indeksidir (veri kümesinin tekrarlanma sayısı)
  • s örnek dizindir

yineleme döngüsü

Tüm zamanlamaları toplamak için yineleme döngüsünü biraz daha karmaşık hale getirin. Bu, yalnızca yukarıda ayrıntılı olarak açıklandığı gibi örnekler üreten veri kümeleriyle çalışacaktır.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

çizim yöntemi

Son olarak, timelined_benchmark işlevi tarafından döndürülen değerlerle bir zaman çizelgesi çizebilen bir işlev tanımlayın.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Eşlenen işlev için sarmalayıcıları kullanın

Eşlenmiş işlevi istekli bir bağlamda çalıştırmak için, bunları bir tf.py_function çağrısı içine sarmanız gerekir.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Boru hatları karşılaştırması

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Toy

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
tutucu32 l10n-yer
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

Optimize

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
-yer tutucu35 l10n-yer
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png