সাহায্য Kaggle উপর TensorFlow সঙ্গে গ্রেট বেরিয়ার রিফ রক্ষা চ্যালেঞ্জ যোগদান

Tf.data API- এর সাথে আরও ভাল পারফরম্যান্স

TensorFlow.org এ দেখুন Google Colab-এ চালান GitHub-এ উৎস দেখুন নোটবুক ডাউনলোড করুন

ওভারভিউ

জিপিইউ এবং টিপিইউগুলি একটি একক প্রশিক্ষণ পদক্ষেপ চালানোর জন্য প্রয়োজনীয় সময়কে আমূল কমাতে পারে। সর্বোচ্চ কর্মক্ষমতা অর্জনের জন্য একটি দক্ষ ইনপুট পাইপলাইন প্রয়োজন যা বর্তমান পদক্ষেপটি শেষ হওয়ার আগে পরবর্তী ধাপের জন্য ডেটা সরবরাহ করে। tf.data এপিআই নমনীয় এবং দক্ষ ইনপুট পাইপলাইনগুলি গড়ে তুলতে সাহায্য করে। এই দস্তাবেজটি প্রমান কিভাবে ব্যবহার করতে tf.data এপিআই অত্যন্ত performant TensorFlow ইনপুট পাইপলাইনগুলি গড়ে তুলতে।

আপনি চালিয়ে যাওয়ার আগে, চেক TensorFlow বিল্ড ইনপুট পাইপলাইনগুলি কিভাবে ব্যবহার করবেন তা জানতে গাইড tf.data API- টি।

সম্পদ

সেটআপ

import tensorflow as tf

import time

এই নির্দেশিকা জুড়ে, আপনি একটি ডেটাসেট জুড়ে পুনরাবৃত্তি করবেন এবং কর্মক্ষমতা পরিমাপ করবেন। প্রজননযোগ্য কর্মক্ষমতা মানদণ্ড তৈরি করা কঠিন হতে পারে। প্রজননযোগ্যতাকে প্রভাবিত করে এমন বিভিন্ন কারণের মধ্যে রয়েছে:

  • বর্তমান CPU লোড
  • নেটওয়ার্ক ট্রাফিক
  • জটিল প্রক্রিয়া, যেমন ক্যাশে

একটি প্রজননযোগ্য বেঞ্চমার্ক পেতে, আপনি একটি কৃত্রিম উদাহরণ তৈরি করবেন।

ডেটাসেট

একটি বর্গ থেকে উত্তরাধিকার সূত্রে প্রাপ্ত সংজ্ঞা দিয়ে শুরু করুন tf.data.Dataset নামক ArtificialDataset । এই ডেটাসেট:

  • উত্পন্ন num_samples নমুনা (ডিফল্ট 3)
  • একটি ফাইল খোলার অনুকরণ করার জন্য প্রথম আইটেমের আগে কিছু সময়ের জন্য ঘুমায়
  • একটি ফাইল থেকে ডেটা পড়ার অনুকরণ করতে প্রতিটি আইটেম তৈরি করার আগে কিছু সময়ের জন্য ঘুমায়
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

এই সেটটি অনুরূপ tf.data.Dataset.range এক, এবং শুরুতে একটি নির্দিষ্ট বিলম্ব যোগ মধ্যবর্তী প্রতিটি নমুনা।

প্রশিক্ষণ লুপ

এরপরে, একটি ডামি প্রশিক্ষণ লুপ লিখুন যা একটি ডেটাসেটের উপর পুনরাবৃত্তি করতে কতক্ষণ সময় নেয় তা পরিমাপ করে। প্রশিক্ষণ সময় অনুকরণ করা হয়.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

কর্মক্ষমতা অপ্টিমাইজ করুন

কিভাবে কর্মক্ষমতা অপ্টিমাইজ করা যেতে পারে প্রদর্শন করার জন্য, আপনাকে কর্মক্ষমতা উন্নত করতে হবে ArtificialDataset

নিষ্পাপ পন্থা

কোনো কৌশল ব্যবহার না করে একটি নিষ্পাপ পাইপলাইন দিয়ে শুরু করুন, ডেটাসেট যেমন আছে তেমনিভাবে পুনরাবৃত্তি করুন।

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

