Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Bessere Leistung mit der tf.data-API

Ansicht auf TensorFlow.org In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

GPUs und TPUs können die für die Ausführung eines einzelnen Trainingsschritts erforderliche Zeit drastisch reduzieren. Um eine Spitzenleistung zu erzielen, ist eine effiziente Eingabepipeline erforderlich, die Daten für den nächsten Schritt liefert, bevor der aktuelle Schritt abgeschlossen ist. Die tf.data API hilft beim Aufbau flexibler und effizienter Eingabepipelines. Dieses Dokument zeigt, wie Sie mit der tf.data API hochleistungsfähige TensorFlow-Eingabepipelines tf.data .

Bevor Sie fortfahren, lesen Sie das Handbuch " Build TensorFlow Input Pipelines ", um zu erfahren, wie Sie die tf.data API verwenden.

Ressourcen

Konfiguration

import tensorflow as tf

import time

In diesem Handbuch durchlaufen Sie ein Dataset und messen die Leistung. Es kann schwierig sein, reproduzierbare Leistungsbenchmarks zu erstellen. Verschiedene Faktoren wirken sich darauf aus:

  • die aktuelle CPU-Auslastung,
  • der Netzwerkverkehr,
  • komplexe Mechanismen wie Cache usw.

Erstellen Sie daher ein künstliches Beispiel, um einen reproduzierbaren Benchmark bereitzustellen.

Der Datensatz

Definieren Sie eine Klasse von Vererbungs tf.data.Dataset genannt ArtificialDataset . Dieser Datensatz:

  • generiert num_samples Samples (Standard ist 3)
  • Schläft einige Zeit vor dem ersten Element, um das Öffnen einer Datei zu simulieren
  • Schläft einige Zeit, bevor jedes Element erstellt wird, um das Lesen von Daten aus einer Datei zu simulieren
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=tf.dtypes.int64,
            output_shapes=(1,),
            args=(num_samples,)
        )

Dieser Datensatz ähnelt dem Datensatz tf.data.Dataset.range und fügt zu Beginn und zwischen den einzelnen Stichproben eine feste Verzögerung hinzu.

Die Trainingsschleife

Schreiben Sie eine Dummy-Trainingsschleife, die misst, wie lange es dauert, einen Datensatz zu durchlaufen. Die Trainingszeit wird simuliert.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    tf.print("Execution time:", time.perf_counter() - start_time)

Leistung optimieren

Um zu zeigen, wie die Leistung optimiert werden kann, verbessern Sie die Leistung des ArtificialDataset .

Der naive Ansatz

Beginnen Sie mit einer naiven Pipeline ohne Tricks und durchlaufen Sie den Datensatz so wie er ist.

benchmark(ArtificialDataset())
Execution time: 0.2530532629998561

Unter der Haube wurde Ihre Ausführungszeit folgendermaßen verbracht:

Naiv

Sie können sehen, dass das Ausführen eines Trainingsschritts Folgendes umfasst:

  • Öffnen einer Datei, falls diese noch nicht geöffnet wurde,
  • Abrufen eines Dateneintrags aus der Datei,
  • Verwendung der Daten für das Training.

In einer naiven synchronen Implementierung wie hier bleibt Ihr Modell jedoch im Leerlauf, während Ihre Pipeline die Daten abruft. Umgekehrt befindet sich die Eingabepipeline während des Trainings im Leerlauf. Die Trainingsschrittzeit ist somit die Summe aller Öffnungs-, Lese- und Trainingszeiten.

Die nächsten Abschnitte bauen auf dieser Eingabe-Pipeline auf und veranschaulichen Best Practices für das Entwerfen performanter TensorFlow-Eingabe-Pipelines.

Prefetching

Das Vorabrufen überlappt die Vorverarbeitung und Modellausführung eines Trainingsschritts. Während das Modell die Trainingsschritte s ausführt, liest die Eingabepipeline die Daten für die Schritte s+1 . Dadurch wird die Schrittzeit auf das Maximum (im Gegensatz zur Summe) des Trainings und die Zeit reduziert, die zum Extrahieren der Daten benötigt wird.

