أداء أفضل مع tf.data API

تنظيم صفحاتك في مجموعات يمكنك حفظ المحتوى وتصنيفه حسب إعداداتك المفضّلة.

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يمكن لوحدات معالجة الرسومات (GPU) و (TPU) تقليل الوقت المطلوب بشكل جذري لتنفيذ خطوة تدريب واحدة. يتطلب تحقيق ذروة الأداء وجود خط إدخال فعال يوفر البيانات للخطوة التالية قبل انتهاء الخطوة الحالية. تساعد واجهة برمجة تطبيقات tf.data في بناء خطوط إدخال مرنة وفعالة. يوضح هذا المستند كيفية استخدام tf.data API لبناء خطوط أنابيب إدخال TensorFlow عالية الأداء.

قبل المتابعة ، راجع دليل إنشاء خطوط إدخال TensorFlow للتعرف على كيفية استخدام tf.data API.

موارد

يثبت

import tensorflow as tf

import time

خلال هذا الدليل ، سوف تتكرر عبر مجموعة بيانات وتقيس الأداء. قد يكون من الصعب إجراء مقاييس أداء قابلة للتكرار. تشمل العوامل المختلفة التي تؤثر على قابلية التكاثر ما يلي:

  • الحمل الحالي لوحدة المعالجة المركزية
  • حركة مرور الشبكة
  • آليات معقدة ، مثل ذاكرة التخزين المؤقت

للحصول على معيار قابل للتكرار ، ستقوم ببناء مثال مصطنع.

مجموعة البيانات

ابدأ بتحديد فئة موروثة من tf.data.Dataset تسمى ArtificialDataset . مجموعة البيانات هذه:

  • يولد num_samples العينات (الافتراضي هو 3)
  • ينام لبعض الوقت قبل العنصر الأول لمحاكاة فتح ملف
  • ينام لبعض الوقت قبل إنتاج كل عنصر لمحاكاة قراءة البيانات من ملف
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

تشبه مجموعة البيانات هذه tf.data.Dataset.range الأول ، مضيفًا تأخيرًا ثابتًا في بداية كل عينة وفيما بينها.

حلقة التدريب

بعد ذلك ، اكتب حلقة تدريب وهمية تقيس الوقت المستغرق للتكرار على مجموعة بيانات. يتم محاكاة وقت التدريب.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

تحسين الأداء

لعرض كيفية تحسين الأداء ، ستقوم بتحسين أداء مجموعة البيانات ArtificialDataset .

النهج الساذج

ابدأ بخط أنابيب ساذج بدون حيل ، وقم بالتكرار على مجموعة البيانات كما هي.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

تحت الغطاء ، هذه هي الطريقة التي أمضيت بها وقت التنفيذ:

مخطط وقت تنفيذ البيانات - طريقة ساذجة

توضح الحبكة أن تنفيذ خطوة تدريبية تتضمن:

  • فتح ملف إذا لم يتم فتحه بعد
  • إحضار إدخال بيانات من الملف
  • استخدام البيانات للتدريب

ومع ذلك ، في تنفيذ متزامن ساذج مثل هنا ، بينما يقوم خط الأنابيب الخاص بك بجلب البيانات ، يكون نموذجك في وضع الخمول. على العكس من ذلك ، أثناء تدريب النموذج الخاص بك ، يكون خط أنابيب الإدخال في وضع الخمول. وبالتالي فإن وقت خطوة التدريب هو مجموع أوقات الفتح والقراءة والتدريب.

تعتمد الأقسام التالية على خط أنابيب الإدخال هذا ، مما يوضح أفضل الممارسات لتصميم أنابيب إدخال TensorFlow ذات الأداء العالي.

الجلب المسبق

يتداخل الجلب المسبق مع المعالجة المسبقة وتنفيذ النموذج لخطوة التدريب. أثناء تنفيذ النموذج لخطوات التدريب ، يقرأ خط أنابيب الإدخال البيانات للخطوة s s+1 . يؤدي القيام بذلك إلى تقليل وقت الخطوة إلى الحد الأقصى (على عكس المجموع) للتدريب والوقت الذي يستغرقه استخراج البيانات.

