تاریخ را ذخیره کنید! Google I / O 18-20 مه بازمی گردد اکنون ثبت نام کنید
این صفحه به‌وسیله ‏Cloud Translation API‏ ترجمه شده است.
Switch to English

عملکرد بهتر با API tf.data

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

GPU ها و TPU ها می توانند زمان مورد نیاز برای اجرای یک مرحله آموزشی را کاملاً کاهش دهند. دستیابی به حداکثر عملکرد به یک خط لوله ورودی کارآمد نیاز دارد که داده ها را برای مرحله بعدی قبل از اتمام مرحله فعلی ارائه دهد. tf.data API به ساخت خطوط لوله ورودی انعطاف پذیر و کارآمد کمک می کند. این سند نحوه استفاده از tf.data API برای ساخت خطوط لوله ورودی tf.data بسیار کارآمد را نشان می دهد.

قبل از ادامه ، راهنمای ساخت خطوط لوله ورودی tf.data بررسی کنید تا نحوه استفاده از tf.data API را tf.data .

منابع

برپایی

import tensorflow as tf

import time

در طول این راهنما ، شما در یک مجموعه داده تکرار می شوید و عملکرد را اندازه گیری می کنید. تهیه معیارهای عملکرد قابل تکرار ممکن است دشوار باشد. عوامل مختلف م affectثر در تولید مثل عبارتند از:

  • بار پردازنده فعلی
  • ترافیک شبکه
  • مکانیسم های پیچیده ای مانند حافظه نهان

برای به دست آوردن یک محک قابل تجدید ، یک مثال مصنوعی ایجاد خواهید کرد.

مجموعه داده

با تعریف یک کلاس ارثtf.data.Dataset ازtf.data.Dataset به نام ArtificialDataset . این مجموعه داده:

  • num_samples نمونه ایجاد می num_samples (پیش فرض 3 است)
  • قبل از اولین مورد برای شبیه سازی باز کردن یک پرونده مدتی می خوابد
  • قبل از تولید هر مورد برای شبیه سازی خواندن داده ها از یک فایل مدتی می خوابد
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

این مجموعه داده مشابه tf.data.Dataset.range و یک تاخیر ثابت را در ابتدای و در بین هر نمونه اضافه می کند.

حلقه آموزش

بعد ، یک حلقه ساختگی بنویسید که مدت زمان تکرار بیش از یک مجموعه داده را اندازه گیری کند. زمان آموزش شبیه سازی شده است.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

عملکرد را بهینه کنید

برای نشان دادن چگونگی بهینه سازی عملکرد ، عملکرد ArtificialDataset را بهبود می بخشید.

رویکرد ساده لوحانه

با استفاده از خط لوله ساده و بدون استفاده از هیچ ترفندی شروع کنید و از مجموعه داده همانطور که هست تکرار کنید.

benchmark(ArtificialDataset())
Execution time: 0.2541472299999441

زیر کاپوت ، زمان اعدام شما اینگونه سپری شده است:

نمودار زمان اجرای داده ها - یک روش ساده لوحانه است

طرح نشان می دهد که انجام یک مرحله آموزش شامل موارد زیر است:

  • باز کردن پرونده اگر هنوز باز نشده است
  • واکشی ورودی داده از پرونده
  • استفاده از داده ها برای آموزش

با این حال ، در حالی که خط لوله شما در حال واکشی داده ها است ، در یک اجرای همزمان ساده و ساده ، مدل شما بیکار نشسته است. برعکس ، در حالی که مدل شما در حال آموزش است ، خط لوله ورودی بیکار نشسته است. بنابراین زمان گام آموزش مجموع زمانهای باز ، خواندن و آموزش است.

بخشهای بعدی با استفاده از این خط لوله ورودی ، بهترین روشهای طراحی خطوط لوله ورودی TensorFlow را نشان می دهد.

واکشی

پیش بارگیری با پیش پردازش و اجرای مدل یک مرحله آموزشی همپوشانی دارد. در حالی که مدل مرحله آموزش s ، خط لوله ورودی داده های مرحله s+1 می خواند. با انجام این کار زمان گام به حداکثر (در مقابل جمع) آموزش و مدت زمان استخراج داده ها کاهش می یابد.

