Pomoc chronić Wielkiej Rafy Koralowej z TensorFlow na Kaggle Dołącz Wyzwanie

Lepsza wydajność dzięki API tf.data

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Procesory GPU i TPU mogą radykalnie skrócić czas potrzebny na wykonanie pojedynczego etapu uczenia. Osiągnięcie szczytowej wydajności wymaga wydajnego potoku wejściowego, który dostarcza dane do następnego kroku przed zakończeniem bieżącego kroku. tf.data API umożliwia budowanie elastycznych i efektywnych rurociągów wejściowych. Ten dokument pokazuje, jak korzystać z tf.data API do budowy wysoce wydajnych rurociągi wejściowych TensorFlow.

Przed kontynuowaniem należy sprawdzić rurociągi wejściowe Budowa TensorFlow kierować, aby dowiedzieć się, jak korzystać z tf.data API.

Zasoby

Ustawiać

import tensorflow as tf

import time

W tym przewodniku będziesz iterować po zbiorze danych i mierzyć wydajność. Tworzenie powtarzalnych testów wydajności może być trudne. Różne czynniki wpływające na odtwarzalność obejmują:

  • Aktualne obciążenie procesora
  • Ruch sieciowy
  • Złożone mechanizmy, takie jak pamięć podręczna

Aby uzyskać powtarzalny benchmark, zbudujesz sztuczny przykład.

Zbiór danych

Zacznij od zdefiniowania klasy dziedziczenie z tf.data.Dataset nazywa ArtificialDataset . Ten zbiór danych:

  • Generuje num_samples próbek (domyślnie 3)
  • Uśpiony przez jakiś czas przed pierwszym elementem, aby zasymulować otwarcie pliku
  • Uśpiony przez pewien czas przed wyprodukowaniem każdego elementu, aby symulować odczytywanie danych z pliku
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Ten zestaw danych jest podobna do tf.data.Dataset.range jednego, dodając stałą opóźnienie na początku i w między każdej próbki.

Pętla treningowa

Następnie napisz fikcyjną pętlę treningową, która mierzy, ile czasu zajmuje iteracja zestawu danych. Symulowany jest czas treningu.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Optymalizacja wydajności

Wykazują, jak można zoptymalizować wydajność, można poprawić wydajność ArtificialDataset .

Naiwne podejście

Zacznij od naiwnego potoku bez żadnych sztuczek, iterując po zbiorze danych bez zmian.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

Pod maską tak spędziłeś czas egzekucji:

Wykres czasu wykonania danych - metoda naiwna

Wykres pokazuje, że wykonanie kroku treningowego obejmuje:

  • Otwieranie pliku, jeśli nie został jeszcze otwarty
  • Pobieranie wpisu danych z pliku
  • Wykorzystanie danych do treningu

Jednak w naiwnej implementacji synchronicznej, takiej jak tutaj, podczas gdy potok pobiera dane, model jest bezczynny. I odwrotnie, gdy model jest trenowany, potok wejściowy jest bezczynny. Czas kroku treningowego jest więc sumą czasów otwierania, czytania i treningu.

Następne sekcje opierają się na tym potoku wejściowym, ilustrując najlepsze praktyki projektowania wydajnych potoków wejściowych TensorFlow.

Pobieranie wstępne

Pobieranie wstępne nakłada się na przetwarzanie wstępne i wykonanie modelu etapu uczenia. Podczas gdy model jest wykonywany krok szkolenia s rurociąg wejście odczytuje dane dla etapu s+1 . Spowoduje to skrócenie czasu kroku do maksimum (w przeciwieństwie do sumy) szkolenia i czasu potrzebnego na wyodrębnienie danych.

