کمک به حفاظت از دیواره بزرگ مرجانی با TensorFlow در Kaggle اضافه کردن چالش

عملکرد بهتر با API tf.data

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

GPU ها و TPU ها می توانند زمان مورد نیاز برای اجرای یک مرحله آموزشی را به شدت کاهش دهند. دستیابی به حداکثر عملکرد به یک خط لوله ورودی کارآمد نیاز دارد که قبل از پایان مرحله فعلی، داده های مرحله بعدی را ارائه می دهد. tf.data API کمک می کند تا به ساخت خط لوله ورودی انعطاف پذیر و کارآمد. این سند نشان می دهد که چگونه به استفاده از tf.data API برای ساختن بسیار سازگار خطوط لوله ورودی TensorFlow.

قبل از ادامه، بررسی ساخت TensorFlow خطوط لوله ورودی راهنمایی به یاد بگیرند که چگونه به استفاده از tf.data API.

منابع

برپایی

import tensorflow as tf

import time

در طول این راهنما، شما یک مجموعه داده را تکرار کرده و عملکرد را اندازه گیری خواهید کرد. ایجاد معیارهای عملکرد قابل تکرار می تواند دشوار باشد. عوامل مختلف مؤثر بر تکرارپذیری عبارتند از:

  • بار CPU فعلی
  • ترافیک شبکه
  • مکانیسم های پیچیده، مانند حافظه پنهان

برای بدست آوردن یک معیار قابل تکرار، یک نمونه مصنوعی خواهید ساخت.

مجموعه داده

شروع با تعریف یک کلاس ارث بردن از tf.data.Dataset نام ArtificialDataset . این مجموعه داده:

  • تولید num_samples نمونه (به طور پیش فرض 3 است)
  • مدتی قبل از اولین مورد برای شبیه سازی باز کردن یک فایل می خوابد
  • قبل از تولید هر مورد برای شبیه سازی خواندن داده ها از یک فایل، مدتی می خوابد
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

این مجموعه داده است شبیه به tf.data.Dataset.range یکی، اضافه کردن یک تاخیر ثابت در آغاز و در میان هر نمونه.

حلقه آموزش

بعد، یک حلقه آموزشی ساختگی بنویسید که مدت زمان تکرار روی یک مجموعه داده را اندازه می‌گیرد. زمان آموزش شبیه سازی شده است.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

بهینه سازی عملکرد

به نمایشگاه چگونه عملکرد را می توان بهینه سازی شده، شما را به عملکرد بهبود ArtificialDataset .

رویکرد ساده لوحانه

با یک خط لوله ساده و بدون هیچ ترفندی شروع کنید و روی مجموعه داده همانطور که هست تکرار کنید.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

در زیر کاپوت، زمان اعدام شما به این صورت سپری شد:

نمودار زمان اجرای داده ها - یک روش ساده لوحانه

نمودار نشان می دهد که انجام یک مرحله آموزشی شامل موارد زیر است:

  • باز کردن فایل در صورتی که هنوز باز نشده باشد
  • دریافت ورودی داده از فایل
  • استفاده از داده ها برای آموزش

با این حال، در یک پیاده‌سازی همزمان ساده مانند اینجا، در حالی که خط لوله شما داده‌ها را واکشی می‌کند، مدل شما بیکار است. برعکس، در حالی که مدل شما در حال آموزش است، خط لوله ورودی بیکار است. بنابراین زمان گام آموزش مجموع زمان های باز کردن، خواندن و آموزش است.

بخش‌های بعدی بر روی این خط لوله ورودی ساخته می‌شوند و بهترین روش‌ها را برای طراحی خطوط لوله ورودی TensorFlow عملکردی نشان می‌دهند.

پیش واکشی

واکشی اولیه با پیش پردازش و اجرای مدل یک مرحله آموزشی همپوشانی دارد. در حالی که مدل در حال اجرای آموزش گام s ، خط لوله ورودی در حال خواندن داده ها را برای مرحله s+1 . انجام این کار زمان گام را به حداکثر (برخلاف مجموع) آموزش و زمان استخراج داده ها کاهش می دهد.

