De meilleures performances avec l'API tf.data

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Les GPU et les TPU peuvent réduire considérablement le temps nécessaire pour exécuter une seule étape de formation. Atteindre des performances optimales nécessite un pipeline d'entrée efficace qui fournit des données pour l'étape suivante avant la fin de l'étape en cours. La tf.data API aide à construire des pipelines d'entrée flexibles et efficaces. Ce document montre comment utiliser la tf.data API pour construire des pipelines d'entrée tensorflow très performants.

Avant de continuer, vérifiez les conduites d'entrée tensorflow Build guide pour apprendre comment utiliser l' tf.data API.

Ressources

Installer

import tensorflow as tf

import time
2021-08-02 22:23:29.073040: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0

Tout au long de ce guide, vous allez parcourir un ensemble de données et mesurer les performances. Il peut être difficile d'établir des références de performances reproductibles. Les différents facteurs affectant la reproductibilité comprennent :

  • La charge CPU actuelle
  • Le trafic réseau
  • Mécanismes complexes, tels que le cache

Pour obtenir un benchmark reproductible, vous allez construire un exemple artificiel.

L'ensemble de données

Commencez par définir une classe héritant de tf.data.Dataset appelé ArtificialDataset . Cet ensemble de données :

  • Génère num_samples échantillons (valeur par défaut est 3)
  • Dort un certain temps avant le premier élément pour simuler l'ouverture d'un fichier
  • Dort un certain temps avant de produire chaque élément pour simuler la lecture des données d'un fichier
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Cette base de données est similaire à la tf.data.Dataset.range une, en ajoutant un retard fixe au début de et entre chaque échantillon.

La boucle d'entraînement

Ensuite, écrivez une boucle d'entraînement factice qui mesure le temps qu'il faut pour itérer sur un ensemble de données. Le temps de formation est simulé.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Optimiser les performances

Pour montrer comment la performance peut être optimisé, vous permettra d' améliorer les performances du ArtificialDataset .

L'approche naïve

Commencez avec un pipeline naïf sans astuce, en itérant sur l'ensemble de données tel quel.

benchmark(ArtificialDataset())
2021-08-02 22:23:30.330329: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-08-02 22:23:30.994715: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:30.995801: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:00:05.0 name: Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-08-02 22:23:30.995843: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-08-02 22:23:30.999616: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-08-02 22:23:30.999716: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11
2021-08-02 22:23:31.000850: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcufft.so.10
2021-08-02 22:23:31.001150: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcurand.so.10
2021-08-02 22:23:31.002238: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusolver.so.11
2021-08-02 22:23:31.003310: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusparse.so.11
2021-08-02 22:23:31.003523: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudnn.so.8
2021-08-02 22:23:31.003637: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.004573: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.005494: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding visible gpu devices: 0
2021-08-02 22:23:31.007055: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-02 22:23:31.007607: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.008471: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:00:05.0 name: Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-08-02 22:23:31.008546: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.009525: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.010518: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding visible gpu devices: 0
2021-08-02 22:23:31.010562: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-08-02 22:23:31.626538: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1258] Device interconnect StreamExecutor with strength 1 edge matrix:
2021-08-02 22:23:31.626582: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1264]      0 
2021-08-02 22:23:31.626591: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1277] 0:   N 
2021-08-02 22:23:31.626808: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.627879: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.628713: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-02 22:23:31.629565: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1418] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 14646 MB memory) -> physical GPU (device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0)
2021-08-02 22:23:31.669504: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-08-02 22:23:31.669989: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000194999 Hz
Execution time: 0.2567654919985216

Sous le capot, voici comment votre temps d'exécution a été dépensé :

Tracé du temps d'exécution des données - une méthode naïve

Le graphique montre que l'exécution d'une étape d'entraînement implique :

  • Ouvrir un fichier s'il n'a pas encore été ouvert
  • Récupérer une entrée de données à partir du fichier
  • Utiliser les données pour la formation

