مساعدة في حماية الحاجز المرجاني العظيم مع TensorFlow على Kaggle تاريخ التحدي

أداء أفضل مع tf.data API

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يمكن لوحدات معالجة الرسومات (GPU) و (TPU) تقليل الوقت المطلوب بشكل جذري لتنفيذ خطوة تدريب واحدة. يتطلب تحقيق ذروة الأداء وجود خط إدخال فعال يوفر البيانات للخطوة التالية قبل انتهاء الخطوة الحالية. و tf.data API يساعد على بناء خطوط أنابيب إدخال مرنة وفعالة. يوضح هذا المستند كيفية استخدام tf.data API لبناء performant للغاية أنابيب TensorFlow الإدخال.

قبل المتابعة، تحقق من خطوط الأنابيب مدخلات البناء TensorFlow توجه لمعرفة كيفية استخدام tf.data API.

موارد

اقامة

import tensorflow as tf

import time

خلال هذا الدليل ، سوف تتكرر عبر مجموعة بيانات وتقيس الأداء. قد يكون من الصعب إجراء مقاييس أداء قابلة للتكرار. تشمل العوامل المختلفة التي تؤثر على قابلية التكاثر ما يلي:

  • الحمل الحالي لوحدة المعالجة المركزية
  • حركة مرور الشبكة
  • آليات معقدة ، مثل ذاكرة التخزين المؤقت

للحصول على معيار قابل للتكرار ، ستقوم ببناء مثال مصطنع.

مجموعة البيانات

نبدأ مع تعريف فئة وراثة من tf.data.Dataset دعا ArtificialDataset . مجموعة البيانات هذه:

  • يولد num_samples عينات (الافتراضي هو 3)
  • ينام لبعض الوقت قبل العنصر الأول لمحاكاة فتح ملف
  • ينام لبعض الوقت قبل إنتاج كل عنصر لمحاكاة قراءة البيانات من ملف
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

هذه البينات يشبه tf.data.Dataset.range واحد، إضافة تأخير الثابتة في بداية وفي الفترات الفاصلة بين كل عينة.

حلقة التدريب

بعد ذلك ، اكتب حلقة تدريب وهمية تقيس الوقت المستغرق للتكرار على مجموعة بيانات. يتم محاكاة وقت التدريب.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

تحسين الأداء

لعرض كيف يمكن تحسين أداء، وسوف تحسين أداء ArtificialDataset .

النهج الساذج

ابدأ بخط أنابيب ساذج بدون حيل ، وقم بالتكرار على مجموعة البيانات كما هي.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

تحت الغطاء ، هذه هي الطريقة التي أمضيت بها وقت التنفيذ:

مخطط وقت تنفيذ البيانات - طريقة ساذجة

توضح الحبكة أن تنفيذ خطوة تدريبية تتضمن:

  • فتح ملف إذا لم يتم فتحه بعد
  • إحضار إدخال بيانات من الملف
  • استخدام البيانات للتدريب

ومع ذلك ، في تنفيذ متزامن ساذج مثل هنا ، بينما يقوم خط الأنابيب الخاص بك بجلب البيانات ، يكون نموذجك في وضع الخمول. على العكس من ذلك ، أثناء تدريب النموذج الخاص بك ، يكون خط أنابيب الإدخال في وضع الخمول. وبالتالي ، فإن وقت خطوة التدريب هو مجموع أوقات الفتح والقراءة والتدريب.

تعتمد الأقسام التالية على خط أنابيب الإدخال هذا ، مما يوضح أفضل الممارسات لتصميم أنابيب إدخال TensorFlow ذات الأداء العالي.

الجلب المسبق

يتداخل الجلب المسبق مع المعالجة المسبقة وتنفيذ النموذج لخطوة التدريب. في حين أن نموذج يتم تنفيذ التدريب خطوة s ، خط أنابيب مدخلات يقوم بقراءة البيانات لخطوة s+1 . يؤدي القيام بذلك إلى تقليل وقت الخطوة إلى الحد الأقصى (على عكس المجموع) للتدريب والوقت الذي يستغرقه استخراج البيانات.

