Google I / O возвращается 18-20 мая! Зарезервируйте место и составьте свое расписание Зарегистрируйтесь сейчас

Лучшая производительность с tf.data API

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Графические процессоры и TPU могут радикально сократить время, необходимое для выполнения одного шага обучения. Для достижения максимальной производительности требуется эффективный конвейер ввода, который доставляет данные для следующего шага до того, как текущий шаг будет завершен. API tf.data помогает создавать гибкие и эффективные конвейеры ввода. В этом документе показано, как использовать API tf.data для создания высокопроизводительных конвейеров ввода tf.data .

Прежде чем продолжить, ознакомьтесь с руководством по tf.data входных конвейеров tf.data чтобы узнать, как использовать tf.data API.

Ресурсы

Настраивать

import tensorflow as tf

import time

В этом руководстве вы будете перебирать набор данных и измерять производительность. Создание воспроизводимых тестов производительности может быть трудным. Различные факторы, влияющие на воспроизводимость, включают:

  • Текущая загрузка процессора
  • Сетевой трафик
  • Сложные механизмы, такие как кеш

Чтобы получить воспроизводимый тест, вы создадите искусственный пример.

Набор данных

Начните с определения класса, унаследованного отtf.data.Dataset под названием ArtificialDataset . Этот набор данных:

  • Создает образцы num_samples (по умолчанию 3)
  • Спит некоторое время перед первым элементом, чтобы имитировать открытие файла
  • Спит некоторое время перед созданием каждого элемента для имитации чтения данных из файла
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Этот набор данных похож на tf.data.Dataset.range , добавляя фиксированную задержку в начале и между каждым отсчетом.

Цикл обучения

Затем напишите фиктивный цикл обучения, который измеряет, сколько времени требуется для перебора набора данных. Время тренировки смоделировано.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Оптимизировать производительность

Чтобы продемонстрировать, как можно оптимизировать производительность, вы улучшите производительность ArtificialDataset .

Наивный подход

Начните с простого конвейера без каких-либо уловок, перебирая набор данных как есть.

benchmark(ArtificialDataset())
Execution time: 0.2541472299999441

Под капотом вот как было потрачено ваше время выполнения:

График выполнения данных - наивный метод

Сюжет показывает, что выполнение обучающего шага предполагает:

  • Открытие файла, если он еще не открыт
  • Получение записи данных из файла
  • Использование данных для обучения

Однако в такой наивной синхронной реализации, как здесь, пока ваш конвейер получает данные, ваша модель простаивает. И наоборот, пока ваша модель обучается, конвейер ввода простаивает. Таким образом, время шага обучения - это сумма времени открытия, чтения и тренировки.

Следующие разделы основаны на этом конвейере ввода, иллюстрируя передовые методы проектирования производительных конвейеров ввода TensorFlow.

Предварительная загрузка

Предварительная выборка перекрывает предварительную обработку и выполнение модели на этапе обучения. Пока модель выполняет шаг обучения s , входной конвейер считывает данные для шага s+1 . Это сокращает время шага до максимума (в отличие от суммы) обучения и времени, необходимого для извлечения данных.

API tf.data предоставляет преобразование tf.data.Dataset.prefetch . Его можно использовать для разделения времени, когда данные производятся, от времени, когда данные потребляются. В частности, преобразование использует фоновый поток и внутренний буфер для предварительной выборки элементов из входного набора данных до того, как они будут запрошены. Количество элементов для предварительной выборки должно быть равно (или, возможно, больше) количеству пакетов, потребляемых одним шагом обучения. Вы можете либо вручную настроить это значение, либо установить его на tf.data.AUTOTUNE , что побудит tf.data выполнения tf.data настроить значение динамически во время выполнения.

Обратите внимание, что преобразование предварительной выборки дает преимущества всякий раз, когда есть возможность перекрыть работу «производителя» с работой «потребителя».

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20805208699994182

График времени выполнения данных - метод предварительной выборки

Теперь, как показано на графике времени выполнения данных, пока выполняется шаг обучения для образца 0, входной конвейер считывает данные для образца 1 и так далее.

Распараллеливание извлечения данных

В реальных условиях входные данные могут храниться удаленно (например, в Google Cloud Storage или HDFS). Конвейер набора данных, который хорошо работает при локальном чтении данных, может стать узким местом при вводе-выводе при удаленном чтении данных из-за следующих различий между локальным и удаленным хранилищами:

  • Время до первого байта : чтение первого байта файла из удаленного хранилища может занять на порядки больше времени, чем из локального.
  • Читайте пропускную способность : В то время как удаленное хранение , как правило , предлагает большую суммарную пропускную способность, чтение одного файла может только быть в состоянии использовать небольшую часть этой полосы пропускания.

