احفظ التاريخ! يعود مؤتمر Google I / O من 18 إلى 20 مايو. سجل الآن
ترجمت واجهة Cloud Translation API‏ هذه الصفحة.
Switch to English

أداء أفضل مع tf.data API

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يمكن لوحدات معالجة الرسومات (GPU) و (TPU) تقليل الوقت المطلوب بشكل جذري لتنفيذ خطوة تدريب واحدة. يتطلب تحقيق ذروة الأداء وجود خط إدخال فعال يوفر البيانات للخطوة التالية قبل انتهاء الخطوة الحالية. تساعد واجهة برمجة تطبيقات tf.data على بناء خطوط إدخال مرنة وفعالة. يوضح هذا المستند كيفية استخدام tf.data API لبناء خطوط أنابيب إدخال TensorFlow عالية الأداء.

قبل المتابعة ، تحقق من دليل إنشاء خطوط إدخال TensorFlow للتعرف على كيفية استخدام tf.data API.

موارد

يثبت

import tensorflow as tf

import time

خلال هذا الدليل ، سوف تتكرر عبر مجموعة بيانات وتقيس الأداء. قد يكون من الصعب إجراء مقاييس أداء قابلة للتكرار. تشمل العوامل المختلفة التي تؤثر على قابلية التكاثر ما يلي:

  • الحمل الحالي لوحدة المعالجة المركزية
  • حركة مرور الشبكة
  • آليات معقدة ، مثل ذاكرة التخزين المؤقت

للحصول على معيار قابل للتكرار ، ستقوم ببناء مثال مصطنع.

مجموعة البيانات

ابدأ بتحديد فئةtf.data.Dataset منtf.data.Dataset تسمى ArtificialDataset . مجموعة البيانات هذه:

  • يولد num_samples العينات (الافتراضي هو 3)
  • ينام لبعض الوقت قبل العنصر الأول لمحاكاة فتح ملف
  • ينام لبعض الوقت قبل إنتاج كل عنصر لمحاكاة قراءة البيانات من ملف
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

تشبه مجموعة البيانات هذه tf.data.Dataset.range الأول ، مضيفًا تأخيرًا ثابتًا في بداية كل عينة وفيما بينها.

حلقة التدريب

بعد ذلك ، اكتب حلقة تدريب وهمية تقيس الوقت المستغرق للتكرار على مجموعة بيانات. يتم محاكاة وقت التدريب.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

تحسين الأداء

لعرض كيفية تحسين الأداء ، ستقوم بتحسين أداء مجموعة البيانات ArtificialDataset .

النهج الساذج

ابدأ بخط أنابيب ساذج بدون حيل ، وقم بالتكرار على مجموعة البيانات كما هي.

benchmark(ArtificialDataset())
Execution time: 0.2541472299999441

تحت الغطاء ، هذه هي الطريقة التي أمضيت بها وقت التنفيذ:

مخطط وقت تنفيذ البيانات - طريقة ساذجة

توضح الحبكة أن تنفيذ خطوة التدريب يتضمن:

  • فتح ملف إذا لم يتم فتحه بعد
  • إحضار إدخال بيانات من الملف
  • استخدام البيانات للتدريب

ومع ذلك ، في تنفيذ متزامن ساذج مثل هنا ، بينما يقوم خط الأنابيب الخاص بك بجلب البيانات ، يكون نموذجك في وضع الخمول. على العكس من ذلك ، أثناء تدريب النموذج الخاص بك ، يكون خط أنابيب الإدخال في وضع الخمول. وبالتالي ، فإن وقت خطوة التدريب هو مجموع أوقات الفتح والقراءة والتدريب.

تعتمد الأقسام التالية على خط أنابيب الإدخال هذا ، مما يوضح أفضل الممارسات لتصميم أنابيب إدخال TensorFlow ذات الأداء العالي.

الجلب المسبق

يتداخل الجلب المسبق مع المعالجة المسبقة وتنفيذ النموذج لخطوة التدريب. في حين أن نموذج يتم تنفيذ التدريب خطوة s ، خط أنابيب مدخلات يقوم بقراءة البيانات لخطوة s+1 . يؤدي القيام بذلك إلى تقليل وقت الخطوة إلى الحد الأقصى (على عكس المجموع) من التدريب والوقت الذي يستغرقه استخراج البيانات.