و tf.data يوفر API ل tf.data.Dataset.prefetch التحول. يمكن استخدامه لفصل الوقت الذي يتم فيه إنتاج البيانات عن وقت استهلاك البيانات. على وجه الخصوص ، يستخدم التحويل مؤشر ترابط في الخلفية ومخزنًا مؤقتًا داخليًا للجلب المسبق للعناصر من مجموعة بيانات الإدخال قبل الوقت المطلوب. يجب أن يكون عدد العناصر المطلوب الجلب المسبق مساويًا (أو ربما أكبر من) عدد الدُفعات المستهلكة في خطوة تدريب واحدة. هل يمكن إما يدويا ضبط هذه القيمة، أو تعيينها إلى tf.data.AUTOTUNE ، الذي سيدفع tf.data وقت التشغيل لضبط قيمة حيوي في وقت التشغيل.

لاحظ أن تحويل الجلب المسبق يوفر فوائد في أي وقت توجد فيه فرصة لتداخل عمل "منتج" مع عمل "المستهلك".

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

مخطط وقت تنفيذ البيانات - طريقة الجلب المسبق

الآن ، كما يظهر مخطط وقت تنفيذ البيانات ، أثناء تشغيل خطوة التدريب للعينة 0 ، يقرأ خط أنابيب الإدخال البيانات الخاصة بالعينة 1 ، وما إلى ذلك.

موازاة استخراج البيانات

في إعدادات العالم الحقيقي ، قد يتم تخزين بيانات الإدخال عن بُعد (على سبيل المثال ، على Google Cloud Storage أو HDFS). قد يتعرض خط أنابيب مجموعة البيانات الذي يعمل بشكل جيد عند قراءة البيانات محليًا للاختناق في الإدخال / الإخراج عند قراءة البيانات عن بُعد بسبب الاختلافات التالية بين التخزين المحلي والتخزين البعيد:

  • الوقت للوصول الى البايت الأول: قراءة البايت الأول من ملف من التخزين البعيد يمكن أن تتخذ أوامر من حجم أطول من من التخزين المحلي.
  • قراءة الإنتاجية: في حين أن تخزين بعيد عادة يوفر عرض النطاق الترددي الكلي كبير، وقراءة ملف واحد قد تكون قادرة على الاستفادة من جزء صغير من هذا النطاق الترددي فقط.

وبالإضافة إلى ذلك، مرة واحدة يتم تحميل بايت الخام في الذاكرة، قد يكون من الضروري أيضا إلغاء تسلسل و / أو فك تشفير البيانات (على سبيل المثال protobuf )، الأمر الذي يتطلب حساب إضافي. هذا الحمل موجود بغض النظر عما إذا كانت البيانات مخزنة محليًا أو عن بُعد ، ولكن يمكن أن تكون أسوأ في الحالة البعيدة إذا لم يتم جلب البيانات مسبقًا بشكل فعال.

للتخفيف من تأثير مختلف النفقات استخراج البيانات، و tf.data.Dataset.interleave التحول يمكن استخدامها لتتم بشكل مواز الخطوة تحميل البيانات، التداخل محتويات قواعد البيانات الأخرى (مثل قارئات ملف البيانات). يمكن تحديد عدد من مجموعات البيانات إلى تداخل من قبل cycle_length حجة، في حين يمكن تحديد مستوى التوازي من قبل num_parallel_calls حجة. على غرار prefetch التحول، و interleave التحول يدعم tf.data.AUTOTUNE ، والتي سوف تفويض قرار حول ما هو مستوى التوازي الاستخدام إلى tf.data وقت التشغيل.

تشذير متسلسل

الحجج الافتراضي ل tf.data.Dataset.interleave التحول تجعل من تعشيق عينات واحدة من مجموعتين من البيانات بشكل تسلسلي.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

مخطط وقت تنفيذ البيانات - تشذير متسلسل

هذه المرة مؤامرة تنفيذ البيانات تسمح لعرض سلوك interleave التحول، وجلب عينات بدلا من اثنين من مجموعات البيانات المتاحة. ومع ذلك ، لا يتم تضمين تحسين الأداء هنا.

تشذير متوازي

الآن، استخدم num_parallel_calls حجة interleave التحول. يؤدي هذا إلى تحميل مجموعات بيانات متعددة بشكل متوازٍ ، مما يقلل من وقت انتظار فتح الملفات.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

مخطط وقت تنفيذ البيانات - طريقة التشذير المتوازي

هذه المرة ، كما يظهر مخطط وقت تنفيذ البيانات ، تتم قراءة مجموعتي البيانات بشكل متوازي ، مما يقلل من وقت معالجة البيانات العالمي.

موازاة تحويل البيانات