tf.data API فراهم می کند tf.data.Dataset.prefetch تحول. این می تواند برای جدا کردن زمان تولید داده از زمان مصرف داده مورد استفاده قرار گیرد. به طور خاص ، تحول از یک پیش زمینه زمینه و یک بافر داخلی برای پیش فرض عناصر از مجموعه داده ورودی قبل از زمان درخواست آنها استفاده می شود. تعداد عناصر برای پیش نصب باید برابر با (یا احتمالاً بیشتر از) تعداد دسته های مصرف شده در یک مرحله آموزش باشد. شما می توانید به صورت دستی این مقدار را تنظیم کنید یا آن را روی tf.data.AUTOTUNE تنظیم کنید ، که باعث می شود زمان اجرای tf.data مقدار را به صورت پویا در زمان اجرا تنظیم کند.

توجه داشته باشید که هر زمان که فرصتی برای همپوشانی کار یک "تولید کننده" با کار "مصرف کننده" وجود داشته باشد ، تحول پیش ساخته مزایایی را به همراه دارد.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20805208699994182

نمودار زمان اجرای داده ها - روش پیش فرض

اکنون ، همانطور که نمودار زمان اجرای داده ها نشان می دهد ، در حالی که مرحله آموزش برای نمونه 0 در حال اجرا است ، خط لوله ورودی در حال خواندن داده ها برای نمونه 1 و غیره است.

استخراج داده ها به صورت موازی

در یک تنظیم دنیای واقعی ، داده های ورودی ممکن است از راه دور ذخیره شوند (به عنوان مثال ، در Google Cloud Storage یا HDFS). خط لوله مجموعه داده ای که هنگام خواندن محلی اطلاعات به خوبی کار می کند ، ممکن است هنگام خواندن داده ها از راه دور ، تنگنا بر روی I / O باشد ، به دلیل تفاوت های زیر بین حافظه محلی و ذخیره از راه دور:

  • زمان به بایت اول : خواندن اولین بایت فایل از حافظه از راه دور می تواند سفارشاتی به بزرگی بیشتر از حافظه محلی داشته باشد.
  • توان خواندن : در حالی که فضای ذخیره سازی از راه دور معمولاً پهنای باند گسترده ای را ارائه می دهد ، با خواندن یک فایل تنها ممکن است بتوان از بخش کوچکی از این پهنای باند استفاده کرد.

علاوه بر این ، هنگامی که بایت های خام در حافظه بارگیری می شوند ، ممکن است لازم باشد که داده ها را از لیست خارج کرده یا رمزگشایی کنید (به عنوان مثال protobuf ) ، که به محاسبه اضافی نیاز دارد. این سربار صرف نظر از اینکه داده ها به صورت محلی ذخیره می شوند یا از راه دور وجود دارد ، اما در صورت عدم پیش نصب م prefثر داده ها ، می تواند در مورد از راه دور بدتر باشد.

برای کاهش تأثیر سربارهای مختلف استخراج داده ها ، می توان از تحول tf.data.Dataset.interleave برای موازی سازی مرحله بارگذاری داده ، استفاده از مطالب سایر مجموعه های داده (مانند خوانندگان پرونده داده) استفاده کرد. تعداد مجموعه داده های همپوشانی را می توان با آرگومان cycle_length تعیین کرد ، در حالی که سطح موازی کاری را می توان با آرگومان num_parallel_calls مشخص کرد. مشابه تغییر شکل prefetch ، تغییر شکل interleave از tf.data.AUTOTUNE پشتیبانی می tf.data.AUTOTUNE ، که تصمیم در مورد سطح موازی استفاده را به زمان اجرای tf.data .

تراز وسطی

آرگومان های پیش فرض تحول tf.data.Dataset.interleave باعث می شود که نمونه های منفرد از دو مجموعه داده به طور متوالی tf.data.Dataset.interleave .

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4883518669998921

نمودار زمان اجرای داده ها - لایه به هم پیوسته

این اعدام دادههای طرح زمان اجازه می دهد تا به نمایشگاه رفتار interleave تحول، دلربا نمونه به تناوب از دو مجموعه داده موجود است. با این حال ، هیچ بهبود عملکردی در اینجا وجود ندارد.

تراز موازی

اکنون ، از آرگومان num_parallel_calls تغییر شکل interleave num_parallel_calls ای استفاده کنید. با این کار چندین مجموعه داده به طور موازی بارگیری می شود و زمان انتظار برای باز شدن پرونده ها کاهش می یابد.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.26920967700016263

نمودار زمان اجرای داده ها - روش interleave موازی

این بار ، همانطور که نمودار زمان اجرای داده ها نشان می دهد ، خواندن دو مجموعه داده به طور موازی انجام می شود و زمان جهانی پردازش داده کاهش می یابد.