توفر واجهة برمجة تطبيقات tf.data تحويل tf.data.Dataset.prefetch . يمكن استخدامه لفصل الوقت الذي يتم فيه إنتاج البيانات عن وقت استهلاك البيانات. على وجه الخصوص ، يستخدم التحويل مؤشر ترابط في الخلفية ومخزنًا مؤقتًا داخليًا للجلب المسبق للعناصر من مجموعة بيانات الإدخال قبل الوقت المطلوب. يجب أن يكون عدد العناصر المطلوب الجلب المسبق مساويًا (أو ربما أكبر من) عدد الدُفعات المستهلكة في خطوة تدريب واحدة. يمكنك إما ضبط هذه القيمة يدويًا ، أو تعيينها على tf.data.AUTOTUNE ، والذي سيطلب من وقت تشغيل tf.data ضبط القيمة ديناميكيًا في وقت التشغيل.

لاحظ أن تحويل الجلب المسبق يوفر فوائد في أي وقت توجد فيه فرصة لتداخل عمل "المنتج" مع عمل "المستهلك".

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

مخطط وقت تنفيذ البيانات - طريقة الجلب المسبق

الآن ، كما يظهر مخطط وقت تنفيذ البيانات ، أثناء تشغيل خطوة التدريب للعينة 0 ، يقرأ خط أنابيب الإدخال البيانات الخاصة بالعينة 1 ، وما إلى ذلك.

موازاة استخراج البيانات

في إعدادات العالم الحقيقي ، قد يتم تخزين بيانات الإدخال عن بُعد (على سبيل المثال ، على Google Cloud Storage أو HDFS). قد يتعرض خط أنابيب مجموعة البيانات الذي يعمل بشكل جيد عند قراءة البيانات محليًا للاختناق في الإدخال / الإخراج عند قراءة البيانات عن بُعد بسبب الاختلافات التالية بين التخزين المحلي والتخزين البعيد:

  • الوقت إلى البايت الأول : يمكن أن تستغرق قراءة البايت الأول من الملف من التخزين البعيد أوامر من حيث الحجم أطول من التخزين المحلي.
  • معدل نقل القراءة : بينما يوفر التخزين البعيد عادةً نطاقًا تردديًا مجمعًا كبيرًا ، قد تتمكن قراءة ملف واحد فقط من استخدام جزء صغير من هذا النطاق الترددي.

بالإضافة إلى ذلك ، بمجرد تحميل البايت الخام في الذاكرة ، قد يكون من الضروري أيضًا إلغاء تسلسل و / أو فك تشفير البيانات (مثل protobuf ) ، الأمر الذي يتطلب حسابًا إضافيًا. هذا الحمل موجود بغض النظر عما إذا كانت البيانات مخزنة محليًا أو عن بُعد ، ولكن يمكن أن تكون أسوأ في الحالة البعيدة إذا لم يتم جلب البيانات مسبقًا بشكل فعال.

للتخفيف من تأثير النفقات العامة المختلفة لاستخراج البيانات ، يمكن استخدام التحويل tf.data.Dataset.interleave خطوة تحميل البيانات ، وتشذير محتويات مجموعات البيانات الأخرى (مثل قارئات ملفات البيانات). يمكن تحديد عدد مجموعات البيانات المراد تداخلها بواسطة الوسيطة cycle_length ، بينما يمكن تحديد مستوى التوازي بواسطة الوسيطة num_parallel_calls . على غرار تحويل prefetch ، يدعم تحويل interleave tf.data.AUTOTUNE ، والذي سيفوض القرار حول مستوى التوازي الذي يجب استخدامه لوقت تشغيل tf.data .

تشذير متسلسل

تجعل الوسائط الافتراضية tf.data.Dataset.interleave تشذير عينات مفردة من مجموعتي بيانات بالتتابع.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

مخطط وقت تنفيذ البيانات - تشذير متسلسل

يسمح مخطط وقت تنفيذ البيانات هذا بعرض سلوك تحويل interleave ، وجلب العينات بدلاً من ذلك من مجموعتي البيانات المتاحتين. ومع ذلك ، لا يتم تضمين تحسين الأداء هنا.

تشذير متوازي

الآن ، استخدم وسيطة num_parallel_calls interleave يؤدي هذا إلى تحميل مجموعات بيانات متعددة بشكل متوازٍ ، مما يقلل من وقت انتظار فتح الملفات.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

مخطط وقت تنفيذ البيانات - طريقة التشذير المتوازي

هذه المرة ، كما يظهر مخطط وقت تنفيذ البيانات ، تتم قراءة مجموعتي البيانات بشكل متوازي ، مما يقلل من وقت معالجة البيانات العالمي.

موازاة تحويل البيانات