হুডের নীচে, এইভাবে আপনার মৃত্যুদন্ডের সময় ব্যয় হয়েছিল:

ডেটা এক্সিকিউশন টাইম প্লট - একটি সরল পদ্ধতি

প্লটটি দেখায় যে একটি প্রশিক্ষণের পদক্ষেপের মধ্যে রয়েছে:

  • একটি ফাইল খোলা হচ্ছে যদি এটি এখনও খোলা না হয়
  • ফাইল থেকে একটি ডাটা এন্ট্রি আনা হচ্ছে
  • প্রশিক্ষণের জন্য ডেটা ব্যবহার করা

যাইহোক, এখানকার মতো একটি নিষ্পাপ সিঙ্ক্রোনাস বাস্তবায়নে, আপনার পাইপলাইন ডেটা আনার সময়, আপনার মডেলটি নিষ্ক্রিয় বসে আছে। বিপরীতভাবে, আপনার মডেল প্রশিক্ষণের সময়, ইনপুট পাইপলাইন নিষ্ক্রিয় বসে আছে। প্রশিক্ষণের ধাপের সময়টি এইভাবে খোলা, পড়া এবং প্রশিক্ষণের সময়ের সমষ্টি।

পরবর্তী বিভাগগুলি এই ইনপুট পাইপলাইনে তৈরি করে, পারফরম্যান্ট টেনসরফ্লো ইনপুট পাইপলাইন ডিজাইন করার জন্য সর্বোত্তম অনুশীলনগুলিকে চিত্রিত করে।

প্রিফেচিং

প্রিফেচিং একটি প্রশিক্ষণ ধাপের প্রিপ্রসেসিং এবং মডেল এক্সিকিউশনকে ওভারল্যাপ করে। মডেল নির্বাহ করা হয় প্রশিক্ষণ পদক্ষেপ s , ইনপুট পাইপলাইন জন্য পদক্ষেপ ডেটা পড়া হয় s+1 । এটি করার ফলে প্রশিক্ষণের ধাপের সময় সর্বাধিক (সমষ্টির বিপরীতে) এবং ডেটা বের করতে যে সময় লাগে তা হ্রাস করে।

tf.data API উপলব্ধকারী tf.data.Dataset.prefetch রূপান্তর। এটি ডেটা ব্যবহার করার সময় থেকে ডেটা উত্পাদিত হওয়ার সময়কে দ্বিগুণ করতে ব্যবহার করা যেতে পারে। বিশেষ করে, ট্রান্সফরমেশন একটি ব্যাকগ্রাউন্ড থ্রেড এবং একটি অভ্যন্তরীণ বাফার ব্যবহার করে ইনপুট ডেটাসেট থেকে উপাদানগুলিকে অনুরোধ করার আগে প্রিফেচ করতে। প্রিফেচ করার জন্য উপাদানের সংখ্যা একটি একক প্রশিক্ষণ পদক্ষেপের দ্বারা ব্যবহৃত ব্যাচের সংখ্যার সমান (বা সম্ভবত এর চেয়ে বেশি) হওয়া উচিত। হয় আপনি নিজে সুর এই মান পারা, অথবা এটি সেট tf.data.AUTOTUNE , যা বলবে tf.data রানটাইম সুর মান পরিবর্তনশীল রানটাইম এ।

মনে রাখবেন যে প্রিফেচ ট্রান্সফরমেশন সুবিধা প্রদান করে যে কোনো সময় "উৎপাদক" এর কাজকে "ভোক্তার" কাজের সাথে ওভারল্যাপ করার সুযোগ থাকে।

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

ডেটা এক্সিকিউশন টাইম প্লট - প্রিফেচিং পদ্ধতি

এখন, যেমন ডেটা এক্সিকিউশন টাইম প্লট দেখায়, যখন নমুনা 0-এর জন্য প্রশিক্ষণের ধাপ চলছে, ইনপুট পাইপলাইন নমুনা 1-এর জন্য ডেটা পড়ছে, ইত্যাদি।

সমান্তরাল তথ্য নিষ্কাশন