تبدیل موازی داده ها

هنگام تهیه داده ها ، ممکن است لازم باشد عناصر ورودی از قبل پردازش شوند. برای این منظور، tf.data API ارائه می دهد tf.data.Dataset.map تحول، که یک تابع تعریف شده توسط کاربر را به هر عنصر مجموعه داده ورودی اعمال می شود. از آنجا که عناصر ورودی از یکدیگر مستقل هستند ، پیش پردازش می تواند در چندین هسته پردازنده موازی شود. برای اینکه این ممکن است، به طور مشابه به prefetch و interleave تحولات از map تحول فراهم می کند num_parallel_calls استدلال تعیین سطح موازی.

انتخاب بهترین مقدار برای استدلال num_parallel_calls به سخت افزار ، مشخصات داده های آموزش شما (مانند اندازه و شکل آن) ، هزینه عملکرد نقشه شما و سایر پردازش های همزمان پردازنده بستگی دارد. یک ابتکار ساده استفاده از تعداد هسته های پردازنده موجود است. با این حال، برای prefetch و interleave تحول، map تحول از tf.data.AUTOTUNE که تصمیم گیری در مورد آنچه در سطح موازی استفاده به تفویض خواهد tf.data زمان اجرا.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

نقشه برداری پی در پی

با استفاده از تغییر شکل map بدون موازی کاری به عنوان یک مثال پایه شروع کنید.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4379127629999857

نمودار زمان اجرای داده ها - روش نقشه برداری متوالی

در مورد رویکرد ساده لوحانه ، در اینجا ، همانطور که طرح نشان می دهد ، زمان صرف شده برای مراحل باز کردن ، خواندن ، پیش پردازش (نقشه برداری) و آموزش با هم برای یک تکرار واحد جمع می شود.

نقشه برداری موازی

اکنون ، از همان عملکرد قبل از پردازش استفاده کنید اما آن را به صورت موازی روی چندین نمونه اعمال کنید.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2747970279999663

زمان اجرای داده ها - نقشه برداری موازی

همانطور که نمودار داده نشان می دهد ، مراحل پیش پردازش با هم همپوشانی دارند و باعث کاهش زمان کلی برای یک تکرار می شوند.

ذخیره سازی

تحول tf.data.Dataset.cache می تواند یک مجموعه داده را در حافظه یا در حافظه محلی ذخیره کند. با این کار برخی از عملیات (مانند باز کردن پرونده و خواندن داده) از اجرای هر دوره نجات می یابد.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3715158390000397

زمان اجرای داده - روش مجموعه داده cached

در اینجا نمودار زمان اجرای داده ها نشان می دهد که وقتی یک مجموعه داده را حافظه پنهان می کنید ، تغییرات قبل از cache (مانند باز کردن پرونده و خواندن داده ها) فقط در دوره اول اجرا می شوند. دوره های بعدی از داده های ذخیره شده توسط تغییر cache استفاده مجدد می کنند.

اگر عملکرد تعریف شده توسط کاربر به تغییر شکل map گران است ، تحول cache را پس از تغییر شکل map تا زمانی که مجموعه داده حاصل هنوز می تواند در حافظه یا حافظه محلی قرار گیرد. اگر عملکرد تعریف شده توسط کاربر فضای مورد نیاز برای ذخیره مجموعه داده را بیش از ظرفیت حافظه پنهان افزایش می دهد ، یا بعد از تغییر cache آن را اعمال کنید یا برای کاهش استفاده از منابع ، قبل از کار آموزشی خود پیش پردازش داده ها را در نظر بگیرید.

نگاشت برداری

فراخوانی یک تابع تعریف شده توسط کاربر که به دگرگونی map منتقل می شود ، مربوط به زمانبندی و اجرای تابع تعریف شده توسط کاربر است. عملکرد تعریف شده توسط کاربر را برداری کنید (به این معنی که بخواهید همزمان آن را روی دسته ای از ورودی ها کار کند) و قبل از تبدیل map تحول batch را اعمال کنید.

برای نشان دادن این روش خوب ، مجموعه داده مصنوعی شما مناسب نیست. تاخیر زمان بندی حدود 10 میکروثانیه (10e-6 ثانیه) است ، بسیار کمتر از ده ها میلی ثانیه استفاده شده در ArtificialDataset ، و بنابراین تأثیر آن دشوار است.