tf.data API فراهم می کند tf.data.Dataset.prefetch تحول. می توان از آن برای جدا کردن زمان تولید داده از زمان مصرف داده استفاده کرد. به طور خاص، تبدیل از یک رشته پس‌زمینه و یک بافر داخلی برای واکشی اولیه عناصر از مجموعه داده ورودی قبل از زمان درخواستی استفاده می‌کند. تعداد عناصر برای واکشی اولیه باید برابر (یا احتمالاً بیشتر از) تعداد دسته‌های مصرف‌شده در یک مرحله آموزشی باشد. شما هم می تواند به صورت دستی تنظیم این ارزش ها، یا آن را به مجموعه tf.data.AUTOTUNE ، که بی درنگ tf.data زمان اجرا به لحن به صورت پویا در زمان اجرا مقدار است.

توجه داشته باشید که تبدیل پیش واکشی هر زمان که فرصتی برای همپوشانی کار یک «تولیدکننده» با کار «مصرف‌کننده» وجود دارد، مزایایی را فراهم می‌کند.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

نمودار زمان اجرای داده ها - روش واکشی اولیه

اکنون همانطور که نمودار زمان اجرای داده ها نشان می دهد، در حالی که مرحله آموزش برای نمونه 0 در حال اجرا است، خط لوله ورودی در حال خواندن داده ها برای نمونه 1 و غیره است.

استخراج داده های موازی

در یک محیط واقعی، داده های ورودی ممکن است از راه دور ذخیره شوند (به عنوان مثال، در Google Cloud Storage یا HDFS). خط لوله داده ای که هنگام خواندن داده ها به صورت محلی خوب کار می کند، ممکن است هنگام خواندن داده ها از راه دور، به دلیل تفاوت های زیر بین ذخیره سازی محلی و راه دور، در ورودی/خروجی با تنگنا مواجه شود:

  • زمان به اولین بایت: خواندن اولین بایت از یک فایل را از ذخیره سازی از راه دور می توانید سفارشات از قدر طولانی تر از از ذخیره سازی محلی است.
  • خوانده شده توان: در حالی که ذخیره سازی از راه دور به طور معمول ارائه می دهد پهنای باند بزرگ دانه ها، خواندن یک فایل ممکن است تنها قادر به استفاده از بخش کوچکی از این پهنای باند باشد.

علاوه بر این، یک بار کلمه در ادامه متن خام در حافظه بارگذاری، همچنین ممکن است لازم به deserialize و / یا رمزگشایی داده شود (به عنوان مثال protobuf ) که نیاز به محاسبات اضافی. این سربار صرف نظر از اینکه داده ها به صورت محلی یا از راه دور ذخیره می شوند وجود دارد، اما اگر داده ها به طور موثر واکشی نشده باشند، می تواند در حالت راه دور بدتر باشد.

برای کاهش تاثیر سربار مختلف استخراج داده ها، tf.data.Dataset.interleave تحول را می توان مورد استفاده قرار گیرد به parallelize مرحله بارگذاری داده ها، Interleaving در محتویات دیگر مجموعه داده (مانند خوانندگان فایل داده ها). تعداد مجموعه داده به همپوشانی را می توان با مشخص cycle_length استدلال، در حالی که سطح موازی را می توان با مشخص num_parallel_calls استدلال است. شبیه به prefetch تحول، interleave تحول از tf.data.AUTOTUNE ، که تصمیم گیری در مورد آنچه در سطح موازی استفاده به تفویض خواهد tf.data زمان اجرا.

متقابل متوالی

استدلال به طور پیش فرض از tf.data.Dataset.interleave تحول آن را در میان چیزی نمونه های تک از دو مجموعه داده پی در پی.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

نمودار زمان اجرای داده ها - interleave متوالی

این اعدام دادههای طرح زمان اجازه می دهد تا به نمایشگاه رفتار interleave تحول، دلربا نمونه به تناوب از دو مجموعه داده موجود است. با این حال، هیچ بهبود عملکردی در اینجا وجود ندارد.

متقابل موازی

در حال حاضر، استفاده از num_parallel_calls استدلال از interleave تغییر و تحول. این چندین مجموعه داده را به صورت موازی بارگیری می کند و زمان انتظار برای باز شدن فایل ها را کاهش می دهد.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

نمودار زمان اجرای داده ها - روش interleave موازی

این بار، همانطور که نمودار زمان اجرای داده نشان می دهد، خواندن دو مجموعه داده موازی می شود و زمان پردازش داده های جهانی را کاهش می دهد.

تبدیل داده های موازی