Die tf.data API stellt die tf.data.Dataset.prefetch Transformation. Es kann verwendet werden, um die Zeit, zu der Daten erzeugt werden, von der Zeit zu entkoppeln, zu der Daten verbraucht werden. Insbesondere verwendet die Transformation einen Hintergrundthread und einen internen Puffer, um Elemente aus dem Eingabedatensatz vorab abzurufen, bevor sie angefordert werden. Die Anzahl der Elemente, die vorab abgerufen werden sollen, sollte gleich (oder möglicherweise größer als) der Anzahl der Stapel sein, die von einem einzelnen Trainingsschritt verbraucht werden. Sie können diesen Wert entweder manuell tf.data.experimental.AUTOTUNE oder auf tf.data.experimental.AUTOTUNE setzen, tf.data.experimental.AUTOTUNE die Laufzeit von tf.data aufgefordert wird, den Wert zur Laufzeit dynamisch zu tf.data .

Beachten Sie, dass die Prefetch-Transformation immer dann Vorteile bietet, wenn die Möglichkeit besteht, die Arbeit eines "Produzenten" mit der Arbeit eines "Verbrauchers" zu überschneiden.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.experimental.AUTOTUNE)
)
Execution time: 0.20858672200006367

Vorabgerufen

Dieses Mal können Sie sehen, dass während der Trainingsschritt für Probe 0 ausgeführt wird, die Eingabepipeline die Daten für Probe 1 liest und so weiter.

Parallelisierung der Datenextraktion

In einer realen Umgebung können die Eingabedaten remote gespeichert werden (z. B. GCS oder HDFS). Eine Dataset-Pipeline, die beim lokalen Lesen von Daten gut funktioniert, kann beim Remote-Lesen von Daten aufgrund der folgenden Unterschiede zwischen lokalem und Remote-Speicher zu einem Engpass bei der E / A führen:

  • Zeit bis zum ersten Byte: Das Lesen des ersten Bytes einer Datei aus dem Remotespeicher kann um Größenordnungen länger dauern als aus dem lokalen Speicher.
  • Lesedurchsatz: Während Remotespeicher normalerweise eine große Gesamtbandbreite bietet, kann das Lesen einer einzelnen Datei möglicherweise nur einen kleinen Bruchteil dieser Bandbreite nutzen.

Darüber hinaus kann es nach dem Laden der Rohbytes in den Speicher erforderlich sein, die Daten (z. B. Protobuf ) zu deserialisieren und / oder zu entschlüsseln, was zusätzliche Berechnungen erfordert. Dieser Overhead ist unabhängig davon vorhanden, ob die Daten lokal oder remote gespeichert werden, kann jedoch im Remote-Fall schlimmer sein, wenn die Daten nicht effektiv vorab abgerufen werden.

Um die Auswirkungen der verschiedenen Datenextraktionskosten zu tf.data.Dataset.interleave , kann die Transformation tf.data.Dataset.interleave verwendet werden, um den tf.data.Dataset.interleave zu parallelisieren und den Inhalt anderer Datensätze (z. B. tf.data.Dataset.interleave zu verschachteln. Die Anzahl der zu überlappenden Datasets kann durch das Argument cycle_length angegeben werden, während der Grad der Parallelität durch das Argument num_parallel_calls angegeben werden num_parallel_calls . Ähnlich wie bei der prefetch Transformation unterstützt die interleave Transformation tf.data.experimental.AUTOTUNE wodurch die Entscheidung über die zu verwendende Parallelitätsstufe an die Laufzeit von tf.data wird.

Sequentielle Verschachtelung

Mit den Standardargumenten der Transformation tf.data.Dataset.interleave werden einzelne Stichproben aus zwei Datensätzen nacheinander verschachtelt.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(ArtificialDataset)
)
Execution time: 0.2373930549999841

Sequentielle Verschachtelung

Dieses Diagramm ermöglicht es, das Verhalten der interleave Transformation darzustellen und alternativ Stichproben aus den beiden verfügbaren Datensätzen abzurufen. Hierbei handelt es sich jedoch nicht um eine Leistungsverbesserung.

Parallele Verschachtelung

Verwenden Sie nun das Argument num_parallel_calls der interleave Transformation. Dadurch werden mehrere Datensätze parallel geladen, wodurch die Wartezeit für das Öffnen der Dateien verkürzt wird.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        ArtificialDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
Execution time: 0.1730301249999684

Parallele Verschachtelung

Dieses Mal wird das Lesen der beiden Datensätze parallelisiert, wodurch die globale Datenverarbeitungszeit verkürzt wird.