عند تحضير البيانات ، قد تحتاج عناصر الإدخال إلى المعالجة المسبقة. وتحقيقا لهذه الغاية، و tf.data العروض API في tf.data.Dataset.map التحول، وهو ما ينطبق دالة معرفة من قبل المستخدم إلى كل عنصر من عناصر بيانات الإدخال. نظرًا لأن عناصر الإدخال مستقلة عن بعضها البعض ، يمكن موازاة المعالجة المسبقة عبر أنوية وحدة المعالجة المركزية المتعددة. لجعل هذا ممكنا، على غرار prefetch و interleave التحولات، و map يوفر تحول num_parallel_calls حجة لتحديد مستوى التوازي.

اختيار أفضل قيمة مقابل num_parallel_calls حجة يعتمد على الأجهزة الخاصة بك، وخصائص البيانات التدريب الخاص بك (مثل حجمه وشكله)، وتكلفة وظيفة الخريطة، وما معالجة أخرى يحدث على وحدة المعالجة المركزية في نفس الوقت. إرشاد بسيط هو استخدام عدد النوى المتاحة لوحدة المعالجة المركزية. ومع ذلك، أما بالنسبة لل prefetch و interleave التحول، و map التحول يدعم tf.data.AUTOTUNE التي سوف تفويض قرار حول ما هو مستوى التوازي الاستخدام إلى tf.data وقت التشغيل.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

رسم الخرائط المتسلسلة

بدء باستخدام map تحول دون التوازي كمثال الأساس.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

مخطط وقت تنفيذ البيانات - طريقة التعيين المتسلسل

أما بالنسبة لل نهج ساذج ، هنا، كما يظهر في الرسم، وعن الأوقات التي عشتها لفتح، قراءة، قبل تجهيز (الخرائط) وخطوات التدريب تلخيص معا من أجل التكرار واحد.

رسم الخرائط المتوازية

الآن ، استخدم نفس وظيفة المعالجة المسبقة ولكن قم بتطبيقها بالتوازي على عينات متعددة.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

وقت تنفيذ البيانات - رسم الخرائط المتوازية

كما يوضح مخطط البيانات ، تتداخل خطوات المعالجة المسبقة ، مما يقلل الوقت الإجمالي لتكرار واحد.

التخزين المؤقت

و tf.data.Dataset.cache التحول يمكن تخزين مجموعة بيانات، إما في الذاكرة أو على التخزين المحلي. سيوفر هذا بعض العمليات (مثل فتح الملف وقراءة البيانات) من التنفيذ خلال كل حقبة.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

وقت تنفيذ البيانات - طريقة مجموعة البيانات المخزنة مؤقتًا

هنا، في وقت التنفيذ البيانات تظهر المؤامرة التي عند تخزين مجموعة بيانات، التحولات قبل cache يتم تنفيذ واحد (مثل فتح الملف وقراءة البيانات) فقط خلال الحقبة الأولى. والحقب القادمة إعادة استخدام البيانات المخزنة مؤقتا من قبل cache التحول.

إذا كانت دالة معرفة من قبل المستخدم تمريرها إلى map التحول مكلفة، وتطبيق cache التحول بعد map التحول طالما مجموعة البيانات الناتجة يمكن أن ندخلها في الذاكرة أو التخزين المحلي. إذا يزيد من دالة معرفة من قبل المستخدم المساحة المطلوبة لتخزين بيانات تتجاوز قدرة ذاكرة التخزين المؤقت، إما تطبيقه بعد cache التحول أو النظر قبل معالجة البيانات الخاصة بك قبل التدريب أثناء العمل لتقليل استخدام الموارد.

رسم الخرائط الموجهة

استدعاء دالة معرفة من قبل المستخدم تمريرها إلى map التحول قد ارتبط النفقات العامة للجدولة وتنفيذ دالة معرفة من قبل المستخدم. Vectorize دالة معرفة من قبل المستخدم (أي أنها قد تعمل على مجموعة من المدخلات في آن واحد) وتطبيق batch التحول قبل map التحول.

لتوضيح هذه الممارسة الجيدة ، فإن مجموعة البيانات الاصطناعية الخاصة بك ليست مناسبة. تأخير جدولة حوالي 10 ميكروثانية (10E-6 ثوان)، أقل بكثير من عشرات الالف المستخدمة في ArtificialDataset ، وبالتالي تأثيرها من الصعب أن نرى.

على سبيل المثال، استخدام قاعدة tf.data.Dataset.range وظيفة وتبسيط حلقة تدريبية لأبسط أشكالها.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

رسم الخرائط العددية

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

