सहायता Kaggle पर TensorFlow साथ ग्रेट बैरियर रीफ की रक्षा चैलेंज में शामिल हों

tf.data API के साथ बेहतर प्रदर्शन

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

GPU और TPU एकल प्रशिक्षण चरण को निष्पादित करने के लिए आवश्यक समय को मौलिक रूप से कम कर सकते हैं। चरम प्रदर्शन प्राप्त करने के लिए एक कुशल इनपुट पाइपलाइन की आवश्यकता होती है जो वर्तमान चरण समाप्त होने से पहले अगले चरण के लिए डेटा वितरित करती है। tf.data API लचीला और कुशल इनपुट पाइपलाइनों का निर्माण करने में मदद करता है। इस दस्तावेज़ को दर्शाता है कि कैसे उपयोग करने के लिए tf.data एपीआई उच्च performant TensorFlow इनपुट पाइपलाइनों का निर्माण।

आगे बढ़ने से पहले, जाँच TensorFlow बिल्ड इनपुट पाइपलाइनों कैसे उपयोग करने के लिए सीखने के लिए मार्गदर्शन tf.data एपीआई।

साधन

सेट अप

import tensorflow as tf

import time

इस संपूर्ण मार्गदर्शिका में, आप एक डेटासेट पर पुनरावृति करेंगे और प्रदर्शन को मापेंगे। प्रतिलिपि प्रस्तुत करने योग्य प्रदर्शन बेंचमार्क बनाना मुश्किल हो सकता है। प्रजनन क्षमता को प्रभावित करने वाले विभिन्न कारकों में शामिल हैं:

  • वर्तमान सीपीयू लोड
  • नेटवर्क यातायात
  • जटिल तंत्र, जैसे कैश

एक प्रतिलिपि प्रस्तुत करने योग्य बेंचमार्क प्राप्त करने के लिए, आप एक कृत्रिम उदाहरण का निर्माण करेंगे।

डेटासेट

एक वर्ग से इनहेरिट को परिभाषित करने के साथ शुरू करो tf.data.Dataset बुलाया ArtificialDataset । यह डेटासेट:

  • उत्पन्न num_samples नमूने (डिफ़ॉल्ट 3 है)
  • फ़ाइल खोलने के अनुकरण के लिए पहले आइटम से पहले कुछ समय के लिए सो जाता है
  • फ़ाइल से डेटा पढ़ने का अनुकरण करने के लिए प्रत्येक आइटम को बनाने से पहले कुछ समय के लिए सो जाता है
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

यह डेटासेट के समान है tf.data.Dataset.range एक है, और की शुरुआत में एक निश्चित देरी जोड़ने के बीच प्रत्येक नमूने।

ट्रेनिंग लूप

इसके बाद, एक डमी ट्रेनिंग लूप लिखें जो मापता है कि किसी डेटासेट पर पुनरावृति करने में कितना समय लगता है। प्रशिक्षण का समय सिम्युलेटेड है।

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

प्रदर्शन का अनुकूलन करें

कैसे प्रदर्शन अनुकूलित किया जा सकता है प्रदर्शन के लिए, आप के प्रदर्शन में सुधार होगा ArtificialDataset

भोला दृष्टिकोण

बिना किसी तरकीब का उपयोग करते हुए एक भोली पाइपलाइन से शुरू करें, जैसा कि है, डेटासेट पर पुनरावृति।

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

हुड के तहत, आपका निष्पादन समय इस प्रकार व्यतीत हुआ:

डेटा निष्पादन समय की साजिश - एक भोली विधि

साजिश से पता चलता है कि एक प्रशिक्षण कदम का प्रदर्शन करना शामिल है:

  • फ़ाइल खोलना यदि वह अभी तक नहीं खोली गई है
  • फ़ाइल से डेटा प्रविष्टि लाई जा रही है
  • प्रशिक्षण के लिए डेटा का उपयोग करना

हालाँकि, यहाँ जैसे सरल तुल्यकालिक कार्यान्वयन में, जबकि आपकी पाइपलाइन डेटा ला रही है, आपका मॉडल निष्क्रिय है। इसके विपरीत, जब आपका मॉडल प्रशिक्षण ले रहा होता है, तो इनपुट पाइपलाइन बेकार बैठी होती है। इस प्रकार प्रशिक्षण चरण का समय उद्घाटन, पढ़ने और प्रशिक्षण के समय का योग है।