একটি বাস্তব-বিশ্বের সেটিংয়ে, ইনপুট ডেটা দূরবর্তীভাবে সংরক্ষণ করা যেতে পারে (উদাহরণস্বরূপ, Google ক্লাউড স্টোরেজ বা HDFS-এ)। একটি ডেটাসেট পাইপলাইন যা স্থানীয়ভাবে ডেটা পড়ার সময় ভাল কাজ করে তা স্থানীয় এবং দূরবর্তী স্টোরেজের মধ্যে নিম্নলিখিত পার্থক্যগুলির কারণে দূরবর্তীভাবে ডেটা পড়ার সময় I/O তে বাধা হয়ে যেতে পারে:

  • টাইম টু প্রথম বাইট: রিমোট স্টোরেজ থেকে একটি ফাইল প্রথম বাইট পঠন স্থানীয় সংগ্রহস্থল থেকে চেয়ে দীর্ঘতর মাত্রার আদেশ গ্রহণ করতে পারেন।
  • থ্রুপুট পড়ুন: যদিও দূরবর্তী সংগ্রহস্থল সাধারণত বৃহৎ সমষ্টিগত ব্যান্ডউইডথ উপলব্ধ করা হয়, একটি একক ফাইল পড়া শুধুমাত্র এই ব্যান্ডউইথ একটি ছোট ভগ্নাংশ ব্যবহার করতে সক্ষম হতে পারেন।

উপরন্তু, একবার কাঁচা বাইট মেমরিতে লোড করা হয়, এটি প্রয়োজনীয় deserialize এবং / অথবা ডেটা ডিক্রিপ্ট করতে হতে পারে (যেমন protobuf ), যা অতিরিক্ত গণনার প্রয়োজন। ডেটা স্থানীয়ভাবে বা দূরবর্তীভাবে সংরক্ষিত হোক না কেন এই ওভারহেডটি উপস্থিত থাকে, তবে ডেটা কার্যকরভাবে প্রিফেচ করা না হলে দূরবর্তী ক্ষেত্রে আরও খারাপ হতে পারে।

বিভিন্ন তথ্য নিষ্কাশন overheads প্রভাব প্রশমিত করার জন্য, tf.data.Dataset.interleave রূপান্তর (যেমন ডেটা ফাইল রিডারের মতো) অন্য ডেটাসেট বিষয়বস্তু interleaving, ডাটা লোডিং ধাপটি parallelize ব্যবহার করা যেতে পারে। অধিক্রমনের ডেটাসেট সংখ্যা দ্বারা চিহ্নিত করা যাবে cycle_length , যুক্তি যখন উপমা মাত্রা দ্বারা নির্দিষ্ট করা যেতে পারে num_parallel_calls যুক্তি। অনুরূপ prefetch রূপান্তর, interleave রূপান্তর সমর্থন tf.data.AUTOTUNE , যা ব্যবহার করতে উপমা কি স্তর সম্পর্কে সিদ্ধান্ত প্রতিনিধি হবে tf.data রানটাইম।

অনুক্রমিক ইন্টারলিভ

ডিফল্ট আর্গুমেন্ট tf.data.Dataset.interleave রূপান্তর এটি দুটি ডেটাসেট ক্রমানুসারে থেকে একক নমুনা বইয়ের পাতার মাঝে মাঝে আছে।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

ডেটা এক্সিকিউশন টাইম প্লট - ক্রমিক ইন্টারলিভ

এই ডেটা সঞ্চালনের সময় প্লটের আচরণ প্রদর্শন করতে পারবেন interleave দুই প্রাপ্তিসাধ্য ডেটাসেট থেকে অন্যথায় নমুনা আনার সময়, রূপান্তর। যাইহোক, এখানে কোন কর্মক্ষমতা উন্নতি জড়িত নয়।

সমান্তরাল ইন্টারলিভ

এখন, ব্যবহার num_parallel_calls আর্গুমেন্ট interleave রূপান্তর। এটি সমান্তরালভাবে একাধিক ডেটাসেট লোড করে, ফাইলগুলি খোলার জন্য অপেক্ষা করার সময় কমিয়ে দেয়।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

ডেটা এক্সিকিউশন টাইম প্লট - সমান্তরাল ইন্টারলিভ পদ্ধতি