Parallelisierung der Datentransformation

Bei der Vorbereitung von Daten müssen Eingabeelemente möglicherweise vorverarbeitet werden. Zu diesem Zweck bietet die tf.data API die Transformation tf.data.Dataset.map , die auf jedes Element des Eingabedatensatzes eine benutzerdefinierte Funktion anwendet. Da die Eingabeelemente unabhängig voneinander sind, kann die Vorverarbeitung über mehrere CPU-Kerne hinweg parallelisiert werden. Um dies zu ermöglichen, liefert die map Transformation ähnlich wie die prefetch und interleave Transformationen das Argument num_parallel_calls , um den Grad der Parallelität anzugeben.

Die Auswahl des besten Werts für das Argument num_parallel_calls hängt von Ihrer Hardware, den Eigenschaften Ihrer Trainingsdaten (wie Größe und Form), den Kosten Ihrer Kartenfunktion und der gleichzeitigen anderen Verarbeitung auf der CPU ab. Eine einfache Heuristik besteht darin, die Anzahl der verfügbaren CPU-Kerne zu verwenden. Bei der prefetch und interleave Transformation unterstützt die map Transformation jedoch tf.data.experimental.AUTOTUNE wodurch die Entscheidung über die zu verwendende Parallelitätsstufe an die tf.data Laufzeit tf.data wird.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Sequentielle Zuordnung

Verwenden Sie zunächst die map ohne Parallelität als Basisbeispiel.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.43913738300011573

Sequentielle Zuordnung

Was den naiven Ansatz betrifft, so summieren sich hier die für das Öffnen, Lesen, Vorverarbeiten (Mapping) und Trainingsschritte aufgewendeten Zeiten zu einer einzigen Iteration.

Parallele Zuordnung

Verwenden Sie jetzt dieselbe Vorverarbeitungsfunktion, wenden Sie sie jedoch parallel auf mehrere Proben an.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
Execution time: 0.2730358689998411

Parallele Zuordnung

Jetzt können Sie im Diagramm sehen, dass sich die Vorverarbeitungsschritte überlappen, wodurch sich die Gesamtzeit für eine einzelne Iteration verringert.

Caching

Die Transformation tf.data.Dataset.cache kann ein Dataset entweder im Speicher oder im lokalen Speicher zwischenspeichern. Dadurch wird verhindert, dass einige Vorgänge (wie das Öffnen von Dateien und das Lesen von Daten) während jeder Epoche ausgeführt werden.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.36568501300007483

Zwischengespeicherter Datensatz

Wenn Sie ein Dataset zwischenspeichern, werden die Transformationen vor dem cache (wie das Öffnen von Dateien und das Lesen von Daten) nur während der ersten Epoche ausgeführt. In den nächsten Epochen werden die von der cache Transformation zwischengespeicherten Daten wiederverwendet.

Wenn die in die map Transformation übergebene benutzerdefinierte Funktion teuer ist, wenden Sie die cache Transformation nach der map Transformation an, solange das resultierende Dataset noch in den Speicher oder in den lokalen Speicher passt. Wenn die benutzerdefinierte Funktion den zum Speichern des Datasets erforderlichen Speicherplatz über die Cache-Kapazität hinaus erhöht, wenden Sie sie entweder nach der cache Umwandlung an oder ziehen Sie in Betracht, Ihre Daten vor Ihrem Schulungsjob vorzuverarbeiten, um den Ressourcenverbrauch zu verringern.

Vektorisierung der Zuordnung

Das Aufrufen einer benutzerdefinierten Funktion, die an die map , ist mit dem Planen und Ausführen der benutzerdefinierten Funktion verbunden. Wir empfehlen, die benutzerdefinierte Funktion zu vektorisieren (dh sie muss über einen Stapel von Eingaben gleichzeitig ausgeführt werden) und die batch vor der map anzuwenden.

Um diese bewährte Methode zu veranschaulichen, ist Ihr künstlicher Datensatz nicht geeignet. Die Planungsverzögerung beträgt etwa 10 Mikrosekunden (10e-6 Sekunden), weit weniger als die im ArtificialDataset verwendeten zehn Millisekunden, und daher ist ihre Auswirkung schwer zu erkennen.