अगले खंड इस इनपुट पाइपलाइन का निर्माण करते हैं, जो प्रदर्शनकारी TensorFlow इनपुट पाइपलाइनों को डिजाइन करने के सर्वोत्तम अभ्यासों को दर्शाता है।

प्रीफेचिंग

प्रीफ़ेचिंग प्रशिक्षण चरण के प्रीप्रोसेसिंग और मॉडल निष्पादन को ओवरलैप करता है। मॉडल को क्रियान्वित किया जाता है प्रशिक्षण कदम s , इनपुट पाइप लाइन के लिए कदम डेटा पढ़ रही है s+1 । ऐसा करने से चरण का समय प्रशिक्षण के अधिकतम (योग के विपरीत) और डेटा निकालने में लगने वाले समय को कम कर देता है।

tf.data एपीआई प्रदान करता है tf.data.Dataset.prefetch परिवर्तन। इसका उपयोग उस समय को कम करने के लिए किया जा सकता है जब डेटा की खपत के समय से डेटा का उत्पादन किया जाता है। विशेष रूप से, परिवर्तन अनुरोधित समय से पहले इनपुट डेटासेट से तत्वों को प्रीफ़ेच करने के लिए पृष्ठभूमि थ्रेड और आंतरिक बफर का उपयोग करता है। प्रीफ़ेच करने के लिए तत्वों की संख्या एकल प्रशिक्षण चरण द्वारा खपत किए गए बैचों की संख्या के बराबर (या संभवतः इससे अधिक) होनी चाहिए। आप या तो स्वयं धुन यह मान सकता है, या के लिए सेट tf.data.AUTOTUNE है, जो संकेत देगा tf.data क्रम धुन पर मूल्य गतिशील रनटाइम पर।

ध्यान दें कि प्रीफेच परिवर्तन किसी भी समय "उपभोक्ता" के काम के साथ "निर्माता" के काम को ओवरलैप करने का अवसर प्रदान करता है।

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

डेटा निष्पादन समय प्लॉट - प्रीफ़ेचिंग विधि

अब, जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, जबकि नमूना 0 के लिए प्रशिक्षण चरण चल रहा है, इनपुट पाइपलाइन नमूना 1 के लिए डेटा पढ़ रही है, और इसी तरह।

समानांतर डेटा निष्कर्षण

वास्तविक दुनिया की सेटिंग में, इनपुट डेटा को दूरस्थ रूप से संग्रहीत किया जा सकता है (उदाहरण के लिए, Google क्लाउड स्टोरेज या एचडीएफएस पर)। एक डेटासेट पाइपलाइन जो स्थानीय रूप से डेटा पढ़ते समय अच्छी तरह से काम करती है, स्थानीय और दूरस्थ संग्रहण के बीच निम्न अंतरों के कारण दूरस्थ रूप से डेटा पढ़ते समय I/O पर अड़चन बन सकती है:

  • समय-टू-पहले-बाइट: दूरस्थ संग्रहण से फ़ाइल की पहली बाइट पढ़ना स्थानीय भंडारण से अधिक लंबी परिमाण के आदेश लेना कर सकते हैं।
  • Throughput पढ़ें: एक ओर जहां दूरदराज के भंडारण आम तौर पर बड़े कुल बैंडविड्थ प्रदान करता है, एक एकल फाइल पढ़ने केवल इस बैंडविड्थ की एक छोटा सा अंश उपयोग करने में सक्षम हो सकता है।

इसके अलावा, एक बार कच्चे बाइट्स मेमोरी में लोड कर रहे हैं, यह भी आवश्यक deserialize और / या डेटा डिक्रिप्ट करने के लिए हो सकता है (उदाहरण के लिए Protobuf ) है, जो अतिरिक्त परिकलन करना पड़ता है। यह ओवरहेड मौजूद है चाहे डेटा स्थानीय रूप से या दूरस्थ रूप से संग्रहीत किया गया हो, लेकिन यदि डेटा को प्रभावी ढंग से प्रीफ़ेच नहीं किया जाता है तो दूरस्थ मामले में यह बदतर हो सकता है।