عند تحضير البيانات ، قد تحتاج عناصر الإدخال إلى المعالجة المسبقة. تحقيقا لهذه الغاية ، تقدم واجهة برمجة تطبيقات tf.data تحويل tf.data.Dataset.map ، والذي يطبق وظيفة محددة من قبل المستخدم على كل عنصر من عناصر مجموعة بيانات الإدخال. نظرًا لأن عناصر الإدخال مستقلة عن بعضها البعض ، يمكن موازاة المعالجة المسبقة عبر أنوية وحدة المعالجة المركزية المتعددة. لجعل هذا ممكنًا ، على غرار عمليات prefetch والتحويلات num_parallel_calls ، يوفر تحويل map وسيطة interleave لتحديد مستوى التوازي.

يعتمد اختيار أفضل قيمة لوسيطة num_parallel_calls على أجهزتك ، وخصائص بيانات التدريب (مثل حجمها وشكلها) ، وتكلفة وظيفة الخريطة ، والمعالجة الأخرى التي تحدث على وحدة المعالجة المركزية في نفس الوقت. إرشاد بسيط هو استخدام عدد النوى المتاحة لوحدة المعالجة المركزية. ومع ذلك ، بالنسبة للتحويل prefetch interleave ، فإن تحويل map يدعم tf.data.AUTOTUNE الذي سيفوض القرار حول مستوى التوازي الذي يجب استخدامه لوقت تشغيل tf.data .

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

رسم الخرائط المتسلسلة

ابدأ باستخدام تحويل map بدون التوازي كمثال أساسي.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

مخطط وقت تنفيذ البيانات - طريقة التعيين المتسلسل

بالنسبة للنهج الساذج ، هنا ، كما تظهر الحبكة ، يتم جمع الأوقات التي يتم قضاؤها في الفتح والقراءة والمعالجة المسبقة (رسم الخرائط) والتدريب معًا لتكرار واحد.

رسم الخرائط المتوازية

الآن ، استخدم نفس وظيفة المعالجة المسبقة ولكن قم بتطبيقها بالتوازي على عينات متعددة.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

وقت تنفيذ البيانات - رسم الخرائط المتوازية

كما يوضح مخطط البيانات ، تتداخل خطوات المعالجة المسبقة ، مما يقلل الوقت الإجمالي لتكرار واحد.

التخزين المؤقت

يمكن أن يخزن تحويل tf.data.Dataset.cache مجموعة بيانات مؤقتًا ، إما في الذاكرة أو في التخزين المحلي. سيوفر هذا بعض العمليات (مثل فتح الملف وقراءة البيانات) من التنفيذ خلال كل حقبة.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

وقت تنفيذ البيانات - طريقة مجموعة البيانات المخزنة مؤقتًا

هنا ، يُظهر مخطط وقت تنفيذ البيانات أنه عند تخزين مجموعة بيانات مؤقتًا ، يتم تنفيذ التحويلات قبل cache (مثل فتح الملف وقراءة البيانات) فقط خلال الحقبة الأولى. ستعيد العهود التالية استخدام البيانات المخزنة مؤقتًا بواسطة تحويل cache .

إذا كانت الوظيفة المحددة من قبل المستخدم التي تم تمريرها في تحويل map باهظة الثمن ، فقم بتطبيق تحويل cache بعد تحويل map طالما أن مجموعة البيانات الناتجة لا تزال تتناسب مع الذاكرة أو التخزين المحلي. إذا زادت الوظيفة التي يحددها المستخدم من المساحة المطلوبة لتخزين مجموعة البيانات بما يتجاوز سعة ذاكرة التخزين المؤقت ، فقم إما بتطبيقها بعد تحويل cache أو ضع في اعتبارك المعالجة المسبقة لبياناتك قبل مهمة التدريب لتقليل استخدام الموارد.

رسم الخرائط الموجهة

إن استدعاء وظيفة معرفة من قبل المستخدم تم تمريرها إلى تحويل map له عبء متعلق بجدولة وتنفيذ الوظيفة المحددة بواسطة المستخدم. قم بتوجيه الوظيفة المعرفة من قبل المستخدم (أي اجعلها تعمل عبر مجموعة من المدخلات في وقت واحد) وقم بتطبيق تحويل batch قبل تحويل map .

لتوضيح هذه الممارسة الجيدة ، فإن مجموعة البيانات الاصطناعية ليست مناسبة. يبلغ تأخير الجدولة حوالي 10 ميكروثانية (10e-6 ثوانٍ) ، أي أقل بكثير من عشرات المللي ثانية المستخدمة في مجموعة البيانات ArtificialDataset ، وبالتالي يصعب رؤية تأثيرها.

في هذا المثال ، استخدم الدالة الأساسية tf.data.Dataset.range وقم بتبسيط حلقة التدريب إلى أبسط أشكالها.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

رسم الخرائط العددية

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

