![]() | ![]() | ![]() | ![]() |
Überblick
GPUs und TPUs können die für die Ausführung eines einzelnen Trainingsschritts erforderliche Zeit drastisch reduzieren. Um eine Spitzenleistung zu erzielen, ist eine effiziente Eingabepipeline erforderlich, die Daten für den nächsten Schritt liefert, bevor der aktuelle Schritt abgeschlossen ist. Die tf.data
API hilft beim Aufbau flexibler und effizienter Eingabepipelines. Dieses Dokument zeigt, wie Sie mit der tf.data
API hochleistungsfähige TensorFlow-Eingabepipelines tf.data
.
Bevor Sie fortfahren, tf.data
das tf.data
zum Erstellen von TensorFlow-Eingabepipelines, um zu erfahren, wie Sie die tf.data
API verwenden.
Ressourcen
- Erstellen Sie TensorFlow-Eingabepipelines
tf.data.Dataset
API- Analysieren
tf.data
Leistung vontf.data
mit dem TF Profiler
Installieren
import tensorflow as tf
import time
In diesem Handbuch durchlaufen Sie ein Dataset und messen die Leistung. Es kann schwierig sein, reproduzierbare Leistungsbenchmarks zu erstellen. Verschiedene Faktoren, die die Reproduzierbarkeit beeinflussen, umfassen:
- Die aktuelle CPU-Auslastung
- Der Netzwerkverkehr
- Komplexe Mechanismen wie Cache
Um einen reproduzierbaren Benchmark zu erhalten, erstellen Sie ein künstliches Beispiel.
Der Datensatz
Beginnen Sie mit der Definition einer Klasse, die vontf.data.Dataset
Namen ArtificialDataset
erbt. Dieser Datensatz:
- Generiert
num_samples
Samples (Standard ist 3) - Schläft einige Zeit vor dem ersten Element, um das Öffnen einer Datei zu simulieren
- Schläft einige Zeit, bevor jedes Element erstellt wird, um das Lesen von Daten aus einer Datei zu simulieren
class ArtificialDataset(tf.data.Dataset):
def _generator(num_samples):
# Opening the file
time.sleep(0.03)
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
time.sleep(0.015)
yield (sample_idx,)
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
args=(num_samples,)
)
Dieser Datensatz ähnelt dem Datensatz tf.data.Dataset.range
und fügt zu Beginn und zwischen den einzelnen Stichproben eine feste Verzögerung hinzu.
Die Trainingsschleife
Schreiben Sie als Nächstes eine Dummy-Trainingsschleife, die misst, wie lange es dauert, einen Datensatz zu durchlaufen. Die Trainingszeit wird simuliert.
def benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
for sample in dataset:
# Performing a training step
time.sleep(0.01)
print("Execution time:", time.perf_counter() - start_time)
Leistung optimieren
Um zu zeigen, wie die Leistung optimiert werden kann, verbessern Sie die Leistung des ArtificialDataset
.
Der naive Ansatz
Beginnen Sie mit einer naiven Pipeline ohne Tricks und durchlaufen Sie den Datensatz so wie er ist.
benchmark(ArtificialDataset())
Execution time: 0.2541472299999441
Unter der Haube wurde Ihre Ausführungszeit folgendermaßen verbracht:
Die Darstellung zeigt, dass die Durchführung eines Trainingsschritts Folgendes umfasst:
- Öffnen einer Datei, falls diese noch nicht geöffnet wurde
- Abrufen eines Dateneintrags aus der Datei
- Verwendung der Daten für das Training
In einer naiven synchronen Implementierung wie hier bleibt Ihr Modell jedoch im Leerlauf, während Ihre Pipeline die Daten abruft. Umgekehrt befindet sich die Eingabepipeline während des Trainings im Leerlauf. Die Trainingsschrittzeit ist somit die Summe aus Öffnungs-, Lese- und Trainingszeiten.
Die nächsten Abschnitte bauen auf dieser Eingabe-Pipeline auf und veranschaulichen Best Practices für das Entwerfen performanter TensorFlow-Eingabe-Pipelines.
Prefetching
Das Vorabrufen überlappt die Vorverarbeitung und Modellausführung eines Trainingsschritts. Während das Modell die Trainingsschritte s
ausführt, liest die Eingabepipeline die Daten für die Schritte s+1
. Dadurch wird die Schrittzeit auf das Maximum (im Gegensatz zur Summe) des Trainings und die Zeit reduziert, die zum Extrahieren der Daten benötigt wird.
Die tf.data
API stellt die tf.data.Dataset.prefetch
Transformation. Es kann verwendet werden, um die Zeit, zu der Daten erzeugt werden, von der Zeit zu entkoppeln, zu der Daten verbraucht werden. Insbesondere verwendet die Transformation einen Hintergrundthread und einen internen Puffer, um Elemente aus dem Eingabedatensatz vorab abzurufen, bevor sie angefordert werden. Die Anzahl der Elemente, die vorab abgerufen werden sollen, sollte gleich (oder möglicherweise größer als) der Anzahl der Stapel sein, die von einem einzelnen Trainingsschritt verbraucht werden. Sie können diesen Wert entweder manuell tf.data.AUTOTUNE
oder auf tf.data.AUTOTUNE
, wodurch die Laufzeit von tf.data
aufgefordert wird, den Wert zur Laufzeit dynamisch zu tf.data
.
Beachten Sie, dass die Prefetch-Transformation immer dann Vorteile bietet, wenn die Möglichkeit besteht, die Arbeit eines "Produzenten" mit der Arbeit eines "Verbrauchers" zu überschneiden.
benchmark(
ArtificialDataset()
.prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20805208699994182
Wie das Diagramm der Datenausführungszeit zeigt, liest die Eingabepipeline während des Trainingsschritts für Probe 0 die Daten für Probe 1 und so weiter.
Parallelisierung der Datenextraktion
In einer realen Umgebung können die Eingabedaten remote gespeichert werden (z. B. in Google Cloud Storage oder HDFS). Eine Dataset-Pipeline, die beim lokalen Lesen von Daten gut funktioniert, kann beim Remote-Lesen von Daten aufgrund der folgenden Unterschiede zwischen lokalem und Remote-Speicher zu einem Engpass bei der E / A führen:
- Zeit bis zum ersten Byte : Das Lesen des ersten Bytes einer Datei aus dem Remotespeicher kann um Größenordnungen länger dauern als aus dem lokalen Speicher.
- Lesedurchsatz : Während Remotespeicher normalerweise eine große Gesamtbandbreite bietet, kann das Lesen einer einzelnen Datei möglicherweise nur einen kleinen Bruchteil dieser Bandbreite nutzen.
Darüber hinaus kann es nach dem Laden der Rohbytes in den Speicher erforderlich sein, die Daten (z. B. Protobuf ) zu deserialisieren und / oder zu entschlüsseln, was zusätzliche Berechnungen erfordert. Dieser Overhead ist unabhängig davon vorhanden, ob die Daten lokal oder remote gespeichert werden, kann jedoch im Remote-Fall schlimmer sein, wenn die Daten nicht effektiv vorab abgerufen werden.
Um die Auswirkungen der verschiedenen Datenextraktionskosten zu tf.data.Dataset.interleave
, kann die Transformation tf.data.Dataset.interleave
verwendet werden, um den tf.data.Dataset.interleave
zu parallelisieren und den Inhalt anderer Datensätze (z. B. tf.data.Dataset.interleave
zu verschachteln. Die Anzahl der zu überlappenden Datasets kann durch das Argument cycle_length
angegeben werden, während der Grad der Parallelität durch das Argument num_parallel_calls
angegeben werden num_parallel_calls
. Ähnlich wie bei der prefetch
Transformation unterstützt die interleave
Transformation tf.data.AUTOTUNE
, wodurch die Entscheidung über die zu verwendende Parallelitätsebene an die Laufzeit von tf.data
wird.
Sequentielle Verschachtelung
Die Standardargumente der Transformation tf.data.Dataset.interleave
, dass einzelne Stichproben aus zwei Datensätzen nacheinander verschachtelt werden.
benchmark(
tf.data.Dataset.range(2)
.interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4883518669998921
Dieses Datenausführungszeitdiagramm ermöglicht es, das Verhalten der interleave
Transformation darzustellen und alternativ Stichproben aus den beiden verfügbaren Datensätzen abzurufen. Hierbei handelt es sich jedoch nicht um eine Leistungsverbesserung.
Parallele Verschachtelung
Verwenden Sie nun das Argument num_parallel_calls
der interleave
Transformation. Dadurch werden mehrere Datensätze parallel geladen, wodurch die Wartezeit für das Öffnen der Dateien verkürzt wird.
benchmark(
tf.data.Dataset.range(2)
.interleave(
lambda _: ArtificialDataset(),
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.26920967700016263
Dieses Mal wird, wie das Diagramm zur Datenausführungszeit zeigt, das Lesen der beiden Datensätze parallelisiert, wodurch die globale Datenverarbeitungszeit verkürzt wird.
Parallelisierung der Datentransformation
Bei der Vorbereitung von Daten müssen Eingabeelemente möglicherweise vorverarbeitet werden. Zu diesem Zweck bietet die tf.data
API die Transformation tf.data.Dataset.map
, die auf jedes Element des Eingabedatensatzes eine benutzerdefinierte Funktion anwendet. Da die Eingabeelemente unabhängig voneinander sind, kann die Vorverarbeitung über mehrere CPU-Kerne hinweg parallelisiert werden. Um dies zu ermöglichen, liefert die map
Transformation ähnlich wie die prefetch
und interleave
Transformationen das Argument num_parallel_calls
, um den Grad der Parallelität anzugeben.
Die Auswahl des besten Werts für das Argument num_parallel_calls
hängt von Ihrer Hardware, den Eigenschaften Ihrer Trainingsdaten (wie Größe und Form), den Kosten Ihrer Kartenfunktion und der gleichzeitigen anderen Verarbeitung auf der CPU ab. Eine einfache Heuristik besteht darin, die Anzahl der verfügbaren CPU-Kerne zu verwenden. Bei der prefetch
und interleave
Transformation unterstützt die map
Transformation jedoch tf.data.AUTOTUNE
wodurch die Entscheidung über die zu verwendende Parallelitätsstufe an die tf.data
Laufzeit tf.data
wird.
def mapped_function(s):
# Do some hard pre-processing
tf.py_function(lambda: time.sleep(0.03), [], ())
return s
Sequentielle Zuordnung
Verwenden Sie zunächst die map
ohne Parallelität als Basisbeispiel.
benchmark(
ArtificialDataset()
.map(mapped_function)
)
Execution time: 0.4379127629999857
Was den naiven Ansatz betrifft, so summieren sich hier, wie die Darstellung zeigt, die für das Öffnen, Lesen, Vorverarbeiten (Mapping) und Trainingsschritte aufgewendeten Zeiten zu einer einzigen Iteration.
Parallele Zuordnung
Verwenden Sie jetzt dieselbe Vorverarbeitungsfunktion, wenden Sie sie jedoch parallel auf mehrere Proben an.
benchmark(
ArtificialDataset()
.map(
mapped_function,
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.2747970279999663
Wie das Datenplot zeigt, überlappen sich die Vorverarbeitungsschritte, wodurch sich die Gesamtzeit für eine einzelne Iteration verringert.
Caching
Die Transformation tf.data.Dataset.cache
kann ein Dataset entweder im Speicher oder im lokalen Speicher zwischenspeichern. Dadurch wird verhindert, dass einige Vorgänge (wie das Öffnen von Dateien und das Lesen von Daten) während jeder Epoche ausgeführt werden.
benchmark(
ArtificialDataset()
.map( # Apply time consuming operations before cache
mapped_function
).cache(
),
5
)
Execution time: 0.3715158390000397
Hier zeigt das Diagramm der Datenausführungszeit, dass beim Zwischenspeichern eines Datensatzes die Transformationen vor dem cache
(wie das Öffnen von Dateien und das Lesen von Daten) nur während der ersten Epoche ausgeführt werden. In den nächsten Epochen werden die von der cache
Transformation zwischengespeicherten Daten wiederverwendet.
Wenn die benutzerdefinierte Funktion, die an die map
Transformation übergeben wird, teuer ist, wenden Sie die cache
Transformation nach der map
Transformation an, solange das resultierende Dataset noch in den Speicher oder in den lokalen Speicher passt. Wenn die benutzerdefinierte Funktion den zum Speichern des Datasets erforderlichen Speicherplatz über die Cache-Kapazität hinaus erhöht, wenden Sie sie entweder nach der cache
Umwandlung an oder ziehen Sie in Betracht, Ihre Daten vor Ihrem Schulungsjob vorzuverarbeiten, um den Ressourcenverbrauch zu verringern.
Vektorisierung der Zuordnung
Das Aufrufen einer benutzerdefinierten Funktion, die an die map
Transformation übergeben wird, ist mit dem Planen und Ausführen der benutzerdefinierten Funktion verbunden. Vektorisieren Sie die benutzerdefinierte Funktion (dh lassen Sie sie über einen Stapel von Eingaben gleichzeitig arbeiten) und wenden Sie die batch
vor der map
.
Um diese bewährte Methode zu veranschaulichen, ist Ihr künstlicher Datensatz nicht geeignet. Die Planungsverzögerung beträgt etwa 10 Mikrosekunden (10e-6 Sekunden), weit weniger als die im ArtificialDataset
verwendeten zehn Millisekunden, und daher ist ihre Auswirkung schwer zu erkennen.
Verwenden Sie in diesem Beispiel die tf.data.Dataset.range
und vereinfachen Sie die Trainingsschleife in ihrer einfachsten Form.
fast_dataset = tf.data.Dataset.range(10000)
def fast_benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for _ in tf.data.Dataset.range(num_epochs):
for _ in dataset:
pass
tf.print("Execution time:", time.perf_counter() - start_time)
def increment(x):
return x+1
Skalare Zuordnung
fast_benchmark(
fast_dataset
# Apply function one item at a time
.map(increment)
# Batch
.batch(256)
)
Execution time: 0.9082538790000854
Das obige Diagramm zeigt, was (mit weniger Stichproben) unter Verwendung der skalaren Zuordnungsmethode vor sich geht. Es zeigt, dass die abgebildete Funktion für jede Probe angewendet wird. Diese Funktion ist zwar sehr schnell, hat jedoch einen gewissen Overhead, der sich auf die Zeitleistung auswirkt.
Vektorisierte Zuordnung
fast_benchmark(
fast_dataset
.batch(256)
# Apply function on a batch of items
# The tf.Tensor.__add__ method already handle batches
.map(increment)
)
Execution time: 0.03624614399996062
Dieses Mal wird die zugeordnete Funktion einmal aufgerufen und gilt für eine Probencharge. Wie das Diagramm der Datenausführungszeit zeigt, wird die Ausführung der Funktion zwar länger dauern, der Overhead wird jedoch nur einmal angezeigt, wodurch die Gesamtzeitleistung verbessert wird.
Reduzierung des Speicherbedarfs
Eine Reihe von Transformationen, einschließlich interleave
, prefetch
und shuffle
, verwalten einen internen Puffer von Elementen. Wenn die benutzerdefinierte Funktion, die an die map
Transformation übergeben wird, die Größe der Elemente ändert, wirken sich die Reihenfolge der Map-Transformation und die Transformationen, die Elemente puffern, auf die Speichernutzung aus. Wählen Sie im Allgemeinen die Reihenfolge, die zu einem geringeren Speicherbedarf führt, es sei denn, eine andere Reihenfolge ist für die Leistung wünschenswert.
Teilberechnungen zwischenspeichern
Es wird empfohlen, das Dataset nach der map
Transformation zwischenzuspeichern, es sei denn, diese Transformation macht die Daten zu groß, um in den Speicher zu passen. Ein Kompromiss kann erzielt werden, wenn Ihre zugeordnete Funktion in zwei Teile aufgeteilt werden kann: einen zeitaufwändigen und einen speicheraufwendigen Teil. In diesem Fall können Sie Ihre Transformationen wie folgt verketten:
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
Auf diese Weise wird der zeitaufwändige Teil nur während der ersten Epoche ausgeführt, und Sie vermeiden, zu viel Cache-Speicherplatz zu verwenden.
Best-Practice-Zusammenfassung
Hier ist eine Zusammenfassung der Best Practices für das Entwerfen performanter TensorFlow-Eingabepipelines:
- Verwenden Sie die
prefetch
Transformation , um die Arbeit eines Produzenten und eines Verbrauchers zu überlappen - Parallelisieren Sie die Datenlesetransformation mithilfe der
interleave
Transformation - Parallelisieren Sie die
map
Transformation, indem Sie das Argumentnum_parallel_calls
- Verwenden Sie die
cache
Transformation, um Daten während der ersten Epoche im Speicher zwischenzuspeichern - Vektorisieren Sie benutzerdefinierte Funktionen, die an die
map
werden - Reduzieren Sie die Speichernutzung beim Anwenden der
interleave
,prefetch
undshuffle
Transformationen
Wiedergabe der Figuren
Um das Verständnis dertf.data.Dataset
API zutf.data.Dataset
, können Sie mit Ihren eigenen Pipelines spielen. Unten finden Sie den Code, mit dem die Bilder aus diesem Handbuch gezeichnet werden. Es kann ein guter Ausgangspunkt sein und einige Problemumgehungen für häufig auftretende Schwierigkeiten aufzeigen, wie z.
- Reproduzierbarkeit der Ausführungszeit
- Zugeordnete Funktionen eifrige Ausführung
-
interleave
Transformation aufrufbar
import itertools
from collections import defaultdict
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
Der Datensatz
Ähnlich wie beim ArtificialDataset
Sie ein Dataset erstellen, das die in jedem Schritt verbrachte Zeit zurückgibt.
class TimeMeasuredDataset(tf.data.Dataset):
# OUTPUT: (steps, timings, counters)
OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
_INSTANCES_COUNTER = itertools.count() # Number of datasets generated
_EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset
def _generator(instance_idx, num_samples):
epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
# Opening the file
open_enter = time.perf_counter()
time.sleep(0.03)
open_elapsed = time.perf_counter() - open_enter
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
read_enter = time.perf_counter()
time.sleep(0.015)
read_elapsed = time.perf_counter() - read_enter
yield (
[("Open",), ("Read",)],
[(open_enter, open_elapsed), (read_enter, read_elapsed)],
[(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
)
open_enter, open_elapsed = -1., -1. # Negative values will be filtered
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_types=cls.OUTPUT_TYPES,
output_shapes=cls.OUTPUT_SHAPES,
args=(next(cls._INSTANCES_COUNTER), num_samples)
)
Dieser Datensatz enthält Beispiele für Form [[2, 1], [2, 2], [2, 3]]
und vom Typ [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]
. Jede Probe ist:
(
[("Open"), ("Read")],
[(t0, d), (t0, d)],
[(i, e, -1), (i, e, s)]
)
Wo:
-
Open
undRead
sind Schrittkennungen -
t0
ist der Zeitstempel, zu dem der entsprechende Schritt gestartet wurde -
d
ist die Zeit, die im entsprechenden Schritt verbracht wird -
i
ist der Instanzindex -
e
ist der Epochenindex (Häufigkeit, mit der der Datensatz iteriert wurde) -
s
ist der Beispielindex
Die Iterationsschleife
Machen Sie die Iterationsschleife etwas komplizierter, um alle Timings zusammenzufassen. Dies funktioniert nur mit Datensätzen, die Beispiele generieren, wie oben beschrieben.
def timelined_benchmark(dataset, num_epochs=2):
# Initialize accumulators
steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
epoch_enter = time.perf_counter()
for (steps, times, values) in dataset:
# Record dataset preparation informations
steps_acc = tf.concat((steps_acc, steps), axis=0)
times_acc = tf.concat((times_acc, times), axis=0)
values_acc = tf.concat((values_acc, values), axis=0)
# Simulate training time
train_enter = time.perf_counter()
time.sleep(0.01)
train_elapsed = time.perf_counter() - train_enter
# Record training informations
steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
epoch_elapsed = time.perf_counter() - epoch_enter
# Record epoch informations
steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
time.sleep(0.001)
tf.print("Execution time:", time.perf_counter() - start_time)
return {"steps": steps_acc, "times": times_acc, "values": values_acc}
Die Plotmethode
Definieren Sie abschließend eine Funktion, mit der eine Zeitleiste mit den von der Funktion timelined_benchmark zurückgegebenen Werten timelined_benchmark
kann.
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
# Remove invalid entries (negative times, or empty steps) from the timelines
invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
steps = timeline['steps'][invalid_mask].numpy()
times = timeline['times'][invalid_mask].numpy()
values = timeline['values'][invalid_mask].numpy()
# Get a set of different steps, ordered by the first time they are encountered
step_ids, indices = np.stack(np.unique(steps, return_index=True))
step_ids = step_ids[np.argsort(indices)]
# Shift the starting time to 0 and compute the maximal time value
min_time = times[:,0].min()
times[:,0] = (times[:,0] - min_time)
end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
cmap = mpl.cm.get_cmap("plasma")
plt.close()
fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
fig.suptitle(title)
fig.set_size_inches(17.0, len(step_ids))
plt.xlim(-0.01, end)
for i, step in enumerate(step_ids):
step_name = step.decode()
ax = axs[i]
ax.set_ylabel(step_name)
ax.set_ylim(0, 1)
ax.set_yticks([])
ax.set_xlabel("time (s)")
ax.set_xticklabels([])
ax.grid(which="both", axis="x", color="k", linestyle=":")
# Get timings and annotation for the given step
entries_mask = np.squeeze(steps==step)
serie = np.unique(times[entries_mask], axis=0)
annotations = values[entries_mask]
ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
if annotate:
for j, (start, width) in enumerate(serie):
annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
horizontalalignment='left', verticalalignment='center')
if save:
plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
Verwenden Sie Wrapper für die zugeordnete Funktion
Um die zugeordnete Funktion in einem eifrigen Kontext auszuführen, müssen Sie sie in einen Aufruf von tf.py_function
.
def map_decorator(func):
def wrapper(steps, times, values):
# Use a tf.py_function to prevent auto-graph from compiling the method
return tf.py_function(
func,
inp=(steps, times, values),
Tout=(steps.dtype, times.dtype, values.dtype)
)
return wrapper
Pipelines Vergleich
_batch_map_num_items = 50
def dataset_generator_fun(*args):
return TimeMeasuredDataset(num_samples=_batch_map_num_items)
Naiv
@map_decorator
def naive_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001) # Time consuming step
time.sleep(0.0001) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, [["Map"]]), axis=0),
tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
tf.concat((values, [values[-1]]), axis=0)
)
naive_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.flat_map(dataset_generator_fun)
.map(naive_map)
.batch(_batch_map_num_items, drop_remainder=True)
.unbatch(),
5
)
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead Execution time: 12.445692234000035
Optimiert
@map_decorator
def time_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001 * values.shape[0]) # Time consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
@map_decorator
def memory_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.0001 * values.shape[0]) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
# Use tf.tile to handle batch dimension
return (
tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
optimized_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.interleave( # Parallelize data reading
dataset_generator_fun,
num_parallel_calls=tf.data.AUTOTUNE
)
.batch( # Vectorize your mapped function
_batch_map_num_items,
drop_remainder=True)
.map( # Parallelize map transformation
time_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.cache() # Cache data
.map( # Reduce memory usage
memory_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.prefetch( # Overlap producer and consumer works
tf.data.AUTOTUNE
)
.unbatch(),
5
)
Execution time: 6.326935971000012
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)