विभिन्न डेटा निष्कर्षण ओवरहेड्स के प्रभाव को कम करने के लिए, tf.data.Dataset.interleave परिवर्तन (जैसे कि डेटा फ़ाइल पाठकों के रूप में) अन्य ऐसे डेटासेट की सामग्री interleaving, डेटा लोड कदम parallelize के लिए इस्तेमाल किया जा सकता है। ओवरलैप करने के लिए डेटासेट की संख्या से निर्दिष्ट किया जा सकता cycle_length , तर्क, जबकि समानांतरवाद के स्तर द्वारा निर्दिष्ट किया जा सकता num_parallel_calls तर्क। करने के लिए इसी तरह के prefetch परिवर्तन, interleave परिवर्तन का समर्थन करता है tf.data.AUTOTUNE है, जो करने के लिए उपयोग करने के लिए समानांतरवाद का स्तर क्या के बारे में निर्णय प्रतिनिधि होगा tf.data क्रम।

अनुक्रमिक इंटरलीव

के डिफ़ॉल्ट तर्क tf.data.Dataset.interleave परिवर्तन यह दोनों डेटासेट क्रमिक रूप से से एक नमूने बिछा सकते हैं।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

डेटा निष्पादन समय प्लॉट - अनुक्रमिक इंटरलीव

इस डेटा निष्पादन समय साजिश के व्यवहार प्रदर्शन करने की अनुमति देता है interleave दो उपलब्ध डेटासेट से वैकल्पिक रूप से नमूने लेते समय परिवर्तन। हालांकि, यहां कोई प्रदर्शन सुधार शामिल नहीं है।

समानांतर इंटरलीव

अब, का उपयोग num_parallel_calls का तर्क interleave परिवर्तन। यह समानांतर में कई डेटासेट लोड करता है, जिससे फाइलों के खुलने की प्रतीक्षा में लगने वाला समय कम हो जाता है।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

डेटा निष्पादन समय प्लॉट - समानांतर इंटरलीव विधि

इस बार, जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, दो डेटासेट की रीडिंग समानांतर है, जिससे वैश्विक डेटा प्रोसेसिंग समय कम हो जाता है।

समानांतर डेटा परिवर्तन

डेटा तैयार करते समय, इनपुट तत्वों को पूर्व-संसाधित करने की आवश्यकता हो सकती है। इस उद्देश्य के लिए tf.data एपीआई प्रदान करता है tf.data.Dataset.map परिवर्तन, इनपुट डाटासेट के प्रत्येक तत्व के लिए एक उपयोगकर्ता परिभाषित समारोह लागू होता है। चूंकि इनपुट तत्व एक दूसरे से स्वतंत्र होते हैं, इसलिए प्री-प्रोसेसिंग को कई सीपीयू कोर में समानांतर किया जा सकता है। इसी तरह करने के लिए, यह संभव बनाने के prefetch और interleave परिवर्तनों, map परिवर्तन प्रदान करता है num_parallel_calls समानांतरवाद के स्तर को निर्दिष्ट करने के तर्क।

के लिए सबसे अच्छा मूल्य का चयन num_parallel_calls , अपने नक्शे समारोह की लागत, और क्या अन्य संसाधन एक ही समय में सीपीयू पर हो रहा है (जैसे कि इसके आकार और आकार के रूप में) अपने प्रशिक्षण डेटा की विशेषताओं तर्क आपके हार्डवेयर पर निर्भर करता है। उपलब्ध सीपीयू कोर की संख्या का उपयोग करने के लिए एक साधारण अनुमानी है। हालांकि, के लिए के रूप में prefetch और interleave परिवर्तन, map परिवर्तन का समर्थन करता है tf.data.AUTOTUNE जो करने के लिए उपयोग करने के लिए समानांतरवाद का स्तर क्या के बारे में निर्णय प्रतिनिधि होगा tf.data क्रम।

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

अनुक्रमिक मानचित्रण

का उपयोग करके शुरू map एक आधारभूत उदाहरण के रूप में समानांतरवाद बिना परिवर्तन।

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

डेटा निष्पादन समय प्लॉट - अनुक्रमिक मानचित्रण विधि

के रूप में अनुभवहीन दृष्टिकोण , यहाँ, साजिश है, जैसा कि बार खोलने के लिए खर्च किया, पढ़ना, पूर्व प्रसंस्करण (मैपिंग) और प्रशिक्षण चरणों एक एकल पुनरावृत्ति के लिए एक साथ योग।