وقت تنفيذ البيانات - طريقة الخريطة العددية

يوضح الرسم أعلاه ما يجري (مع عدد أقل من العينات) باستخدام طريقة رسم الخرائط العددية. يوضح أنه يتم تطبيق الوظيفة المعينة لكل عينة. في حين أن هذه الوظيفة سريعة جدًا ، إلا أن لها بعض النفقات العامة التي تؤثر على أداء الوقت.

رسم الخرائط الموجهة

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

وقت تنفيذ البيانات - طريقة الخريطة الموجهة

هذه المرة ، يتم استدعاء الوظيفة المعينة مرة واحدة ويتم تطبيقها على مجموعة من العينات. كما يظهر مخطط وقت تنفيذ البيانات ، بينما قد تستغرق الوظيفة مزيدًا من الوقت للتنفيذ ، تظهر النفقات العامة مرة واحدة فقط ، مما يؤدي إلى تحسين أداء الوقت الإجمالي.

تقليل البصمة على الذاكرة

وهناك عدد من التحولات، بما في ذلك interleave ، prefetch ، و shuffle ، والحفاظ على وجود مخزن مؤقت داخلي من العناصر. إذا كانت دالة معرفة من قبل المستخدم تمريرها إلى map التحول تغيير حجم العناصر، ثم ترتيب التحول الخريطة والتحولات التي تؤثر عناصر عازلة استخدام الذاكرة. بشكل عام ، اختر الترتيب الذي ينتج عنه مساحة أقل للذاكرة ، ما لم يكن الترتيب المختلف مطلوبًا للأداء.

التخزين المؤقت للحسابات الجزئية

فمن المستحسن لتخزين ورقة العمل بعد map التحول إلا إذا كان هذا التحول يجعل بيانات كبيرة جدا لتناسب في الذاكرة. يمكن تحقيق المفاضلة إذا كان من الممكن تقسيم الوظيفة المعينة إلى جزأين: جزء مستهلك للوقت وجزء يستهلك ذاكرة. في هذه الحالة ، يمكنك إجراء سلسلة من التحولات كما يلي:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

بهذه الطريقة ، يتم تنفيذ الجزء الذي يستغرق وقتًا طويلاً فقط خلال الحقبة الأولى ، وتتجنب استخدام مساحة كبيرة جدًا من ذاكرة التخزين المؤقت.

ملخص أفضل الممارسات

فيما يلي ملخص لأفضل الممارسات لتصميم أنابيب إدخال TensorFlow عالية الأداء:

استنساخ الأشكال

إلى التعمق في tf.data.Dataset فهم API، يمكنك ان تلعب مع خطوط الأنابيب الخاصة بك. يوجد أدناه الكود المستخدم لرسم الصور من هذا الدليل. يمكن أن تكون نقطة انطلاق جيدة ، حيث تعرض بعض الحلول للصعوبات الشائعة مثل:

  • وقت التنفيذ التكاثر
  • وظائف المعينة التنفيذ حريص
  • interleave للاستدعاء التحول
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

مجموعة البيانات

على غرار ArtificialDataset تتمكن من بناء مجموعة بيانات عودته من الوقت الذي يقضيه في كل خطوة.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

يوفر هذه البينات عينات من شكل [[2, 1], [2, 2], [2, 3]] ونوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . كل عينة هي:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

أين:

  • Open و Read خطوات معرفات
  • t0 هو الطابع الزمني عندما بدأت الخطوة المقابلة
  • d هو الوقت الذي يقضيه في خطوة المقابلة
  • i هو مؤشر المثال
  • e هو مؤشر عصر (عدد المرات التي تم التكرارية مجموعة البيانات)
  • s هو مؤشر عينة

حلقة التكرار

اجعل حلقة التكرار أكثر تعقيدًا قليلاً لتجميع كل التوقيتات. سيعمل هذا فقط مع مجموعات البيانات التي تنشئ عينات كما هو مفصل أعلاه.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

طريقة التآمر

وأخيرا، وتحديد وظيفة قادرة على رسم جدول زمني معين القيم التي يتم إرجاعها من قبل timelined_benchmark وظيفة.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

استخدم الأغلفة للوظيفة المعينة

لتشغيل وظيفة المعينة في سياق حريصة، لديك للالتفاف عليها داخل tf.py_function المكالمة.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

مقارنة خطوط الأنابيب

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

ساذج

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

المحسن

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

بي إن جي

draw_timeline(optimized_timeline, "Optimized", 15)

بي إن جي