Cependant, dans une implémentation synchrone naïve comme ici, pendant que votre pipeline récupère les données, votre modèle est inactif. Inversement, pendant que votre modèle s'entraîne, le pipeline d'entrée est inactif. Le temps de l'étape d'apprentissage est donc la somme des temps d'ouverture, de lecture et d'apprentissage.

Les sections suivantes s'appuient sur ce pipeline d'entrée, illustrant les bonnes pratiques pour la conception de pipelines d'entrée TensorFlow performants.

Prélecture

La prélecture chevauche le prétraitement et l'exécution du modèle d'une étape d'apprentissage. Bien que le modèle exécute l' étape de formation s la canalisation d'entrée est en train de lire les données de l' étape s+1 . Cela réduit le temps de pas au maximum (par opposition à la somme) de la formation et le temps nécessaire pour extraire les données.

L' tf.data API fournit la tf.data.Dataset.prefetch transformation. Il peut être utilisé pour découpler le moment où les données sont produites du moment où les données sont consommées. En particulier, la transformation utilise un thread d'arrière-plan et un tampon interne pour extraire les éléments de l'ensemble de données d'entrée avant le moment où ils sont demandés. Le nombre d'éléments à extraire doit être égal (ou éventuellement supérieur) au nombre de lots consommés par une seule étape d'apprentissage. Vous pouvez soit régler manuellement cette valeur, ou mis à tf.data.AUTOTUNE , qui demandera à l' tf.data d' exécution pour régler la valeur dynamiquement lors de l' exécution.

Notez que la transformation de prélecture offre des avantages chaque fois qu'il y a une opportunité de chevaucher le travail d'un « producteur » avec le travail d'un « consommateur ».

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20864751600311138

Tracé du temps d'exécution des données - méthode de prélecture

Maintenant, comme le montre le graphique du temps d'exécution des données, pendant que l'étape d'apprentissage s'exécute pour l'échantillon 0, le pipeline d'entrée lit les données pour l'échantillon 1, et ainsi de suite.

Paralléliser l'extraction de données

Dans un environnement réel, les données d'entrée peuvent être stockées à distance (par exemple, sur Google Cloud Storage ou HDFS). Un pipeline d'ensemble de données qui fonctionne bien lors de la lecture de données localement peut devenir goulot d'étranglement sur les E/S lors de la lecture de données à distance en raison des différences suivantes entre le stockage local et distant :

  • Time-to-first-octet: La lecture du premier octet d'un fichier de stockage distant peut prendre des ordres de grandeur plus que du stockage local.
  • Lire le débit: Bien que le stockage à distance offre généralement une large bande passante globale, la lecture d' un seul fichier peut seulement être en mesure d'utiliser une petite fraction de cette bande passante.

De plus, une fois que les octets bruts sont chargés en mémoire, il peut également être nécessaire pour désérialiser et / ou décrypter les données (par exemple protobuf ), ce qui nécessite le calcul supplémentaire. Cette surcharge est présente indépendamment du fait que les données soient stockées localement ou à distance, mais peut être pire dans le cas distant si les données ne sont pas pré-extraites efficacement.

Pour atténuer les effets des divers frais généraux d'extraction de données, la tf.data.Dataset.interleave transformation peut être utilisée pour paralléliser l'étape de chargement de données, entrelacer le contenu d'autres ensembles de données (tels que lecteur de fichiers de données). Le nombre de jeux de données à chevauchement peut être spécifié par l' cycle_length thèse, tandis que le niveau de parallélisme peut être spécifié par le num_parallel_calls argument. Semblable à la prefetch transformation, la interleave transformation soutient tf.data.AUTOTUNE , qui délègue la décision à ce niveau de parallélisme à utiliser pour la tf.data exécution.

Entrelacement séquentiel

Les arguments par défaut de la tf.data.Dataset.interleave transformation rendent entrelacer les échantillons uniques à partir de deux jeux de données de manière séquentielle.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4843995549963438

Tracé du temps d'exécution des données - entrelacement séquentiel

Cette parcelle de temps d'exécution des données permet d'exposer le comportement de la interleave transformation, aller chercher des échantillons alternativement des deux ensembles de données disponibles. Cependant, aucune amélioration des performances n'est impliquée ici.