وقت تنفيذ البيانات - طريقة الخريطة العددية

يوضح الرسم أعلاه ما يجري (مع عدد أقل من العينات) باستخدام طريقة رسم الخرائط العددية. يوضح أنه يتم تطبيق الوظيفة المعينة لكل عينة. في حين أن هذه الوظيفة سريعة جدًا ، إلا أن لها بعض النفقات العامة التي تؤثر على أداء الوقت.

رسم الخرائط الموجهة

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

وقت تنفيذ البيانات - طريقة الخريطة الموجهة

هذه المرة ، يتم استدعاء الوظيفة المعينة مرة واحدة ويتم تطبيقها على مجموعة من العينات. كما يظهر مخطط وقت تنفيذ البيانات ، بينما قد تستغرق الوظيفة مزيدًا من الوقت للتنفيذ ، تظهر النفقات العامة مرة واحدة فقط ، مما يؤدي إلى تحسين أداء الوقت الإجمالي.

تقليل البصمة على الذاكرة

يحافظ عدد من التحويلات ، بما في ذلك interleave ، prefetch ، والخلط shuffle ، على مخزن مؤقت داخلي للعناصر. إذا كانت الوظيفة المعرفة من قبل المستخدم التي تم تمريرها إلى تحويل map تغير حجم العناصر ، فإن ترتيب تحويل الخريطة والتحولات التي تقوم بها عناصر المخزن المؤقت تؤثر على استخدام الذاكرة. بشكل عام ، اختر الترتيب الذي ينتج عنه مساحة أقل للذاكرة ، ما لم يكن الترتيب المختلف مطلوبًا للأداء.

التخزين المؤقت للحسابات الجزئية

يوصى بتخزين مجموعة البيانات مؤقتًا بعد تحويل map إلا إذا كان هذا التحويل يجعل البيانات أكبر من أن تتناسب مع الذاكرة. يمكن تحقيق المفاضلة إذا كان من الممكن تقسيم الوظيفة المعينة إلى جزأين: جزء مستهلك للوقت وجزء يستهلك ذاكرة. في هذه الحالة ، يمكنك إجراء سلسلة من التحولات كما يلي:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

بهذه الطريقة ، يتم تنفيذ الجزء الذي يستغرق وقتًا طويلاً فقط خلال الحقبة الأولى ، وتتجنب استخدام مساحة كبيرة جدًا من ذاكرة التخزين المؤقت.

ملخص أفضل الممارسات

فيما يلي ملخص لأفضل الممارسات لتصميم أنابيب إدخال TensorFlow عالية الأداء:

استنساخ الأشكال

للتعمق في فهم tf.data.Dataset API ، يمكنك اللعب بخطوط الأنابيب الخاصة بك. يوجد أدناه الكود المستخدم لرسم الصور من هذا الدليل. يمكن أن يكون نقطة انطلاق جيدة ، مع عرض بعض الحلول للصعوبات الشائعة مثل:

  • وقت التنفيذ التكاثر
  • وظائف المعينة التنفيذ حريص
  • تحويل interleave callable
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

مجموعة البيانات

على غرار مجموعة البيانات ArtificialDataset ، يمكنك إنشاء مجموعة بيانات تعيد الوقت الذي تقضيه في كل خطوة.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

توفر مجموعة البيانات هذه عينات من الشكل [[2, 1], [2, 2], [2, 3]] ومن النوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . كل عينة هي:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

أين:

  • Open Read هما معرفات للخطوات
  • t0 هو الطابع الزمني عندما بدأت الخطوة المقابلة
  • d هو الوقت الذي يقضيه في الخطوة المقابلة
  • i هو فهرس المثيل
  • e هو مؤشر العصر (عدد المرات التي تم فيها تكرار مجموعة البيانات)
  • s هو مؤشر العينة

حلقة التكرار

اجعل حلقة التكرار أكثر تعقيدًا قليلاً لتجميع كل التوقيتات. لن يعمل هذا إلا مع مجموعات البيانات التي تنشئ عينات كما هو مفصل أعلاه.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

طريقة التآمر

أخيرًا ، حدد وظيفة قادرة على رسم مخطط زمني وفقًا للقيم التي يتم إرجاعها بواسطة دالة timelined_benchmark .

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

استخدم الأغلفة للوظيفة المعينة

لتشغيل وظيفة تعيين في سياق حريص ، عليك لفها داخل استدعاء tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

مقارنة خطوط الأنابيب

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

ساذج

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

المحسن

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

بي إن جي

draw_timeline(optimized_timeline, "Optimized", 15)

بي إن جي