هنگام تهیه داده ها، عناصر ورودی ممکن است نیاز به پیش پردازش داشته باشند. برای این منظور، tf.data API ارائه می دهد tf.data.Dataset.map تحول، که یک تابع تعریف شده توسط کاربر را به هر عنصر مجموعه داده ورودی اعمال می شود. از آنجا که عناصر ورودی مستقل از یکدیگر هستند، پیش پردازش را می توان در چندین هسته CPU موازی کرد. برای اینکه این ممکن است، به طور مشابه به prefetch و interleave تحولات از map تحول فراهم می کند num_parallel_calls استدلال تعیین سطح موازی.

انتخاب بهترین ارزش برای num_parallel_calls استدلال بستگی به سخت افزار خود را، از ویژگی های داده های آموزشی خود (مانند اندازه و شکل)، هزینه تابع نقشه خود را، و چه پردازش های دیگر است بر روی CPU اتفاق می افتد در همان زمان. یک روش اکتشافی ساده استفاده از تعداد هسته های CPU موجود است. با این حال، برای prefetch و interleave تحول، map تحول از tf.data.AUTOTUNE که تصمیم گیری در مورد آنچه در سطح موازی استفاده به تفویض خواهد tf.data زمان اجرا.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

نقشه برداری متوالی

شروع با استفاده از map تحول بدون موازی به عنوان یک مثال پایه.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

نمودار زمان اجرای داده ها - روش نگاشت متوالی

همانطور که برای روش ساده ، در اینجا، به عنوان طرح نشان می دهد، زمان صرف شده برای باز کردن، خواندن، پردازش اولیه (نقشه برداری) و مراحل آموزش خلاصه با هم برای یک تکرار است.

نقشه برداری موازی

اکنون، از همان تابع پیش پردازش استفاده کنید، اما آن را به صورت موازی روی چندین نمونه اعمال کنید.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

زمان اجرای داده ها - نقشه برداری موازی

همانطور که نمودار داده نشان می دهد، مراحل پیش پردازش با هم همپوشانی دارند و زمان کلی برای یک تکرار را کاهش می دهند.

ذخیره سازی

tf.data.Dataset.cache تحول می توانید یک مجموعه داده یا در حافظه یا حافظه داخلی کش،. این باعث می‌شود برخی از عملیات (مانند باز کردن فایل و خواندن داده‌ها) از اجرای هر دوره نجات پیدا کند.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

زمان اجرای داده - روش داده های ذخیره شده در حافظه پنهان

در اینجا، اعدام داده های زمان طرح نشان می دهد که زمانی که شما کش یک مجموعه داده، تحولات قبل از cache یکی (مانند باز کردن فایل ها و داده ها خواندن) فقط در دوره اول اجرا. دوره های بعدی خواهد داده های کش شده توسط استفاده مجدد از cache و تحول است.

اگر تابع تعریف شده توسط کاربر به تصویب رسید به map تحول گران است، اعمال cache تحول پس از map تحول تا زمانی که مجموعه داده و در نتیجه هنوز هم می تواند به حافظه و یا ذخیره سازی محلی مناسب است. اگر تابع تعریف شده توسط کاربر را افزایش می دهد فضای مورد نیاز برای ذخیره مجموعه داده فراتر از ظرفیت ذخیره سازی، یا درخواست آن را پس از cache تحول و یا در نظر پردازش اولیه داده های خود را قبل از کار آموزشی خود را به منظور کاهش استفاده از منابع.

نقشه برداری برداری

استناد به یک تابع به تصویب رسید به map تحول سربار به برنامه ریزی مربوط و اجرای تابع تعریف شده توسط کاربر. برداری از تابع تعریف شده توسط کاربر (این است که، آن را بیش از یک دسته ای از ورودی در یک بار عمل) و اعمال batch قبل از تحول map تحول.

برای نشان دادن این عمل خوب، مجموعه داده مصنوعی شما مناسب نیست. تاخیر برنامه ریزی در حدود 10 میکروثانیه (10E-6 ثانیه)، به مراتب کمتر از ده میلی ثانیه مورد استفاده در ArtificialDataset ، و در نتیجه تاثیر آن سخت است برای دیدن.

برای این مثال، استفاده از پایه tf.data.Dataset.range عملکرد و ساده حلقه آموزش به ساده ترین شکل آن است.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