এইবার, ডেটা এক্সিকিউশন টাইম প্লট দেখায়, দুটি ডেটাসেটের রিডিং সমান্তরালভাবে করা হয়েছে, যা গ্লোবাল ডেটা প্রসেসিং সময়কে কমিয়ে দিচ্ছে।

সমান্তরাল তথ্য রূপান্তর

ডেটা প্রস্তুত করার সময়, ইনপুট উপাদানগুলিকে প্রি-প্রসেস করার প্রয়োজন হতে পারে। এই শেষ, tf.data এপিআই অফার tf.data.Dataset.map রূপান্তর, ইনপুট ডেটা সেটটি প্রতিটি উপাদান জন্য একটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন প্রযোজ্য পারে। যেহেতু ইনপুট উপাদান একে অপরের থেকে স্বাধীন, তাই প্রাক-প্রক্রিয়াকরণ একাধিক CPU কোর জুড়ে সমান্তরাল হতে পারে। একইভাবে, এই সম্ভাব্য করতে prefetch এবং interleave রূপান্তরের, map রূপান্তর প্রদান করে num_parallel_calls উপমা মাত্রা নির্দিষ্ট করার যুক্তি।

জন্য শ্রেষ্ঠ মান নির্বাচন num_parallel_calls , আপনার মানচিত্র ফাংশন খরচ নেই, এবং অন্য কোন প্রক্রিয়াকরণ একই সময়ে CPU তে ঘটছে (যেমন এর আকার এবং আকৃতি যেমন) আপনার প্রশিক্ষণ ডেটা বৈশিষ্ট্য যুক্তি আপনার হার্ডওয়ার উপর নির্ভর করে। একটি সহজ হিউরিস্টিক হল উপলব্ধ CPU কোরের সংখ্যা ব্যবহার করা। যাইহোক, হিসাবে prefetch এবং interleave রূপান্তর, map রূপান্তর সমর্থন tf.data.AUTOTUNE যা ব্যবহারের উপমা কি স্তর সম্পর্কে সিদ্ধান্ত প্রতিনিধি হবে tf.data রানটাইম।

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

অনুক্রমিক ম্যাপিং

ব্যবহার করে শুরু map একটি বেসলাইন উদাহরণ হিসাবে উপমা ছাড়া রূপান্তর।

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

ডেটা এক্সিকিউশন টাইম প্লট - ক্রমিক ম্যাপিং পদ্ধতি

হিসাবে সাদাসিধা পদ্ধতির , এখানে, চক্রান্ত শো হিসাবে, খোলার সময় ব্যয়, পড়ার প্রাক-প্রসেসিং (ম্যাপিং) ও প্রশিক্ষণ পদক্ষেপ একটি একক পুনরাবৃত্তির জন্য একসাথে যোগফল।

সমান্তরাল ম্যাপিং

এখন, একই প্রাক-প্রসেসিং ফাংশন ব্যবহার করুন তবে এটি একাধিক নমুনায় সমান্তরালভাবে প্রয়োগ করুন।

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

ডেটা এক্সিকিউশন সময় - সমান্তরাল ম্যাপিং

ডেটা প্লট যেমন দেখায়, প্রি-প্রসেসিং ধাপগুলি ওভারল্যাপ করে, একক পুনরাবৃত্তির জন্য সামগ্রিক সময় কমিয়ে দেয়।

ক্যাশিং

tf.data.Dataset.cache রূপান্তর একটি ডেটাসেটের ক্যাশে করতে পারেন পারেন মেমরি বা স্থানীয় সংগ্রহস্থল উপর। এটি প্রতিটি যুগে কার্যকর হওয়া থেকে কিছু ক্রিয়াকলাপ (যেমন ফাইল খোলা এবং ডেটা পড়া) সংরক্ষণ করবে।

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

ডেটা সঞ্চালনের সময় - ক্যাশে করা ডেটাসেট পদ্ধতি

এখানে, ডাটা সঞ্চালনের সময় চক্রান্ত শো যে আপনি যখন একটি ডেটাসেটের ক্যাশে, আগে রূপান্তরের cache এক (ফাইল খোলার এবং ডেটা পড়ার মত) শুধুমাত্র প্রথম অধিযুগ সময় মৃত্যুদন্ড কার্যকর করা হয়। পরবর্তী সময়কাল তথ্য দ্বারা ক্যাশে পুনরায় ব্যবহার করতে হবে cache রূপান্তর।