Кроме того, как только необработанные байты загружены в память, может также потребоваться десериализация и / или дешифрование данных (например, protobuf ), что потребует дополнительных вычислений. Эти накладные расходы присутствуют независимо от того, хранятся ли данные локально или удаленно, но могут быть хуже в удаленном случае, если данные предварительно не загружаются эффективно.

Чтобы уменьшить влияние различных накладных расходов на извлечение данных, можно использовать преобразование tf.data.Dataset.interleave для распараллеливания этапа загрузки данных, чередуя содержимое других наборов данных (например, считывателей файлов данных). Количество перекрывающихся наборов данных можно указать аргументом cycle_length , а уровень параллелизма можно указать аргументом num_parallel_calls . Подобно преобразованию prefetch преобразование interleave поддерживает tf.data.AUTOTUNE , который делегирует решение о том, какой уровень параллелизма использовать tf.data выполнения tf.data .

Последовательное чередование

Аргументы по умолчанию для преобразования tf.data.Dataset.interleave позволяют последовательно чередовать отдельные выборки из двух наборов данных.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4883518669998921

График времени выполнения данных - последовательное чередование

Этот график времени выполнения данных позволяет продемонстрировать поведение преобразования interleave , поочередно выбирая выборки из двух доступных наборов данных. Однако здесь не происходит никакого улучшения производительности.

Параллельное чередование

Теперь используйте аргумент num_parallel_calls преобразования interleave . Это загружает несколько наборов данных параллельно, сокращая время ожидания открытия файлов.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.26920967700016263

График времени выполнения данных - метод параллельного чередования

На этот раз, как показывает график времени выполнения данных, чтение двух наборов данных распараллеливается, что сокращает время обработки глобальных данных.

Распараллеливание преобразования данных

При подготовке данных может потребоваться предварительная обработка входных элементов. Для этого tf.data API предлагает преобразование tf.data.Dataset.map , которое применяет пользовательскую функцию к каждому элементу входного набора данных. Поскольку элементы ввода не зависят друг от друга, предварительная обработка может быть распараллелена между несколькими ядрами ЦП. Чтобы сделать это возможным, подобно преобразованиям prefetch и interleave преобразование map предоставляет аргумент num_parallel_calls для указания уровня параллелизма.

Выбор наилучшего значения для аргумента num_parallel_calls зависит от вашего оборудования, характеристик ваших обучающих данных (таких как их размер и форма), стоимости вашей функции карты и того, какая другая обработка выполняется в ЦП в то же время. Простая эвристика - использовать количество доступных ядер ЦП. Однако, что касается преобразования prefetch и interleave преобразование map поддерживает tf.data.AUTOTUNE который делегирует решение о том, какой уровень параллелизма использовать tf.data выполнения tf.data .

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Последовательное отображение

Начните с использования преобразования map без параллелизма в качестве базового примера.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4379127629999857

График времени выполнения данных - метод последовательного сопоставления

Что касается наивного подхода , то здесь, как показывает график, время, затраченное на открытие, чтение, предварительную обработку (отображение) и шаги обучения, суммируется для одной итерации.

Параллельное отображение

Теперь используйте ту же функцию предварительной обработки, но примените ее параллельно к нескольким образцам.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2747970279999663

Время выполнения данных - параллельное отображение

Как видно из графика данных, этапы предварительной обработки перекрываются, что сокращает общее время одной итерации.

Кеширование

Преобразование tf.data.Dataset.cache может кэшировать набор данных либо в памяти, либо в локальном хранилище. Это убережет некоторые операции (например, открытие файла и чтение данных) от выполнения в каждую эпоху.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3715158390000397

Время выполнения данных - метод кэшированного набора данных

Здесь график времени выполнения данных показывает, что, когда вы кэшируете набор данных, преобразования до cache (например, открытие файла и чтение данных) выполняются только в течение первой эпохи. Следующие эпохи будут повторно использовать данные, кэшированные преобразованием cache .