Entrelacement parallèle

Maintenant, utilisez le num_parallel_calls argument de la interleave transformation. Cela charge plusieurs ensembles de données en parallèle, réduisant le temps d'attente pour l'ouverture des fichiers.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.27614611799799604

Tracé du temps d'exécution des données - méthode d'entrelacement parallèle

Cette fois, comme le montre le graphique du temps d'exécution des données, la lecture des deux jeux de données est parallélisée, réduisant ainsi le temps de traitement global des données.

Paralléliser la transformation des données

Lors de la préparation des données, les éléments d'entrée peuvent avoir besoin d'être prétraités. A cet effet, les tf.data offres de l' API la tf.data.Dataset.map de transformation, qui applique une fonction définie par l' utilisateur à chaque élément de l'ensemble de données d'entrée. Étant donné que les éléments d'entrée sont indépendants les uns des autres, le prétraitement peut être parallélisé sur plusieurs cœurs de processeur. Pour que cela soit possible, de manière similaire aux prefetch et interleave transformations, la map transformation fournit l' num_parallel_calls argument pour spécifier le niveau de parallélisme.

Le choix de la meilleure valeur pour l' num_parallel_calls l' argument dépend de votre matériel, les caractéristiques de vos données de formation (tels que la taille et la forme), le coût de votre fonction de la carte, et quel autre traitement se passe sur le CPU en même temps. Une heuristique simple consiste à utiliser le nombre de cœurs de processeur disponibles. Cependant, comme pour la prefetch et interleave transformation, la map transformation prend en charge tf.data.AUTOTUNE qui déléguera la décision de ce niveau de parallélisme à utiliser pour la tf.data exécution.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Cartographie séquentielle

Commencez par utiliser la map de transformation sans parallélisme comme exemple de référence.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4389583010051865

Tracé du temps d'exécution des données - méthode de mappage séquentiel

Quant à l' approche naïve , ici, que l'intrigue montre, le temps passé pour l' ouverture, la lecture, pré-traitement (cartographie) et les étapes de formation ainsi que la somme pour une seule itération.

Cartographie parallèle

Maintenant, utilisez la même fonction de pré-traitement mais appliquez-la en parallèle sur plusieurs échantillons.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2734931380036869

Temps d'exécution des données - mappage parallèle

Comme le montre le graphique des données, les étapes de pré-traitement se chevauchent, ce qui réduit le temps global pour une seule itération.

Mise en cache