ব্যবহারকারী-সংজ্ঞায়িত ফাংশনে অতিক্রান্ত হওয়া যদি map রূপান্তর ব্যয়বহুল আবেদন cache পর রূপান্তর map যতদিন ফলে ডেটা সেটটি এখনও মেমরি বা স্থানীয় সংগ্রহস্থল মধ্যে ফিট করতে পারে রূপান্তর। ব্যবহারকারী-সংজ্ঞায়িত ফাংশন বৃদ্ধি তাহলে স্থান ক্যাশে ক্ষমতা অতিক্রম ডেটা সেটটি সংরক্ষণ, নয়তো পরে এটি প্রয়োগ করা প্রয়োজন cache রূপান্তর বা আপনার ডেটা প্রাক প্রক্রিয়াকরণ বিবেচনা আপনার প্রশিক্ষণ কাজ সামনে রিসোর্স ব্যবহার কমাতে।

ভেক্টরাইজিং ম্যাপিং

ব্যবহারকারী-নির্ধারিত ফাংশনে অতিক্রান্ত হওয়া invoking map রূপান্তর ওভারহেড সিডিউলিং এর সাথে সম্পর্কিত হয়েছে এবং ব্যবহারকারী-সংজ্ঞায়িত ফাংশন নির্বাহ। ব্যবহারকারী-সংজ্ঞায়িত ফাংশন Vectorize (যে, এটা একবারে ইনপুট একটি ব্যাচ উপর চালনা আছে) এবং প্রয়োগ batch সামনে রূপান্তর map রূপান্তর।

এই ভাল অভ্যাসটি ব্যাখ্যা করার জন্য, আপনার কৃত্রিম ডেটাসেট উপযুক্ত নয়। পূর্বপরিকল্পনা বিলম্ব, 10 মাইক্রোসেকেন্ড (10e -6 সেকেন্ড) প্রায় পর্যন্ত ব্যবহৃত মিলিসেকেন্ড দশ কম ArtificialDataset , এবং এইভাবে তার প্রভাব দেখতে কঠিন।

এই উদাহরণস্বরূপ, বেস ব্যবহার tf.data.Dataset.range ফাংশন এবং সরলতম ফর্ম প্রশিক্ষণ লুপ সহজ করে।

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

স্কেলার ম্যাপিং

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

ডেটা এক্সিকিউশন সময় - স্কেলার ম্যাপ পদ্ধতি

উপরের প্লটটি স্কেলার ম্যাপিং পদ্ধতি ব্যবহার করে (কম নমুনা সহ) কী চলছে তা চিত্রিত করে। এটি দেখায় যে ম্যাপড ফাংশন প্রতিটি নমুনার জন্য প্রয়োগ করা হয়। যদিও এই ফাংশনটি খুব দ্রুত, এটির কিছু ওভারহেড রয়েছে যা সময়ের কর্মক্ষমতাকে প্রভাবিত করে।

ভেক্টরাইজড ম্যাপিং

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

ডেটা এক্সিকিউশন সময় - ভেক্টরাইজড ম্যাপ পদ্ধতি

এই সময়, ম্যাপ করা ফাংশন একবার বলা হয় এবং নমুনার একটি ব্যাচে প্রযোজ্য। ডেটা এক্সিকিউশন টাইম প্লট দেখায়, যখন ফাংশনটি কার্যকর করতে আরও সময় লাগতে পারে, ওভারহেড শুধুমাত্র একবার প্রদর্শিত হয়, সামগ্রিক সময়ের কর্মক্ষমতা উন্নত করে।

মেমরি পদচিহ্ন হ্রাস

রূপান্তরের একটি নম্বর সহ interleave , prefetch , এবং shuffle , উপাদানের একটি অভ্যন্তরীণ বাফার বজায় রাখা। ব্যবহারকারী-সংজ্ঞায়িত ফাংশনে অতিক্রান্ত হওয়া যদি map রূপান্তর উপাদান, এরপর সেটি মানচিত্রের রূপান্তরের ক্রম এবং রূপান্তরের যে বাফার উপাদান মেমোরি ব্যবহার প্রভাবিত আকার পরিবর্তন। সাধারণভাবে, এমন ক্রম বেছে নিন যার ফলে মেমরির পদচিহ্ন কম হয়, যদি না পারফরম্যান্সের জন্য ভিন্ন ক্রম বাঞ্ছনীয় হয়।