tf.data API zapewnia tf.data.Dataset.prefetch transformacji. Może służyć do oddzielenia czasu generowania danych od czasu, w którym dane są zużywane. W szczególności transformacja wykorzystuje wątek w tle i wewnętrzny bufor do wstępnego pobierania elementów z wejściowego zestawu danych przed ich żądaniem. Liczba elementów do pobrania z wyprzedzeniem powinna być równa (lub być może większa niż) liczbie partii zużywanych przez pojedynczy krok uczenia. Można było ręcznie dostroić tę wartość lub ustawić go na tf.data.AUTOTUNE , który poprosi tf.data czas pracy, aby dostroić wartość dynamicznie przy starcie.

Należy zauważyć, że transformacja pobierania z wyprzedzeniem zapewnia korzyści za każdym razem, gdy istnieje możliwość nałożenia pracy „producenta” z pracą „konsumenta”.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

Wykres czasu wykonania danych - metoda pobierania z wyprzedzeniem

Teraz, jak pokazuje wykres czasu wykonania danych, podczas gdy krok uczenia jest uruchomiony dla próbki 0, potok wejściowy odczytuje dane dla próbki 1 i tak dalej.

Równoległa ekstrakcja danych

W warunkach rzeczywistych dane wejściowe mogą być przechowywane zdalnie (na przykład w Google Cloud Storage lub HDFS). Potok zestawu danych, który działa dobrze podczas lokalnego odczytywania danych, może powodować wąskie gardło we/wy podczas zdalnego odczytu danych z powodu następujących różnic między magazynem lokalnym i zdalnym:

  • Czas do pierwszego bajta: Czytając pierwszy bajt pliku ze zdalnego przechowywania mogą wziąć rzędy wielkości więcej niż z pamięci lokalnej.
  • Czytaj przepustowość: Podczas zdalnego przechowywania zazwyczaj oferuje dużą przepustowość zagregowaną, czytając jeden plik może być w stanie wykorzystać tylko niewielką część tego pasma.

Ponadto po surowe bajty są ładowane do pamięci, może być również konieczne deserializowania i / lub odszyfrowania danych (na przykład buforów ), co wymaga dodatkowych obliczeń. Ten narzut występuje niezależnie od tego, czy dane są przechowywane lokalnie czy zdalnie, ale może być gorszy w przypadku zdalnym, jeśli dane nie są skutecznie pobierane z wyprzedzeniem.

W celu złagodzenia wpływu różnych ogólnych ekstrakcji danych, tf.data.Dataset.interleave transformacja może być stosowany do parallelize etap ładowania danych, przeplatanie zawartość innych zbiorów danych (takich jak czytniki plików danych). Liczba zestawów danych do nakładania może być określony przez cycle_length argumentu, podczas gdy poziom równoległości może być określony przez num_parallel_calls argument. Podobne do prefetch transformacji, interleave transformacja obsługuje tf.data.AUTOTUNE , który przekaże decyzję o jakim poziomie równoległości do użytku do tf.data wykonawcze.

Przeplatanie sekwencyjne

Domyślne argumenty tf.data.Dataset.interleave przemiany sprawiają, że przeplatają pojedyncze próbki z dwóch zestawów danych sekwencyjnie.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

Wykres czasu wykonania danych - przeplatanie sekwencyjne

Ta realizacja umożliwia wykres danych w czasie wykazywać zachowanie interleave transformację, pobieranie próbek alternatywnie z dwóch zestawów danych dostępnych. Jednak nie jest to związane z poprawą wydajności.

Przeplatanie równoległe

Teraz użyj num_parallel_calls argument interleave transformacji. To ładuje wiele zestawów danych równolegle, skracając czas oczekiwania na otwarcie plików.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

Wykres czasu wykonania danych - metoda przeplatania równoległego

Tym razem, jak pokazuje wykres czasu wykonania danych, odczyt dwóch zestawów danych jest zrównoleglony, co skraca globalny czas przetwarzania danych.

Równoległa transformacja danych