समानांतर मानचित्रण

अब, उसी प्री-प्रोसेसिंग फ़ंक्शन का उपयोग करें, लेकिन इसे कई नमूनों पर समानांतर में लागू करें।

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

डेटा निष्पादन समय - समानांतर मानचित्रण

जैसा कि डेटा प्लॉट प्रदर्शित करता है, पूर्व-प्रसंस्करण चरण ओवरलैप होते हैं, जिससे एकल पुनरावृत्ति के लिए समग्र समय कम हो जाता है।

कैशिंग

tf.data.Dataset.cache परिवर्तन एक डाटासेट कैश कर सकते हैं, या तो स्मृति में या स्थानीय भंडारण पर। यह प्रत्येक युग के दौरान निष्पादित होने से कुछ संचालन (जैसे फ़ाइल खोलने और डेटा पढ़ने) को बचाएगा।

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

डेटा निष्पादन समय - कैश्ड डेटासेट विधि

इधर, डेटा निष्पादन समय साजिश से पता चलता है कि जब आप एक डाटासेट कैश, इससे पहले कि परिवर्तनों cache एक (फ़ाइल खोलने और पढ़ने डेटा की तरह) केवल पहले युग के दौरान क्रियान्वित कर रहे हैं। अगले अवधियों को डेटा द्वारा कैश का पुन: उपयोग होगा cache परिवर्तन।

उपयोगकर्ता परिभाषित समारोह में पारित कर दिया तो map परिवर्तन महंगा है, लागू cache के बाद परिवर्तन map जब तक जिसके परिणामस्वरूप डाटासेट अभी भी स्मृति या स्थानीय भंडारण में फिट कर सकते हैं परिवर्तन। उपयोगकर्ता परिभाषित समारोह बढ़ जाती है, तो अंतरिक्ष कैश क्षमता से परे डाटासेट की दुकान, या तो के बाद इसे लागू करने के लिए आवश्यक cache परिवर्तन या अपने डेटा पूर्व प्रसंस्करण पर विचार अपने प्रशिक्षण नौकरी से पहले संसाधन के उपयोग को कम करने के लिए।

वेक्टरिंग मैपिंग

उपयोगकर्ता परिभाषित समारोह में पारित कर दिया लागू map परिवर्तन भूमि के ऊपर शेड्यूलिंग से संबंधित है और उपयोगकर्ता-निर्धारित फ़ंक्शन को निष्पादित। उपयोगकर्ता परिभाषित समारोह vectorize (यह है कि, यह एक बार में आदानों की एक बैच में काम किया है) और लागू batch से पहले परिवर्तन map परिवर्तन।

इस अच्छे अभ्यास को स्पष्ट करने के लिए, आपका कृत्रिम डेटासेट उपयुक्त नहीं है। शेड्यूलिंग देरी, 10 माइक्रोसेकंड (10e-6 सेकंड) के आसपास है अब तक में इस्तेमाल किया मिलीसेकेंड के दसियों से कम ArtificialDataset , और इस तरह इसके प्रभाव को देखने के लिए कठिन है।

इस उदाहरण के लिए, आधार का उपयोग tf.data.Dataset.range समारोह और इसके सरलतम रूप को प्रशिक्षण पाश को आसान बनाने में।

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

अदिश मानचित्रण

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

डेटा निष्पादन समय - अदिश नक्शा विधि

ऊपर दिया गया प्लॉट दिखाता है कि स्केलर मैपिंग पद्धति का उपयोग करके क्या हो रहा है (कम नमूनों के साथ)। यह दर्शाता है कि प्रत्येक नमूने के लिए मैप किए गए फ़ंक्शन को लागू किया जाता है। हालांकि यह फ़ंक्शन बहुत तेज़ है, इसमें कुछ ओवरहेड हैं जो समय के प्रदर्शन को प्रभावित करते हैं।

वेक्टरकृत मानचित्रण

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

डेटा निष्पादन समय - वेक्टरकृत मानचित्र विधि

इस बार, मैप किए गए फ़ंक्शन को एक बार कॉल किया जाता है और नमूने के एक बैच पर लागू होता है। जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, जबकि फ़ंक्शन को निष्पादित करने में अधिक समय लग सकता है, ओवरहेड केवल एक बार दिखाई देता है, समग्र समय प्रदर्शन में सुधार करता है।