و tf.data يوفر API ل tf.data.Dataset.prefetch التحول. يمكن استخدامه لفصل الوقت الذي يتم فيه إنتاج البيانات عن وقت استهلاك البيانات. على وجه الخصوص ، يستخدم التحويل مؤشر ترابط في الخلفية ومخزنًا مؤقتًا داخليًا للجلب المسبق للعناصر من مجموعة بيانات الإدخال قبل الوقت المطلوب. يجب أن يكون عدد العناصر المطلوب الجلب المسبق مساويًا (أو ربما أكبر من) عدد الدُفعات المستهلكة في خطوة تدريب واحدة. يمكنك إما ضبط هذه القيمة يدويًا ، أو تعيينها على tf.data.AUTOTUNE ، مما سيطلب من وقت تشغيل tf.data ضبط القيمة ديناميكيًا في وقت التشغيل.

لاحظ أن تحويل الجلب المسبق يوفر فوائد في أي وقت توجد فيه فرصة لتداخل عمل "المنتج" مع عمل "المستهلك".

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20805208699994182

مخطط وقت تنفيذ البيانات - طريقة الجلب المسبق

الآن ، كما يظهر مخطط وقت تنفيذ البيانات ، أثناء تشغيل خطوة التدريب للعينة 0 ، يقرأ خط أنابيب الإدخال البيانات الخاصة بالعينة 1 ، وما إلى ذلك.

موازاة استخراج البيانات

في إعدادات العالم الحقيقي ، قد يتم تخزين بيانات الإدخال عن بُعد (على سبيل المثال ، على Google Cloud Storage أو HDFS). قد يتعرض خط أنابيب مجموعة البيانات الذي يعمل بشكل جيد عند قراءة البيانات محليًا للاختناق في الإدخال / الإخراج عند قراءة البيانات عن بُعد بسبب الاختلافات التالية بين التخزين المحلي والتخزين البعيد:

  • الوقت إلى البايت الأول : يمكن أن تستغرق قراءة البايت الأول من الملف من التخزين البعيد أوامر من حيث الحجم أطول من التخزين المحلي.
  • معدل نقل القراءة : بينما يوفر التخزين البعيد عادةً نطاقًا تردديًا مجمعًا كبيرًا ، قد تتمكن قراءة ملف واحد فقط من استخدام جزء صغير من هذا النطاق الترددي.

بالإضافة إلى ذلك ، بمجرد تحميل البايت الخام في الذاكرة ، قد يكون من الضروري أيضًا إلغاء تسلسل و / أو فك تشفير البيانات (مثل protobuf ) ، الأمر الذي يتطلب حسابًا إضافيًا. هذا الحمل موجود بغض النظر عما إذا كانت البيانات مخزنة محليًا أو عن بُعد ، ولكن يمكن أن تكون أسوأ في الحالة البعيدة إذا لم يتم جلب البيانات مسبقًا بشكل فعال.

للتخفيف من تأثير النفقات العامة لاستخراج البيانات المتنوعة ، يمكن استخدام التحويل tf.data.Dataset.interleave خطوة تحميل البيانات ، مع تشذير محتويات مجموعات البيانات الأخرى (مثل قارئات ملفات البيانات). يمكن تحديد عدد مجموعات البيانات المراد تداخلها بواسطة الوسيطة cycle_length ، بينما يمكن تحديد مستوى التوازي بواسطة الوسيطة num_parallel_calls . على غرار تحويل prefetch ، يدعم تحويل interleave tf.data.AUTOTUNE ، والذي tf.data.AUTOTUNE القرار حول مستوى التوازي الذي يجب استخدامه tf.data تشغيل tf.data .

تشذير متسلسل

تجعل الوسائط الافتراضية tf.data.Dataset.interleave تشذير عينات مفردة من مجموعتي بيانات بالتتابع.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4883518669998921

مخطط وقت تنفيذ البيانات - تشذير متسلسل

يسمح مخطط وقت تنفيذ البيانات هذا بعرض سلوك تحويل interleave ، وجلب العينات بدلاً من ذلك من مجموعتي البيانات المتاحتين. ومع ذلك ، لا يتم تضمين تحسين الأداء هنا.

تشذير متوازي

الآن، استخدم num_parallel_calls حجة interleave التحول. يؤدي هذا إلى تحميل مجموعات بيانات متعددة بشكل متوازٍ ، مما يقلل من وقت انتظار فتح الملفات.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.26920967700016263

مخطط وقت تنفيذ البيانات - طريقة التشذير المتوازي

هذه المرة ، كما يظهر مخطط وقت تنفيذ البيانات ، تتم قراءة مجموعتي البيانات بشكل متوازي ، مما يقلل من وقت معالجة البيانات العالمي.

موازاة تحويل البيانات