Если определяемая пользователем функция, переданная в преобразование map является дорогостоящей, примените преобразование cache после преобразования map пока результирующий набор данных все еще может уместиться в памяти или локальном хранилище. Если определяемая пользователем функция увеличивает пространство, необходимое для хранения набора данных, сверх емкости кеша, либо примените ее после преобразования cache либо рассмотрите возможность предварительной обработки данных перед учебным заданием, чтобы уменьшить использование ресурсов.

Векторизация отображения

Вызов пользовательской функции, переданной в преобразование map имеет накладные расходы, связанные с планированием и выполнением пользовательской функции. Векторизуйте пользовательскую функцию (то есть пусть она работает с пакетом входных данных одновременно) и примените batch преобразование перед преобразованием map .

Чтобы проиллюстрировать эту передовую практику, ваш искусственный набор данных не подходит. Задержка планирования составляет около 10 микросекунд (10e-6 секунд), что намного меньше, чем десятки миллисекунд, используемых в ArtificialDataset , и поэтому ее влияние трудно увидеть.

В этом примере используйте базовую функцию tf.data.Dataset.range и упростите цикл обучения до его простейшей формы.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Скалярное отображение

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.9082538790000854

Время выполнения данных - метод скалярной карты

На приведенном выше графике показано, что происходит (с меньшим количеством выборок) с использованием метода скалярного сопоставления. Он показывает, что отображенная функция применяется для каждого образца. Хотя эта функция работает очень быстро, у нее есть некоторые накладные расходы, влияющие на временные характеристики.

Векторизованное отображение

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.03624614399996062

Время выполнения данных - метод векторизации карты

На этот раз сопоставленная функция вызывается один раз и применяется к партии выборки. Как показано на графике времени выполнения данных, хотя для выполнения функции может потребоваться больше времени, накладные расходы появляются только один раз, улучшая общую временную производительность.

Уменьшение объема памяти

Ряд преобразований, включая interleave , prefetch и shuffle , поддерживает внутренний буфер элементов. Если определяемая пользователем функция, переданная в преобразование map изменяет размер элементов, то порядок преобразования карты и преобразований, которые буферизуют элементы, влияют на использование памяти. В общем, выбирайте порядок, который приводит к меньшему объему памяти, если другой порядок не желателен для производительности.

Кеширование частичных вычислений

Рекомендуется кэшировать набор данных после преобразования map кроме случаев, когда это преобразование делает данные слишком большими, чтобы поместиться в памяти. Компромисс может быть достигнут, если ваша отображаемая функция может быть разделена на две части: часть, занимающую много времени, и часть, потребляющую память. В этом случае вы можете связать свои преобразования, как показано ниже:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Таким образом, трудоемкая часть выполняется только в течение первой эпохи, и вы избегаете использования слишком большого объема кеш-памяти.

Сводка передовой практики

Вот краткое изложение лучших практик для проектирования производительных входных конвейеров TensorFlow:

Воспроизведение фигур

Чтобы глубжеtf.data.Dataset APItf.data.Dataset , вы можете поиграть со своими собственными конвейерами. Ниже приведен код, используемый для построения изображений из этого руководства. Это может быть хорошей отправной точкой, показывая некоторые обходные пути для типичных трудностей, таких как:

  • Воспроизводимость времени выполнения
  • Отображенные функции стремятся к выполнению
  • вызываемое преобразование interleave
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Набор данных

Подобно ArtificialDataset вы можете создать набор данных, возвращающий время, потраченное на каждом шаге.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Этот набор данных предоставляет образцы формы [[2, 1], [2, 2], [2, 3]] и типа [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Каждый образец:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Где:

  • Open и Read - идентификаторы шагов
  • t0 - отметка времени, когда начался соответствующий шаг
  • d - время, затраченное на соответствующий шаг
  • i - индекс экземпляра
  • e - индекс эпохи (количество повторений набора данных)
  • s - индекс выборки

Итерационный цикл

Сделайте итерационный цикл немного сложнее, чтобы собрать все тайминги. Это будет работать только с наборами данных, генерирующими образцы, как описано выше.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Метод построения

Наконец, определите функцию, способную построить временную шкалу с учетом значений, возвращаемых функцией timelined_benchmark .

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Используйте оболочки для отображаемой функции

Чтобы запустить отображаемую функцию в активном контексте, вы должны tf.py_function их в вызов tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Сравнение трубопроводов

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Наивный

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.445692234000035

Оптимизировано

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.326935971000012
draw_timeline(naive_timeline, "Naive", 15)

PNG

draw_timeline(optimized_timeline, "Optimized", 15)

PNG