स्मृति पदचिह्न को कम करना

परिवर्तनों का एक नंबर, सहित interleave , prefetch , और shuffle , तत्वों की एक आंतरिक बफर बनाए रखें। उपयोगकर्ता परिभाषित समारोह में पारित कर दिया तो map परिवर्तन तत्वों, तो नक्शा परिवर्तन के आदेश और परिवर्तनों कि बफर तत्वों स्मृति के उपयोग को प्रभावित करता है के आकार बदल जाता है। सामान्य तौर पर, उस क्रम को चुनें जिसके परिणामस्वरूप कम मेमोरी फ़ुटप्रिंट होता है, जब तक कि प्रदर्शन के लिए अलग-अलग ऑर्डरिंग वांछनीय न हो।

कैशिंग आंशिक गणना

यह बाद डाटासेट कैश करने के लिए सिफारिश की है map परिवर्तन को छोड़कर यदि इस बदलाव डेटा भी स्मृति में फिट करने के लिए बड़ा बना देता है। यदि आपके मैप किए गए फ़ंक्शन को दो भागों में विभाजित किया जा सकता है, तो एक ट्रेड-ऑफ प्राप्त किया जा सकता है: एक समय लेने वाला और एक मेमोरी लेने वाला भाग। इस मामले में, आप अपने परिवर्तनों को नीचे की तरह श्रृंखलाबद्ध कर सकते हैं:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

इस तरह, समय लेने वाला हिस्सा केवल पहले युग के दौरान ही निष्पादित होता है, और आप बहुत अधिक कैश स्थान का उपयोग करने से बचते हैं।

सर्वोत्तम अभ्यास सारांश

प्रदर्शनकारी TensorFlow इनपुट पाइपलाइनों को डिजाइन करने के लिए सर्वोत्तम प्रथाओं का सारांश यहां दिया गया है:

आंकड़ों का पुनरुत्पादन

में गहराई में जाने के लिए tf.data.Dataset एपीआई समझ, आप अपने खुद के पाइपलाइनों के साथ खेल सकते हैं। नीचे इस गाइड से छवियों को प्लॉट करने के लिए इस्तेमाल किया गया कोड है। सामान्य कठिनाइयों के लिए कुछ समाधान दिखाते हुए यह एक अच्छा प्रारंभिक बिंदु हो सकता है, जैसे:

  • निष्पादन समय प्रतिलिपि प्रस्तुत करने योग्यता
  • मैप किए गए कार्य उत्सुक निष्पादन
  • interleave परिवर्तन प्रतिदेय
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

डेटासेट

करने के लिए इसी तरह के ArtificialDataset आप समय हर कदम में खर्च लौटने एक डाटासेट निर्माण कर सकते हैं।

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

यह डेटासेट आकार के नमूने प्रदान करता है [[2, 1], [2, 2], [2, 3]] और प्रकार के [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] । प्रत्येक नमूना है:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

कहा पे:

  • Open और Read चरणों पहचानकर्ता हैं
  • t0 टाइमस्टैम्प जब इसी कदम शुरू कर दिया है
  • d समय इसी चरण में खर्च किया है
  • i उदाहरण के सूचकांक है
  • e युग सूचकांक (समय की संख्या डाटासेट दोहराया गया है) है
  • s नमूना सूचकांक है

पुनरावृत्ति लूप

सभी समयों को एकत्रित करने के लिए पुनरावृत्ति लूप को थोड़ा और जटिल बनाएं। यह केवल ऊपर बताए अनुसार नमूने जनरेट करने वाले डेटासेट के साथ काम करेगा।

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

प्लॉटिंग विधि

अंत में, एक समारोह एक समय मूल्यों से लौटे दिया साजिश करने में सक्षम परिभाषित timelined_benchmark कार्य करते हैं।

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

मैप किए गए फ़ंक्शन के लिए रैपर का उपयोग करें

एक उत्सुक संदर्भ में मैप किया गया समारोह चलाने के लिए, आप उन्हें एक अंदर रैप करने के लिए है tf.py_function कॉल।

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

पाइपलाइन तुलना

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

अनाड़ी

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

अनुकूलित

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

पीएनजी

draw_timeline(optimized_timeline, "Optimized", 15)

पीएनजी