Повышение производительности с помощью tf.data API

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Графические процессоры и TPU могут радикально сократить время, необходимое для выполнения одного шага обучения. Для достижения максимальной производительности требуется эффективный конвейер ввода, который доставляет данные для следующего шага до того, как текущий шаг будет завершен. tf.data API позволяет создавать гибкие и эффективные входные трубопроводы. Этот документ показывает , как использовать tf.data API для создания высокопроизводительных входного TensorFlow трубопроводов.

Перед тем, как продолжить, проверьте Сложение TensorFlow входные трубопроводы руководство , чтобы узнать , как использовать tf.data API.

Ресурсы

Настраивать

import tensorflow as tf

import time

В этом руководстве вы будете перебирать набор данных и измерять производительность. Создание воспроизводимых тестов производительности может быть трудным. Различные факторы, влияющие на воспроизводимость, включают:

  • Текущая загрузка процессора
  • Сетевой трафик
  • Сложные механизмы, такие как кеш

Чтобы получить воспроизводимый тест, вы создадите искусственный пример.

Набор данных

Начнем с определения класса , унаследованный от tf.data.Dataset называется ArtificialDataset . Этот набор данных:

  • Создает num_samples образцов ( по умолчанию 3)
  • Спит некоторое время перед первым элементом, чтобы имитировать открытие файла
  • Спит некоторое время перед созданием каждого элемента для имитации чтения данных из файла
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Этот набор данные похожи на tf.data.Dataset.range один, добавив фиксированную задержку в начале и в промежутках между каждой пробой.

Цикл обучения

Затем напишите фиктивный цикл обучения, который измеряет, сколько времени требуется для перебора набора данных. Время тренировки смоделировано.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Оптимизировать производительность

Для того, чтобы демонстрировать , как производительность может быть оптимизирована, вы улучшите производительность ArtificialDataset .

Наивный подход

Начните с простого конвейера без каких-либо уловок, перебирая набор данных как есть.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

Под капотом вот как было потрачено ваше время выполнения:

График выполнения данных - наивный метод

Сюжет показывает, что выполнение обучающего шага предполагает:

  • Открытие файла, если он еще не открыт
  • Получение записи данных из файла
  • Использование данных для обучения

Однако в такой наивной синхронной реализации, как здесь, пока ваш конвейер получает данные, ваша модель простаивает. И наоборот, пока ваша модель обучается, конвейер ввода простаивает. Таким образом, время шага обучения - это сумма времени открытия, чтения и тренировки.

Следующие разделы основаны на этом конвейере ввода, иллюстрируя передовые методы проектирования производительных конвейеров ввода TensorFlow.

Предварительная загрузка

Предварительная выборка перекрывает предварительную обработку и выполнение модели на этапе обучения. В то время как модель выполняет учебный шаг s , входной трубопровод считывает данные шаг s+1 . Это сокращает время шага до максимума (в отличие от суммы) обучения и времени, необходимого для извлечения данных.

tf.data API обеспечивает tf.data.Dataset.prefetch преобразование. Его можно использовать для разделения времени, когда данные производятся, от времени, когда данные потребляются. В частности, преобразование использует фоновый поток и внутренний буфер для предварительной выборки элементов из входного набора данных до того, как они будут запрошены. Количество элементов для предварительной выборки должно быть равно (или, возможно, больше) количеству пакетов, потребляемых одним шагом обучения. Можно либо вручную настроить это значение, или установите его в tf.data.AUTOTUNE , который предложит tf.data выполнения для настройки значения динамически во время выполнения.

Обратите внимание, что преобразование предварительной выборки дает преимущества в любое время, когда есть возможность перекрыть работу «производителя» с работой «потребителя».

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

График времени выполнения данных - метод предварительной выборки

Теперь, как показано на графике времени выполнения данных, пока выполняется шаг обучения для образца 0, входной конвейер считывает данные для образца 1 и так далее.

Распараллеливание извлечения данных

В реальных условиях входные данные могут храниться удаленно (например, в Google Cloud Storage или HDFS). Конвейер набора данных, который хорошо работает при локальном чтении данных, может стать узким местом ввода-вывода при удаленном чтении данных из-за следующих различий между локальным и удаленным хранилищем:

  • Время до первого байта: Чтение первого байта файла из удаленного хранилища может принимать порядки больше , чем из локального хранилища.
  • Читайте пропускную способность : В то время как удаленное хранение , как правило , предлагает большую суммарную пропускную способность, чтение одного файла может только быть в состоянии использовать небольшую часть этой полосы пропускания.