Podczas przygotowywania danych elementy wejściowe mogą wymagać wstępnego przetworzenia. W tym celu, tf.data oferty API tf.data.Dataset.map transformacji, która ma zastosowanie funkcji zdefiniowanej przez użytkownika do każdego elementu zbioru danych wejściowych. Ponieważ elementy wejściowe są od siebie niezależne, przetwarzanie wstępne można przeprowadzić równolegle na wielu rdzeniach procesora. Aby było to możliwe, podobnie jak prefetch i interleave przekształceń The map transformacja zapewnia num_parallel_calls argumentu, aby określić poziom równoległości.

Wybór najlepszej wartości dla num_parallel_calls argumentu zależy od sprzętu, charakterystyka dane treningowe (takie jak jego wielkości i kształtu), kosztem swojej funkcji map, a co innego typu przetwarzania dzieje się na CPU w tym samym czasie. Prosta heurystyka polega na wykorzystaniu liczby dostępnych rdzeni procesora. Jednakże, jak w przypadku prefetch i interleave transformacji, map transformacja obsługuje tf.data.AUTOTUNE który przekaże decyzję o jakim poziomie równoległości do użytku do tf.data wykonawcze.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Mapowanie sekwencyjne

Uruchom za pomocą map transformacji bez równoległości jako przykład bazowym.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

Wykres czasu wykonania danych - sekwencyjna metoda mapowania

Co do naiwnych podejścia tutaj, jak widać na wydruku, czas potrzebny na otwarcie, odczyt, przetwarzanie wstępne (odwzorowanie) oraz etapy szkolenia Podsumowując razem przez pojedynczy iteracji.

Mapowanie równoległe

Teraz użyj tej samej funkcji przetwarzania wstępnego, ale zastosuj ją równolegle do wielu próbek.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

Czas wykonania danych - mapowanie równoległe

Jak pokazuje wykres danych, etapy przetwarzania wstępnego nakładają się, co skraca całkowity czas pojedynczej iteracji.

Buforowanie

tf.data.Dataset.cache transformacja może buforować zestawu danych, zarówno w pamięci lub w pamięci lokalnej. Pozwoli to zaoszczędzić niektóre operacje (takie jak otwieranie plików i odczytywanie danych) przed wykonaniem w każdej epoce.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

Czas wykonania danych - metoda buforowanego zbioru danych

Tutaj wykonanie dane czasie pokazuje, że jeśli działka buforować zestawu danych, transformacje przed cache jednego (jak otwarcie pliku oraz odczytu danych) są realizowane tylko podczas pierwszej epoki. Kolejne epoki będzie ponownie wykorzystać dane buforowane przez cache transformacji.

Jeśli funkcja zdefiniowana przez użytkownika przeszedł do map transformacji jest drogie, zastosować cache transformacji po map transformacji, o ile powstały zbiór danych można jeszcze dopasować do pamięci lub pamięci lokalnej. Jeśli funkcja zdefiniowana przez użytkownika zwiększa przestrzeń wymagana do przechowywania zestawu danych poza pojemności pamięci podręcznej, czy stosuje się go po cache transformacji lub rozważyć wstępne przetwarzanie danych przed swojej pracy szkoleniowej w celu zmniejszenia zużycia zasobów.

Mapowanie wektoryzujące

Wywoływanie funkcji zdefiniowanej przez użytkownika przekazywany do map transformacji ma narzut związany z planowaniem i wykonaniem funkcji zdefiniowanej przez użytkownika. Wektorować funkcji zdefiniowanej przez użytkownika (czyli ma on działać na partię wejść na raz) i zastosować batch transformację przed map transformacji.

Aby zilustrować tę dobrą praktykę, Twój sztuczny zbiór danych nie jest odpowiedni. Opóźnieniem planowego wynosi około 10 mikrosekund (10e-6 sekund), mniej niż kilkadziesiąt milisekund stosowanych w ArtificialDataset , a więc ich wpływ jest trudny do zobaczenia.