আংশিক গণনা ক্যাশিং

এটা তোলে পর ডেটা সেটটি ক্যাশে বাঞ্ছনীয় map রূপান্তর ব্যতীত যদি এই রূপান্তর ডেটাটি অত্যন্ত মেমরি মাপসই বড় করে তোলে। একটি ট্রেড-অফ অর্জন করা যেতে পারে যদি আপনার ম্যাপ করা ফাংশন দুটি অংশে বিভক্ত করা যায়: একটি সময় গ্রাসকারী এবং একটি মেমরি গ্রাসকারী অংশ। এই ক্ষেত্রে, আপনি নীচের মত আপনার রূপান্তর চেইন করতে পারেন:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

এইভাবে, সময় গ্রাসকারী অংশটি শুধুমাত্র প্রথম যুগের সময় কার্যকর করা হয় এবং আপনি অত্যধিক ক্যাশে স্থান ব্যবহার করা এড়ান।

সেরা অনুশীলন সারাংশ

এখানে পারফরম্যান্স টেনসরফ্লো ইনপুট পাইপলাইন ডিজাইন করার জন্য সর্বোত্তম অনুশীলনের একটি সারসংক্ষেপ রয়েছে:

পরিসংখ্যান পুনরুত্পাদন

মধ্যে গভীরে যেতে tf.data.Dataset এপিআই বুঝতে, আপনি আপনার নিজের পাইপলাইনগুলি সঙ্গে খেলা করতে পারেন। নীচে এই নির্দেশিকা থেকে ইমেজ প্লট ব্যবহৃত কোড. এটি একটি ভাল সূচনা বিন্দু হতে পারে, সাধারণ অসুবিধাগুলির জন্য কিছু সমাধান দেখায় যেমন:

  • সঞ্চালনের সময় প্রজননযোগ্যতা
  • ম্যাপ করা ফাংশন আগ্রহী এক্সিকিউশন
  • interleave রূপান্তর callable
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

ডেটাসেট

অনুরূপ ArtificialDataset সময় প্রতিটি পদক্ষেপ অতিবাহিত ফিরে একটি ডেটাসেটের নির্মাণ করতে পারেন।

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

এই সেটটি আকৃতি নমুনা প্রদান করে [[2, 1], [2, 2], [2, 3]] এবং ধরনের [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] । প্রতিটি নমুনা হল:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

কোথায়:

  • Open এবং Read পদক্ষেপ শনাক্তকারী হয়
  • t0 টাইমস্ট্যাম্প যখন সংশ্লিষ্ট পদক্ষেপ শুরু
  • d সময় সংশ্লিষ্ট ধাপে খরচ হয়ে গেছে
  • i উদাহরণ হিসেবে বলা যায় সূচি
  • e যুগান্তকারী ইনডেক্স (যতবার ডেটা সেটটি iterated হয়েছে) হয়
  • s নমুনা সূচি

পুনরাবৃত্তি লুপ

সমস্ত সময় একত্রিত করতে পুনরাবৃত্তি লুপটিকে একটু জটিল করে তুলুন। এটি শুধুমাত্র উপরে বর্ণিত নমুনা তৈরি করে এমন ডেটাসেটের সাথে কাজ করবে।

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

প্লটিং পদ্ধতি

অবশেষে, একটি ফাংশন টাইমলাইনে মান দ্বারা ফিরে দেওয়া প্লটে বিভক্ত করতে সক্ষম সংজ্ঞায়িত timelined_benchmark ফাংশন।

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

ম্যাপ করা ফাংশন জন্য wrappers ব্যবহার করুন

একটি উৎসুক প্রেক্ষাপটে ম্যাপ ফাংশন চালাতে, আপনি তাদের একটি ভিতরে মোড়ানো আছে tf.py_function কল।

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

পাইপলাইন তুলনা

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

নিষ্পাপ

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

অপ্টিমাইজ করা হয়েছে

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png