Кроме того, как только исходные байты загружаются в память, он также может быть необходимо для десериализации и / или расшифровать данные (например , Protobuf ), что требует дополнительных вычислений. Эти накладные расходы присутствуют независимо от того, хранятся ли данные локально или удаленно, но могут быть хуже в удаленном случае, если данные предварительно не загружаются эффективно.

Для того, чтобы смягчить влияние различных накладных расходов извлечения данных, то tf.data.Dataset.interleave преобразование может быть использовано для параллелизации шага загрузки данных, перемежение содержимых других наборов данных (например, читатели файлов данных). Количество наборов данных для перекрытия может быть задано с помощью cycle_length аргумента, а уровень параллелизма может быть определен с помощью num_parallel_calls аргумента. Подобно prefetch преобразование, interleave преобразование поддерживает tf.data.AUTOTUNE , который будет делегировать решение о том, каком уровне параллельности использования к tf.data выполнению.

Последовательное чередование

Аргументы по умолчанию для tf.data.Dataset.interleave трансформации делают его перемежать единичные образцы из двух наборов данных последовательно.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

График времени выполнения данных - последовательное чередование

Это исполнение данных в реальное время график позволяет демонстрировать поведение interleave преобразования выборки образцов в качестве альтернативы из двух наборов данных , доступных. Однако здесь не происходит никакого улучшения производительности.

Параллельное чередование

Теперь, используйте num_parallel_calls аргумент interleave трансформации. Это загружает несколько наборов данных параллельно, сокращая время ожидания открытия файлов.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

График времени выполнения данных - метод параллельного чередования

На этот раз, как показано на графике времени выполнения данных, чтение двух наборов данных распараллеливается, что сокращает время обработки глобальных данных.

Распараллеливание преобразования данных

При подготовке данных может потребоваться предварительная обработка входных элементов. С этой целью, tf.data API предлагает tf.data.Dataset.map преобразование, который применяет определенный пользователь функцию к каждому элементу входного набора данных. Поскольку элементы ввода не зависят друг от друга, предварительная обработка может быть распараллелена между несколькими ядрами ЦП. Чтобы сделать это возможным, аналогична prefetch и interleave трансформации, map преобразование обеспечивает num_parallel_calls аргумент , чтобы задать уровень параллелизма.

Выбор оптимального значения для num_parallel_calls аргумента зависит от вашего оборудования, характеристики ваших данных обучения (например, его размера и формы), стоимость вашей функции карты, и что другая обработка происходит на процессоре одновременно. Простая эвристика - использовать количество доступных ядер ЦП. Однако, как и для prefetch и interleave преобразования, map преобразования поддерживает tf.data.AUTOTUNE , который будет делегировать решение о том, каком уровне параллельности использования к tf.data выполнению.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Последовательное отображение

Начните с использованием map преобразования без параллельности в качестве базового примера.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

График времени выполнения данных - метод последовательного сопоставления

Что касается наивный подход , здесь, как сюжет шоу, время , затраченное на открытие, чтение, предварительная обработка (отображение) и учебные шаги суммируются для одной итерации.

Параллельное отображение

Теперь используйте ту же функцию предварительной обработки, но примените ее параллельно к нескольким образцам.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

Время выполнения данных - параллельное отображение

Как видно из графика данных, этапы предварительной обработки перекрываются, что сокращает общее время одной итерации.

Кеширование

tf.data.Dataset.cache преобразование может кэшировать набор данных, либо в памяти или на локальном запоминающем устройстве. Это убережет некоторые операции (например, открытие файла и чтение данных) от выполнения в каждую эпоху.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

Время выполнения данных - метод кэшированного набора данных

Здесь, исполнительные данных время график показывает , что при кэшировании набора данных, преобразование перед cache один (например , открытие файла и чтение данных) выполняется только в течение первой эпохи. Следующие эпохами будут повторно использовать данные кэшируются в cache - преобразования.