La tf.data.Dataset.cache transformation peut mettre en cache un ensemble de données, que ce soit dans la mémoire ou sur le stockage local. Cela évitera que certaines opérations (comme l'ouverture de fichiers et la lecture de données) soient exécutées à chaque époque.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.369826126996486

Temps d'exécution des données - méthode de jeu de données en cache

Ici, l'intrigue temps l' exécution des données montre que lorsque vous mettez en cache un ensemble de données, les transformations avant le cache un (comme l'ouverture de fichiers et la lecture des données) ne sont exécutées que pendant la première époque. Les prochaines époques réutilise les données mises en cache par le cache de transformation.

Si la fonction définie par l' utilisateur passé dans la map transformation est coûteuse, appliquez le cache de transformation après la map de transformation tant que le jeu de données résultant peut encore entrer dans la mémoire ou le stockage local. Si la fonction définie par l' utilisateur augmente l'espace nécessaire pour stocker l'ensemble de données au - delà de la capacité de cache, soit l' appliquer après la cache de transformation ou d' envisager une pré-traitement de vos données avant votre travail de formation afin de réduire l' utilisation des ressources.

Cartographie de vectorisation

L' invocation d' une fonction définie par l' utilisateur passé dans la map transformation a frais généraux liés à la planification et l' exécution de la fonction définie par l' utilisateur. Vectoriser la fonction définie par l' utilisateur (qui est, le faire fonctionner sur un lot d'entrées à la fois) et appliquer le batch transformation avant le map de transformation.

Pour illustrer cette bonne pratique, votre jeu de données artificiel n'est pas adapté. Le retard d'ordonnancement est d' environ 10 microsecondes (10E-6 secondes), beaucoup moins que les dizaines de millisecondes utilisés dans la ArtificialDataset , et donc son impact est difficile à voir.

Pour cet exemple, utilisez la base tf.data.Dataset.range fonction et de simplifier la boucle de formation à sa forme la plus simple.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Cartographie scalaire

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2540382809966104

Temps d'exécution des données - méthode de la carte scalaire

Le graphique ci-dessus illustre ce qui se passe (avec moins d'échantillons) en utilisant la méthode de mappage scalaire. Il montre que la fonction mappée est appliquée pour chaque échantillon. Bien que cette fonction soit très rapide, elle a une certaine surcharge qui a un impact sur les performances temporelles.

Cartographie vectorisée

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.025890030003210995

Temps d'exécution des données - méthode de la carte vectorisée

Cette fois, la fonction mappée est appelée une fois et s'applique à un lot d'échantillons. Comme le montre le graphique du temps d'exécution des données, bien que l'exécution de la fonction puisse prendre plus de temps, la surcharge n'apparaît qu'une seule fois, ce qui améliore les performances temporelles globales.

Réduction de l'empreinte mémoire

Un certain nombre de transformations, notamment interleave , prefetch , et shuffle , à maintenir une mémoire tampon interne des éléments. Si la fonction définie par l' utilisateur passé dans la map transformation modifie la taille des éléments, puis l'ordre de la transformation de la carte et les transformations que les éléments tampons affecte l'utilisation de la mémoire. En général, choisissez l'ordre qui se traduit par une empreinte mémoire plus faible, à moins qu'un ordre différent ne soit souhaitable pour les performances.

Mise en cache des calculs partiels

Il est recommandé de mettre en cache l'ensemble de données après la map de transformation , sauf si cette transformation rend les données trop grand pour la mémoire. Un compromis peut être obtenu si votre fonction mappée peut être divisée en deux parties : une partie qui prend du temps et une partie qui consomme de la mémoire. Dans ce cas, vous pouvez enchaîner vos transformations comme ci-dessous :

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

De cette façon, la partie chronophage n'est exécutée que pendant la première époque, et vous évitez d'utiliser trop d'espace de cache.

Résumé des bonnes pratiques

Voici un résumé des bonnes pratiques pour la conception de pipelines d'entrée TensorFlow performants :

Reproduire les chiffres

Pour aller plus loin dans la tf.data.Dataset compréhension de l' API, vous pouvez jouer avec vos propres pipelines. Vous trouverez ci-dessous le code utilisé pour tracer les images de ce guide. Cela peut être un bon point de départ, montrant quelques solutions de contournement pour les difficultés courantes telles que :

  • Reproductibilité du temps d'exécution
  • Exécution impatiente des fonctions mappées
  • interleave appelable de transformation
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

L'ensemble de données

Semblable à la ArtificialDataset vous pouvez construire un ensemble de données de retour le temps passé à chaque étape.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Cet ensemble de données fournit des échantillons de la forme [[2, 1], [2, 2], [2, 3]] et le type [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Chaque échantillon est :

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Où:

  • Open et Read sont les étapes d' identification
  • t0 est l'estampille temporelle lorsque l'étape correspondante commence
  • d est le temps passé à l'étape correspondante
  • i est l'index d'instance
  • e est l'indice d'époque (nombre de fois l'ensemble de données a été réitérée)
  • s est l'indice de l' échantillon

La boucle d'itération

Rendez la boucle d'itération un peu plus compliquée pour agréger tous les timings. Cela ne fonctionnera qu'avec les ensembles de données générant des échantillons comme détaillé ci-dessus.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

La méthode de traçage

Enfin, définir une fonction capable de tracer une ligne de temps étant donné les valeurs renvoyées par la timelined_benchmark fonction.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Utiliser des wrappers pour la fonction mappée

Pour exécuter la fonction mappée dans un contexte avide, vous devez les envelopper dans un tf.py_function appel.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Comparaison des pipelines

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Naïve

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_18171/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_18171/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.421246555000835

Optimisé

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.333646917002625
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png