نقشه برداری اسکالر

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

زمان اجرای داده ها - روش نقشه اسکالر

نمودار بالا آنچه را که در حال وقوع است (با نمونه های کمتر) با استفاده از روش نگاشت اسکالر نشان می دهد. نشان می دهد که تابع نگاشت برای هر نمونه اعمال می شود. در حالی که این عملکرد بسیار سریع است، مقداری سربار دارد که بر عملکرد زمانی تأثیر می گذارد.

نقشه برداری برداری

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

زمان اجرای داده ها - روش نقشه برداری

این بار تابع نگاشت یک بار فراخوانی می شود و برای دسته ای از نمونه اعمال می شود. همانطور که نمودار زمان اجرای داده نشان می دهد، در حالی که اجرای تابع زمان بیشتری می برد، سربار فقط یک بار ظاهر می شود و عملکرد کلی زمان را بهبود می بخشد.

کاهش ردپای حافظه

تعدادی از تحولات، از جمله interleave ، prefetch ، و shuffle ، حفظ یک بافر داخلی از عناصر. اگر تابع تعریف شده توسط کاربر به تصویب رسید به map تحول اندازه عناصر، سپس ترتیب تحول نقشه و تحولات که عناصر بافر تحت تاثیر قرار استفاده از حافظه را تغییر میدهد. به طور کلی، ترتیبی را انتخاب کنید که منجر به ردپای حافظه کمتر شود، مگر اینکه ترتیب متفاوتی برای عملکرد مطلوب باشد.

کش کردن محاسبات جزئی

توصیه می شود به کش مجموعه داده پس از map تحول به جز اگر این تحول باعث می شود داده های بسیار بزرگ به جا در حافظه است. اگر عملکرد نقشه برداری شده شما را بتوان به دو بخش تقسیم کرد: یک بخش زمان بر و یک بخش مصرف کننده حافظه، می توان به یک مبادله دست یافت. در این مورد، می توانید تبدیل های خود را مانند زیر زنجیره بزنید:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

به این ترتیب بخش وقت گیر فقط در دوره اول اجرا می شود و از استفاده بیش از حد از فضای کش جلوگیری می کنید.

خلاصه بهترین تمرین

در اینجا خلاصه‌ای از بهترین روش‌ها برای طراحی خطوط لوله ورودی TensorFlow عملکردی آورده شده است:

بازتولید ارقام

برای بررسی عمیق تر در tf.data.Dataset درک API، شما می توانید با خطوط لوله خود را بازی. در زیر کد مورد استفاده برای رسم تصاویر از این راهنما آمده است. این می‌تواند نقطه شروع خوبی باشد و راه‌حل‌هایی را برای مشکلات رایج نشان دهد، مانند:

  • تکرارپذیری زمان اجرا
  • توابع نقشه برداری مشتاق اجرا
  • interleave قابل فراخوانی تغییر و تحول
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

مجموعه داده

شبیه به ArtificialDataset شما می توانید یک مجموعه داده از بازگشت زمان صرف شده در هر مرحله ساخت.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

این مجموعه داده نمونه از شکل فراهم می کند [[2, 1], [2, 2], [2, 3]] و از نوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . هر نمونه عبارت است از:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

جایی که:

  • Open و Read مراحل شناسه هستند
  • t0 برچسب زمان زمانی است که گام مربوط آغاز شده
  • d زمان صرف شده در گام مربوط است
  • i شاخص به عنوان مثال است
  • e شاخص عصر (تعداد بار مجموعه داده است تکرار شده است)
  • s شاخص نمونه است

حلقه تکرار

حلقه تکرار را کمی پیچیده تر کنید تا همه زمان بندی ها را جمع آوری کنید. این فقط با مجموعه داده‌هایی که نمونه‌هایی را تولید می‌کنند همانطور که در بالا توضیح داده شد کار می‌کند.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

روش رسم

در نهایت، تعریف یک تابع قادر به رسم یک جدول زمانی با توجه به ارزش های بازگردانده شده توسط timelined_benchmark تابع.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

از wrapper ها برای عملکرد نقشه برداری شده استفاده کنید

برای اجرای تابع نقشه برداری در زمینه مشتاق، شما باید آنها را بسته بندی در داخل یک tf.py_function پاسخ.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

مقایسه خطوط لوله

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

آدم ساده

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

بهینه شده است

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png