W tym przykładzie, należy bazową tf.data.Dataset.range funkcję i uproszczenie pętli treningowy najprostszej formie.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Mapowanie skalarne

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

Czas wykonania danych - metoda mapy skalarnej

Powyższy wykres ilustruje, co się dzieje (przy mniejszej liczbie próbek) przy użyciu metody mapowania skalarnego. Pokazuje, że mapowana funkcja jest stosowana dla każdej próbki. Chociaż ta funkcja jest bardzo szybka, ma pewne narzuty, które wpływają na wydajność czasu.

Mapowanie wektorowe

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

Czas wykonania danych - metoda zwektoryzowanej mapy

Tym razem mapowana funkcja jest wywoływana raz i dotyczy partii próbki. Jak pokazuje wykres czasu wykonania danych, chociaż wykonanie funkcji może zająć więcej czasu, narzut pojawia się tylko raz, poprawiając ogólną wydajność czasu.

Zmniejszenie zużycia pamięci

Niektóre transformacje, w tym interleave , prefetch i shuffle , utrzymać wewnętrzny bufor elementów. Jeśli funkcja zdefiniowany przez użytkownika przekazywane do map transformacji zmienia rozmiary elementów, po czym w kolejności transformacji mapy i transformacji, że elementy buforowe wpływa wykorzystania pamięci. Ogólnie rzecz biorąc, wybierz kolejność, która skutkuje mniejszym zużyciem pamięci, chyba że inna kolejność jest pożądana ze względu na wydajność.

Buforowanie częściowych obliczeń

Zaleca się, aby buforować zestawu danych po map transformacji wyjątkiem jeśli transformacja ta sprawia, że dane zbyt duża, aby zmieścić się w pamięci. Kompromis można osiągnąć, jeśli mapowaną funkcję można podzielić na dwie części: część pochłaniającą czas i część pochłaniającą pamięć. W takim przypadku możesz połączyć swoje transformacje jak poniżej:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

W ten sposób część czasochłonna jest wykonywana tylko w pierwszej epoce i unikasz używania zbyt dużej ilości pamięci podręcznej.

Podsumowanie najlepszych praktyk

Oto podsumowanie najlepszych praktyk projektowania wydajnych potoków wejściowych TensorFlow:

Odwzorowanie postaci

Głębiej w tf.data.Dataset zrozumienia API, można grać z własnych rurociągów. Poniżej znajduje się kod użyty do wykreślenia obrazów z tego przewodnika. Może to być dobry punkt wyjścia, pokazujący niektóre obejścia typowych problemów, takich jak:

  • Odtwarzalność czasu wykonania
  • Mapowane funkcje chętne do wykonania
  • interleave transformacja wywoływalnym
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Zbiór danych

Podobne do ArtificialDataset można zbudować zbiór danych powracającego czas spędzony w każdym kroku.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Ten zbiór danych dostarcza próbki kształtu [[2, 1], [2, 2], [2, 3]] i typu [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Każda próbka to:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Gdzie:

  • Open i Read są kroki identyfikatory
  • t0 jest znacznik czasu, gdy odpowiedni etap rozpoczęty
  • d jest czas spędzony w odpowiednim punkcie
  • i jest indeksem wystąpienie
  • e jest indeks Epoki (ile razy zbiór danych został iterowany)
  • s jest indeksem próbki

Pętla iteracji

Spraw, aby pętla iteracji była nieco bardziej skomplikowana, aby agregować wszystkie czasy. Będzie to działać tylko z zestawami danych generującymi próbki, jak opisano powyżej.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Metoda kreślenia

Wreszcie zdefiniować funkcję w stanie sporządzić harmonogram danej wartości zwracane przez timelined_benchmark funkcji.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Użyj wrapperów dla funkcji mapowanej

Aby uruchomić funkcję odwzorowany w gorliwym kontekście trzeba owinąć je wewnątrz tf.py_function rozmowy.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Porównanie rurociągów

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Naiwny

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

Zoptymalizowany

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png