Если определенный пользователь функция , переданная в map преобразование является дорогостоящей, применить cache преобразование после map преобразования, пока полученный набор данные все еще могут поместиться в память или локальное хранилище. Если определенный пользователь функция увеличивает пространство , необходимое для хранения набора данных за пределами емкости кэша - памяти, либо применить его после cache преобразования или рассмотреть для предварительной обработки данных перед вашей тренировочной работой , чтобы уменьшить использование ресурсов.

Векторизация отображения

Вызов пользовательской функции передается в map трансформации накладных расходов , связанных с планированием и выполнением пользовательской функции. Векторизовать пользователем функции (то есть, есть он работает над серией входов на один раз) и применить batch преобразование перед map преобразования.

Чтобы проиллюстрировать эту передовую практику, ваш искусственный набор данных не подходит. Задержка планирования составляет около 10 микросекунд (10e-6 секунд), гораздо меньше , чем несколько десятков миллисекунд , используемых в ArtificialDataset , и , следовательно , его воздействие трудно увидеть.

Для этого примера использовать базовую tf.data.Dataset.range функцию и упростить цикл подготовки к своей простейшей форме.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Скалярное отображение

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

Время выполнения данных - метод скалярной карты

На приведенном выше графике показано, что происходит (с меньшим количеством выборок) с использованием метода скалярного сопоставления. Он показывает, что отображенная функция применяется для каждого образца. Хотя эта функция работает очень быстро, у нее есть некоторые накладные расходы, влияющие на временные характеристики.

Векторизованное отображение

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

Время выполнения данных - метод векторизации карты

На этот раз сопоставленная функция вызывается один раз и применяется к партии выборки. Как показано на графике времени выполнения данных, хотя для выполнения функции может потребоваться больше времени, накладные расходы появляются только один раз, улучшая общую временную производительность.

Уменьшение объема памяти

Целый ряд преобразований, в том числе с interleave , prefetch и shuffle , поддерживать внутренний буфер элементов. Если пользовательская функция передается в map преобразование изменяет размер элементов, то упорядочения преобразования карты и преобразования , которые буферные элементы влияют на использовании памяти. В общем, выбирайте порядок, который приводит к меньшему объему памяти, если другой порядок не желателен для производительности.

Кеширование частичных вычислений

Рекомендуется кэшировать набор данных после map преобразования , за исключением , если это преобразование делает данные слишком большой , чтобы поместиться в памяти. Компромисс может быть достигнут, если ваша отображаемая функция может быть разделена на две части: часть, занимающую много времени, и часть, потребляющую память. В этом случае вы можете связать свои преобразования, как показано ниже:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Таким образом, трудоемкая часть выполняется только в течение первой эпохи, и вы избегаете использования слишком большого объема кеш-памяти.

Сводка передовой практики

Вот краткое изложение лучших практик для проектирования производительных входных конвейеров TensorFlow:

Воспроизведение фигур

Для того, чтобы идти глубже в tf.data.Dataset понимания API, вы можете играть со своими собственными трубопроводами. Ниже приведен код, используемый для построения изображений из этого руководства. Это может быть хорошей отправной точкой, показывая некоторые обходные пути для типичных трудностей, таких как:

  • Воспроизводимость времени выполнения
  • Отображенные функции, нетерпеливые к выполнению
  • interleave вызываемым преобразования
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Набор данных

Подобно ArtificialDataset вы можете создать набор данных , возвращая время , проведенное на каждом шаге.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Этот набор данных содержит образцы формы [[2, 1], [2, 2], [2, 3]] и типа [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Каждый образец:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Где:

  • Open и Read являются идентификаторами шагов
  • t0 является меткой времени , когда соответствующий этап начала
  • d это время , проведенное в соответствующем шаге
  • i это индекс экземпляра
  • e является индекс эпохи (число раз набор данных был итерированным)
  • s является индексом выборки

Итерационный цикл

Сделайте итерационный цикл немного сложнее, чтобы агрегировать все тайминги. Это будет работать только с наборами данных, генерирующими образцы, как описано выше.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Метод построения

И, наконец, определить функцию , способную построить график заданного значения , возвращаемое timelined_benchmark функции.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Используйте оболочки для отображаемой функции

Для запуска отображенных функций в нетерпеливом контексте, вы должны обернуть их внутри tf.py_function вызова.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Сравнение трубопроводов

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Наивный

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

Оптимизировано

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

PNG

draw_timeline(optimized_timeline, "Optimized", 15)

PNG