عند تحضير البيانات ، قد تحتاج عناصر الإدخال إلى المعالجة المسبقة. وتحقيقا لهذه الغاية، و tf.data العروض API في tf.data.Dataset.map التحول، وهو ما ينطبق دالة معرفة من قبل المستخدم إلى كل عنصر من عناصر بيانات الإدخال. نظرًا لأن عناصر الإدخال مستقلة عن بعضها البعض ، يمكن موازاة المعالجة المسبقة عبر أنوية وحدة المعالجة المركزية المتعددة. لجعل هذا ممكنا، على غرار prefetch و interleave التحولات، و map يوفر تحول num_parallel_calls حجة لتحديد مستوى التوازي.

يعتمد اختيار أفضل قيمة لوسيطة num_parallel_calls على أجهزتك ، وخصائص بيانات التدريب (مثل حجمها وشكلها) ، وتكلفة وظيفة الخريطة ، والمعالجة الأخرى التي تحدث على وحدة المعالجة المركزية في نفس الوقت. إرشاد بسيط هو استخدام عدد النوى المتاحة لوحدة المعالجة المركزية. ومع ذلك، أما بالنسبة لل prefetch و interleave التحول، و map التحول يدعم tf.data.AUTOTUNE التي سوف تفويض قرار حول ما هو مستوى التوازي الاستخدام إلى tf.data وقت التشغيل.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

رسم الخرائط المتسلسلة

ابدأ باستخدام تحويل map بدون التوازي كمثال أساسي.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4379127629999857

مخطط وقت تنفيذ البيانات - طريقة التعيين المتسلسل

بالنسبة للنهج الساذج ، هنا ، كما توضح الحبكة ، يتم جمع الأوقات التي يتم قضاؤها في الفتح والقراءة والمعالجة المسبقة (رسم الخرائط) والتدريب معًا لتكرار واحد.

رسم الخرائط المتوازية

الآن ، استخدم نفس وظيفة المعالجة المسبقة ولكن قم بتطبيقها بالتوازي على عينات متعددة.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2747970279999663

وقت تنفيذ البيانات - رسم الخرائط المتوازية

كما يوضح مخطط البيانات ، تتداخل خطوات المعالجة المسبقة ، مما يقلل الوقت الإجمالي لتكرار واحد.

التخزين المؤقت

يمكن للتحويل tf.data.Dataset.cache تخزين مجموعة بيانات مؤقتًا ، إما في الذاكرة أو على وحدة تخزين محلية. سيوفر هذا بعض العمليات (مثل فتح الملف وقراءة البيانات) من التنفيذ خلال كل حقبة.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3715158390000397

وقت تنفيذ البيانات - طريقة مجموعة البيانات المخزنة مؤقتًا

هنا ، يُظهر مخطط وقت تنفيذ البيانات أنه عند تخزين مجموعة بيانات مؤقتًا ، يتم تنفيذ التحويلات قبل cache (مثل فتح الملف وقراءة البيانات) فقط خلال الحقبة الأولى. ستعيد العهود التالية استخدام البيانات المخزنة مؤقتًا بواسطة تحويل cache .

إذا كانت الوظيفة المحددة من قبل المستخدم التي تم تمريرها إلى تحويل map باهظة الثمن ، فقم بتطبيق تحويل cache بعد تحويل map طالما أن مجموعة البيانات الناتجة لا تزال تتناسب مع الذاكرة أو التخزين المحلي. إذا زادت الوظيفة التي يحددها المستخدم من المساحة المطلوبة لتخزين مجموعة البيانات بما يتجاوز سعة ذاكرة التخزين المؤقت ، فقم إما بتطبيقها بعد تحويل cache أو ضع في اعتبارك المعالجة المسبقة لبياناتك قبل مهمة التدريب لتقليل استخدام الموارد.

رسم الخرائط الموجهة

إن استدعاء وظيفة معرّفة من قبل المستخدم تم تمريرها إلى تحويل map له أعباء مرتبطة بجدولة الوظيفة المحددة من قبل المستخدم وتنفيذها. قم بتوجيه الوظيفة المعرفة من قبل المستخدم (أي اجعلها تعمل عبر مجموعة من المدخلات في وقت واحد) وقم بتطبيق تحويل batch قبل تحويل map .

لتوضيح هذه الممارسة الجيدة ، فإن مجموعة البيانات الاصطناعية الخاصة بك ليست مناسبة. يبلغ تأخير الجدولة حوالي 10 ميكروثانية (10e-6 ثوانٍ) ، أي أقل بكثير من عشرات المللي ثانية المستخدمة في مجموعة البيانات ArtificialDataset ، وبالتالي يصعب رؤية تأثيرها.

في هذا المثال ، استخدم الدالة الأساسية tf.data.Dataset.range وقم بتبسيط حلقة التدريب إلى أبسط أشكالها.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

رسم الخرائط العددية

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.9082538790000854