برای این مثال ، از تابع پایه tf.data.Dataset.range استفاده کنید و حلقه آموزش را به ساده ترین شکل آن ساده کنید.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

نقشه برداری اسکالر

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.9082538790000854

زمان اجرای داده ها - روش نقشه مقیاسی

نمودار بالا آنچه را که در حال انجام است (با نمونه های کمتر) با استفاده از روش نقشه برداری مقیاسی نشان می دهد. این نشان می دهد که تابع نگاشت شده برای هر نمونه اعمال می شود. در حالی که این عملکرد بسیار سریع است ، اما دارای برخی هزینه های اضافی است که بر عملکرد زمان تأثیر می گذارد.

نقشه برداری برداری شده

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.03624614399996062

زمان اجرای داده - روش نقشه برداری شده

این بار ، عملکرد نگاشت شده یک بار فراخوانی می شود و برای دسته ای از نمونه اعمال می شود. همانطور که نمودار زمان اجرای داده ها نشان می دهد ، در حالی که اجرای این عملکرد ممکن است زمان بیشتری را صرف کند ، اما سربار فقط یک بار ظاهر می شود و عملکرد کلی زمان را بهبود می بخشد.

کاهش اثر حافظه

تعدادی از تغییرات ، از جمله interleave ، prefetch و shuffle ، بافر داخلی عناصر را حفظ می کنند. اگر عملکرد تعریف شده توسط کاربر به تحول map تغییر یابد ، اندازه عناصر تغییر می کند ، ترتیب ترتیب تغییر شکل نقشه و تغییر شکل عناصر بافر بر میزان استفاده از حافظه تأثیر می گذارد. به طور کلی ، ترتیبی را انتخاب کنید که باعث کمبود اثر حافظه شود ، مگر اینکه ترتیب مختلفی برای عملکرد مطلوب باشد.

ذخیره محاسبات جزئی

توصیه می شود به کش مجموعه داده پس از map تحول به جز اگر این تحول باعث می شود داده های بسیار بزرگ به جا در حافظه است. اگر عملکرد نگاشتی شما به دو قسمت تقسیم شود: یک زمان بر و یک حافظه. در این حالت ، می توانید تبدیلات خود را مانند زیر زنجیر کنید:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

به این ترتیب قسمت زمان بر فقط در دوره اول اجرا می شود و از استفاده بیش از حد فضای حافظه نهان جلوگیری می کنید.

خلاصه بهترین تمرین

در اینجا خلاصه ای از بهترین روش ها برای طراحی خطوط ورودی عملکردی TensorFlow آورده شده است:

بازتولید ارقام

برای اینکه بیشتر به درکtf.data.Dataset APItf.data.Dataset ، می توانید با خطوط لوله خود بازی کنید. در زیر کدی است که برای ترسیم تصاویر از این راهنما استفاده شده است. این می تواند یک نقطه شروع خوب باشد ، برخی از راه حل ها برای مشکلات مشترک مانند:

  • تکرارپذیری زمان اعدام
  • توابع نقشه برداری اجرا مشتاقانه
  • interleave قابل فراخوانی تغییر و تحول
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

مجموعه داده

مشابه ArtificialDataset می توانید یک مجموعه داده بسازید که زمان صرف شده در هر مرحله را برگرداند.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

این مجموعه داده نمونه هایی از شکل [[2, 1], [2, 2], [2, 3]] و از نوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . هر نمونه به شرح زیر است:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

جایی که:

  • Open و Read شناسه های مراحل هستند
  • t0 زمان زمانی است که مرحله مربوطه شروع می شود
  • d زمان صرف شده در مرحله مربوطه است
  • i شاخص نمونه است
  • e شاخص دوره است (تعداد دفعات تکرار مجموعه داده)
  • s شاخص نمونه است

حلقه تکرار

حلقه تکرار را برای جمع بندی تمام زمان بندی ها کمی پیچیده تر کنید. این فقط با مجموعه داده هایی که نمونه ها را تولید می کنند همانطور که در بالا توضیح داده شده کار خواهد کرد.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

روش نقشه کشی

سرانجام ، با توجه به مقادیر برگشتی توسط تابع timelined_benchmark تابعی را تعریف کنید که بتواند جدول زمانی را ترسیم کند.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

برای عملکرد نگاشت شده از لفاف استفاده کنید

برای اجرای تابع mapped در یک زمینه مشتاق ، باید آنها را درون یک تماس tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

مقایسه خطوط لوله

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

آدم ساده

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.445692234000035

بهینه شده

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.326935971000012
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png