Verwenden Sie in diesem Beispiel die Funktion base tf.data.Dataset.range und vereinfachen Sie die Trainingsschleife in ihrer einfachsten Form.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Skalare Zuordnung

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.8861004689999845

Skalarkarte

Die obige Darstellung zeigt, was los ist (mit weniger Beispielen). Sie können sehen, dass die zugeordnete Funktion für jede Probe angewendet wird. Diese Funktion ist zwar sehr schnell, hat jedoch einen gewissen Overhead, der sich auf die Zeitleistung auswirkt.

Vektorisierte Zuordnung

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.032729552000091644

Vektorisierte Karte

Dieses Mal wird die zugeordnete Funktion einmal aufgerufen und gilt für eine Probencharge. Während die Ausführung der Funktion länger dauern kann, wird der Overhead nur einmal angezeigt, wodurch die Gesamtzeitleistung verbessert wird.

Reduzierung des Speicherbedarfs

Eine Reihe von Transformationen, einschließlich interleave , prefetch und shuffle , verwalten einen internen Puffer von Elementen. Wenn die benutzerdefinierte Funktion, die an die map Transformation übergeben wird, die Größe der Elemente ändert, wirken sich die Reihenfolge der Map-Transformation und die Transformationen, die Elemente puffern, auf die Speichernutzung aus. Im Allgemeinen empfehlen wir, die Reihenfolge zu wählen, die zu einem geringeren Speicherbedarf führt, es sei denn, eine andere Reihenfolge ist für die Leistung wünschenswert.

Teilberechnungen zwischenspeichern

Es wird empfohlen, das Dataset nach der map Transformation zwischenzuspeichern, es sei denn, diese Transformation macht die Daten zu groß, um in den Speicher zu passen. Ein Kompromiss kann erzielt werden, wenn Ihre zugeordnete Funktion in zwei Teile aufgeteilt werden kann: einen zeitaufwändigen und einen speicheraufwendigen Teil. In diesem Fall können Sie Ihre Transformationen wie folgt verketten:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Auf diese Weise wird der zeitaufwändige Teil nur während der ersten Epoche ausgeführt, und Sie vermeiden, zu viel Cache-Speicherplatz zu verwenden.

Best-Practice-Zusammenfassung

Hier ist eine Zusammenfassung der Best Practices für das Entwerfen performanter TensorFlow-Eingabepipelines:

Reproduktion der Figuren

Um das Verständnis der tf.data.Dataset API zu tf.data.Dataset , können Sie mit Ihren eigenen Pipelines spielen. Unten finden Sie den Code, mit dem die Bilder aus diesem Handbuch gezeichnet werden. Es kann ein guter Ausgangspunkt sein und einige Problemumgehungen für häufig auftretende Schwierigkeiten aufzeigen, wie z.

  • Reproduzierbarkeit der Ausführungszeit;
  • Zugeordnete Funktionen eifrige Ausführung;
  • interleave Transformation aufrufbar.
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Der Datensatz

Ähnlich wie beim ArtificialDataset Sie ein Dataset erstellen, das die in jedem Schritt verbrachte Zeit zurückgibt.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Dieser Datensatz enthält Beispiele für Form [[2, 1], [2, 2], [2, 3]] und vom Typ [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Jede Probe ist:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Wo:

  • Open und Read sind Schrittkennungen
  • t0 ist der Zeitstempel, zu dem der entsprechende Schritt gestartet wurde
  • d ist die Zeit, die im entsprechenden Schritt verbracht wird
  • i ist der Instanzindex
  • e ist der Epochenindex (Häufigkeit, mit der der Datensatz iteriert wurde)
  • s ist der Beispielindex

Die Iterationsschleife

Machen Sie die Iterationsschleife etwas komplizierter, um alle Timings zusammenzufassen. Dies funktioniert nur mit Datensätzen, die Beispiele generieren, wie oben beschrieben.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Die Plotmethode

Definieren Sie abschließend eine Funktion, mit der eine Zeitleiste mit den von der Funktion timelined_benchmark zurückgegebenen Werten timelined_benchmark kann.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Verwenden Sie Wrapper für die zugeordnete Funktion

Um die zugeordnete Funktion in einem eifrigen Kontext auszuführen, müssen Sie sie in einen Aufruf von tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Pipelines Vergleich

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Naiv

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
Execution time: 12.436093607999965

Optimiert

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.experimental.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.303204500999982

draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png