وقت تنفيذ البيانات - طريقة الخريطة العددية

يوضح الرسم أعلاه ما يجري (مع عدد أقل من العينات) باستخدام طريقة رسم الخرائط العددية. يوضح أنه يتم تطبيق الوظيفة المعينة لكل عينة. في حين أن هذه الوظيفة سريعة جدًا ، إلا أن لها بعض النفقات العامة التي تؤثر على أداء الوقت.

رسم الخرائط الموجهة

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.03624614399996062

وقت تنفيذ البيانات - طريقة الخريطة الموجهة

هذه المرة ، يتم استدعاء الوظيفة المعينة مرة واحدة ويتم تطبيقها على مجموعة من العينات. كما يظهر مخطط وقت تنفيذ البيانات ، بينما قد تستغرق الوظيفة مزيدًا من الوقت للتنفيذ ، تظهر النفقات العامة مرة واحدة فقط ، مما يؤدي إلى تحسين أداء الوقت الإجمالي.

تقليل البصمة على الذاكرة

يحافظ عدد من التحويلات ، بما في ذلك interleave ، prefetch ، prefetch shuffle ، على مخزن مؤقت داخلي للعناصر. إذا كانت الوظيفة المعرفة من قبل المستخدم التي تم تمريرها في تحويل map تغير حجم العناصر ، فسيؤثر ترتيب تحويل الخريطة والتحولات التي تقوم بها عناصر المخزن المؤقت على استخدام الذاكرة. بشكل عام ، اختر الترتيب الذي ينتج عنه مساحة أقل للذاكرة ، ما لم يكن الترتيب المختلف مطلوبًا للأداء.

التخزين المؤقت للحسابات الجزئية

يوصى بتخزين مجموعة البيانات مؤقتًا بعد تحويل map إلا إذا كان هذا التحويل يجعل البيانات أكبر من أن تتناسب مع الذاكرة. يمكن تحقيق المفاضلة إذا كان من الممكن تقسيم الوظيفة المعينة إلى جزأين: جزء مستهلك للوقت وجزء مستهلك للذاكرة. في هذه الحالة ، يمكنك إجراء سلسلة من التحولات كما يلي:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

بهذه الطريقة ، يتم تنفيذ الجزء الذي يستغرق وقتًا طويلاً فقط خلال الحقبة الأولى ، وتتجنب استخدام مساحة كبيرة جدًا من ذاكرة التخزين المؤقت.

ملخص أفضل الممارسات

فيما يلي ملخص لأفضل الممارسات لتصميم أنابيب إدخال TensorFlow عالية الأداء:

استنساخ الأشكال

للتعمق في فهمtf.data.Dataset API ، يمكنك اللعبtf.data.Dataset الأنابيب الخاصة بك. يوجد أدناه الكود المستخدم لرسم الصور من هذا الدليل. يمكن أن تكون نقطة انطلاق جيدة ، حيث تعرض بعض الحلول للصعوبات الشائعة مثل:

  • وقت التنفيذ التكاثر
  • وظائف المعينة التنفيذ حريص
  • تحويل interleave callable
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

مجموعة البيانات

على غرار مجموعة البيانات ArtificialDataset يمكنك إنشاء مجموعة بيانات تعيد الوقت الذي تقضيه في كل خطوة.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

توفر مجموعة البيانات هذه عينات من الشكل [[2, 1], [2, 2], [2, 3]] ومن النوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . كل عينة هي:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

أين:

  • Open Read هما معرفات للخطوات
  • t0 هو الطابع الزمني عندما بدأت الخطوة المقابلة
  • d هو الوقت الذي يقضيه في الخطوة المقابلة
  • i هو فهرس المثيل
  • e هو مؤشر العصر (عدد مرات تكرار مجموعة البيانات)
  • s هو مؤشر العينة

حلقة التكرار

اجعل حلقة التكرار أكثر تعقيدًا قليلاً لتجميع كل التوقيتات. لن يعمل هذا إلا مع مجموعات البيانات التي تنشئ عينات كما هو مفصل أعلاه.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

طريقة التآمر

أخيرًا ، حدد وظيفة قادرة على رسم مخطط زمني وفقًا للقيم التي يتم إرجاعها بواسطة دالة timelined_benchmark .

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

استخدم الأغلفة للوظيفة المعينة

لتشغيل وظيفة تعيين في سياق حريص ، عليك لفها داخل استدعاء tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

مقارنة خطوط الأنابيب

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

ساذج

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.445692234000035

المحسن

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.326935971000012
draw_timeline(naive_timeline, "Naive", 15)

بي إن جي

draw_timeline(optimized_timeline, "Optimized", 15)

بي إن جي