इस पेज का अनुवाद Cloud Translation API से किया गया है.
Switch to English

Tf.data API के साथ बेहतर प्रदर्शन

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

जीपीयू और टीपीयू मौलिक रूप से एकल प्रशिक्षण कदम को निष्पादित करने के लिए आवश्यक समय को कम कर सकते हैं। चोटी के प्रदर्शन को प्राप्त करने के लिए एक कुशल इनपुट पाइपलाइन की आवश्यकता होती है जो वर्तमान चरण समाप्त होने से पहले अगले चरण के लिए डेटा वितरित करती है। tf.data API लचीला और कुशल इनपुट पाइपलाइन बनाने में मदद करता है। यह दस्तावेज़ दर्शाता है कि अत्यधिक प्रदर्शन करने वाले TensorFlow इनपुट पाइपलाइनों के निर्माण के लिए tf.data API का उपयोग कैसे करें।

इससे पहले कि आप जारी रखें, tf.data API का उपयोग करने के तरीके के बारे में जानने के लिए " बिल्ड TensorFlow इनपुट पाइपलाइनों " गाइड tf.data

साधन

सेट अप

import tensorflow as tf

import time

इस गाइड के दौरान, आप एक डेटासेट में पुनरावृति करेंगे और प्रदर्शन को मापेंगे। प्रतिलिपि प्रस्तुत करने योग्य प्रदर्शन बेंचमार्क बनाना मुश्किल हो सकता है, इसे प्रभावित करने वाले विभिन्न कारक:

  • वर्तमान सीपीयू लोड,
  • नेटवर्क ट्रैफ़िक,
  • जटिल तंत्र जैसे कैश इत्यादि।

इसलिए, एक प्रतिलिपि प्रस्तुत करने योग्य बेंचमार्क प्रदान करने के लिए, एक कृत्रिम उदाहरण का निर्माण करें।

डेटासेट

tf.data.Dataset नामक ArtificialDataset से विरासत में मिली एक कक्षा को परिभाषित करें। यह डेटासेट:

  • num_samples नमूने उत्पन्न करता है (डिफ़ॉल्ट 3 है)
  • फ़ाइल खोलने के लिए अनुकरण करने वाले पहले आइटम से पहले कुछ समय के लिए सोता है
  • एक फ़ाइल से डेटा पढ़ने के अनुकरण के लिए प्रत्येक आइटम का उत्पादन करने से पहले कुछ समय के लिए सोता है
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)
            
            yield (sample_idx,)
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=tf.dtypes.int64,
            output_shapes=(1,),
            args=(num_samples,)
        )

यह डेटासेट tf.data.Dataset.range समान है, शुरुआत में और प्रत्येक नमूने के बीच एक निश्चित विलंब जोड़ रहा है।

प्रशिक्षण पाश

डमी प्रशिक्षण लूप लिखें जो मापता है कि किसी डेटासेट पर पुनरावृति करने में कितना समय लगता है। प्रशिक्षण का समय नकली है।

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    tf.print("Execution time:", time.perf_counter() - start_time)

प्रदर्शन का अनुकूलन करें

यह प्रदर्शित करने के लिए कि प्रदर्शन को कैसे अनुकूलित किया जा सकता है, आप ArtificialDataset के प्रदर्शन में सुधार करेंगे।

भोला दृष्टिकोण

बिना चाल के एक भोली पाइपलाइन के साथ शुरू करें, डेटासेट के रूप में पुनरावृति।

benchmark(ArtificialDataset())
Execution time: 0.2530532629998561

हुड के तहत, इस तरह से आपके निष्पादन का समय व्यतीत हुआ:

अनुभवहीन

आप देख सकते हैं कि प्रशिक्षण चरण में शामिल करना शामिल है:

  • यदि यह अभी तक नहीं खोला गया है तो एक फ़ाइल खोलना,
  • फ़ाइल से डेटा प्रविष्टि प्राप्त करना,
  • प्रशिक्षण के लिए डेटा का उपयोग करना।

हालांकि, यहां एक भोले तुल्यकालिक कार्यान्वयन में, जबकि आपकी पाइपलाइन डेटा प्राप्त कर रही है, आपका मॉडल बेकार बैठा है। इसके विपरीत, जब आपका मॉडल प्रशिक्षण दे रहा होता है, तो इनपुट पाइपलाइन बेकार बैठी रहती है। प्रशिक्षण चरण का समय इस प्रकार है, खुलने, पढ़ने और प्रशिक्षण का समय।

अगले खंड इस इनपुट पाइपलाइन पर निर्माण करते हैं, प्रदर्शनकारी TensorFlow इनपुट पाइपलाइनों को डिजाइन करने के लिए सर्वोत्तम प्रथाओं का चित्रण करते हैं।

प्रीफेचिंग

प्रीफेटिंग एक प्रशिक्षण चरण के प्रीप्रोसेसिंग और मॉडल निष्पादन को ओवरलैप करता है। जबकि मॉडल प्रशिक्षण चरण s निष्पादित कर रहा है, इनपुट पाइपलाइन चरण s+1 लिए डेटा पढ़ रही है। ऐसा करने से प्रशिक्षण के अधिकतम (योग के विपरीत) कदम समय कम हो जाता है और डेटा निकालने में समय लगता है।

tf.data API tf.data.Dataset.prefetch परिवर्तन प्रदान करता है। इसका उपयोग उस समय को कम करने के लिए किया जा सकता है जब डेटा का उपभोग किया जाता है। विशेष रूप से, परिवर्तन समय-समय पर इनपुट डेटासेट से तत्वों को प्रीफ़ैच करने के लिए बैकग्राउंड थ्रेड और आंतरिक बफर का उपयोग करता है। प्रीफ़ैच करने वाले तत्वों की संख्या एकल प्रशिक्षण चरण द्वारा उपभोग किए गए बैचों की संख्या के बराबर (या संभवतः अधिक से अधिक) होनी चाहिए। आप या तो मैन्युअल रूप से इस मान को ट्यून कर सकते हैं, या इसे tf.data.experimental.AUTOTUNE सेट कर tf.data.experimental.AUTOTUNE जो tf.data रनटाइम को गतिशील रूप से मान को ट्यून करने के लिए संकेत देगा।

ध्यान दें कि प्रीफ़ैच परिवर्तन किसी भी समय लाभ प्रदान करता है "उपभोक्ता" के काम के साथ "निर्माता" के काम को ओवरलैप करने का अवसर है।

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.experimental.AUTOTUNE)
)
Execution time: 0.20858672200006367

प्रीफ़ेच

इस बार आप देख सकते हैं कि प्रशिक्षण चरण नमूना 0 के लिए चल रहा है, इनपुट पाइप लाइन नमूना 1 के लिए डेटा पढ़ रहा है, और इसी तरह।

डेटा निष्कर्षण को समानांतर करना

वास्तविक दुनिया की सेटिंग में, इनपुट डेटा को दूरस्थ रूप से संग्रहीत किया जा सकता है (उदाहरण के लिए, जीसीएस या एचडीएफएस)। एक डेटासेट पाइपलाइन, जो स्थानीय रूप से डेटा पढ़ने पर अच्छी तरह से काम करती है, स्थानीय और दूरस्थ भंडारण के बीच निम्नलिखित अंतरों के कारण दूरस्थ रूप से डेटा पढ़ते समय I / O पर अड़चन बन सकती है:

  • टाइम-टू-फर्स्ट-बाइट: रिमोट स्टोरेज से किसी फाइल के पहले बाइट को पढ़ने से लोकल स्टोरेज की तुलना में ज्यादा समय तक ऑर्डर मिल सकता है।
  • थ्रूपुट पढ़ें: जबकि रिमोट स्टोरेज आमतौर पर बड़ी एग्रीगेट बैंडविड्थ प्रदान करता है, एक फाइल को पढ़ना केवल इस बैंडविड्थ के एक छोटे से हिस्से का उपयोग करने में सक्षम हो सकता है।

इसके अलावा, एक बार जब कच्ची बाइट्स को मेमोरी में लोड कर दिया जाता है, तो डेटा को डीसर्विलाइज़ और / या डिक्रिप्ट करना भी आवश्यक हो सकता है (जैसे प्रोटोबुफ़ ), जिसे अतिरिक्त कम्प्यूटेशन की आवश्यकता होती है। यह ओवरहेड मौजूद है चाहे डेटा स्थानीय रूप से संग्रहीत हो या दूरस्थ रूप से, लेकिन यदि डेटा प्रभावी रूप से प्रीफ़ेट नहीं किया गया है, तो दूरस्थ स्थिति में खराब हो सकता है।

विभिन्न डेटा निष्कर्षण ओवरहेड्स के प्रभाव को कम करने के लिए, tf.data.Dataset.interleave परिवर्तन का उपयोग डेटा लोडिंग चरण को समानांतर करने के लिए किया जा सकता है, अन्य डेटासेट (जैसे डेटा फ़ाइल रीडर) की सामग्री को tf.data.Dataset.interleave किया जा सकता है। ओवरलैप के डेटासेट की संख्या को cycle_length तर्क द्वारा निर्दिष्ट किया जा सकता है, जबकि cycle_length के स्तर को num_parallel_calls तर्क द्वारा निर्दिष्ट किया जा सकता है। करने के लिए इसी तरह के prefetch परिवर्तन, interleave परिवर्तन का समर्थन करता है tf.data.experimental.AUTOTUNE जिसके बारे में निर्णय प्रतिनिधि होगा क्या करने के लिए उपयोग करने के लिए समानांतरवाद के स्तर tf.data क्रम।

अनुक्रमिक इंटरलेवेव

tf.data.Dataset.interleave परिवर्तन के डिफ़ॉल्ट तर्क इसे दो डेटासेट से क्रमिक रूप से एकल नमूने बनाते हैं।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(ArtificialDataset)
)
Execution time: 0.2373930549999841

अनुक्रमिक इंटरलेवेव

यह प्लॉट interleave परिवर्तन के व्यवहार को प्रदर्शित करने की अनुमति देता है, जो उपलब्ध दो डेटासेट से वैकल्पिक रूप से नमूने ले रहा है। हालांकि, यहां कोई प्रदर्शन सुधार शामिल नहीं है।

समानांतर इंटरलेव

अब interleave परिवर्तन के num_parallel_calls तर्क का उपयोग करें। यह समानांतर में कई डेटासेट लोड करता है, जिससे फाइलों के खुलने का समय कम हो जाता है।

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        ArtificialDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
Execution time: 0.1730301249999684

समानांतर इंटरलेव

इस बार, दो डेटासेटों के पठन को वैश्विक डेटा प्रोसेसिंग समय को कम करते हुए, समानांतर किया गया है।

डेटा परिवर्तन को समानांतर करना

डेटा तैयार करते समय, इनपुट तत्वों को पूर्व-संसाधित करने की आवश्यकता हो सकती है। इसके लिए, tf.data API tf.data.Dataset.map ट्रांसफ़ॉर्मेशन प्रदान करता है, जो इनपुट डेटासेट के प्रत्येक तत्व के लिए एक यूज़र-डिफ़ाइंड फ़ंक्शन को लागू करता है। क्योंकि इनपुट तत्व एक दूसरे से स्वतंत्र होते हैं, इसलिए प्री-प्रोसेसिंग को कई सीपीयू कोर में समानांतर किया जा सकता है। इसी तरह करने के लिए, यह संभव बनाने के prefetch और interleave परिवर्तनों, map परिवर्तन प्रदान करता है num_parallel_calls समानांतरवाद के स्तर को निर्दिष्ट करने के तर्क।

num_parallel_calls तर्क के लिए सर्वोत्तम मूल्य चुनना आपके हार्डवेयर, आपके प्रशिक्षण डेटा की विशेषताओं (जैसे इसका आकार और आकार), आपके मानचित्र फ़ंक्शन की लागत और एक ही समय में CPU पर अन्य प्रसंस्करण क्या हो रहा है, पर निर्भर करता है। एक साधारण अनुमानी उपलब्ध सीपीयू कोर की संख्या का उपयोग करना है। हालांकि, के लिए के रूप में prefetch और interleave परिवर्तन, map परिवर्तन का समर्थन करता है tf.data.experimental.AUTOTUNE जो करने के लिए उपयोग करने के लिए समानांतरवाद का स्तर क्या के बारे में निर्णय प्रतिनिधि होगा tf.data क्रम।

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

अनुक्रमिक मानचित्रण

आधारभूत उदाहरण के रूप में समानता के बिना map परिवर्तन का उपयोग करके शुरू करें।

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.43913738300011573

अनुक्रमिक मानचित्रण

जैसा कि भोले दृष्टिकोण के लिए , यहाँ खोलने, पढ़ने, पूर्व-प्रसंस्करण (मानचित्रण) और प्रशिक्षण चरणों के लिए खर्च किया गया एक एकल प्रवाह के लिए एक साथ योग है।

समानांतर मानचित्रण

अब, समान प्री-प्रोसेसिंग फ़ंक्शन का उपयोग करें लेकिन इसे कई नमूनों पर समानांतर में लागू करें।

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
Execution time: 0.2730358689998411

समानांतर मानचित्रण

अब, आप प्लॉट पर देख सकते हैं कि प्री-प्रोसेसिंग स्टेप्स ओवरलैप करते हैं, एक एकल पुनरावृत्ति के लिए समग्र समय को कम करते हैं।

कैशिंग

tf.data.Dataset.cache परिवर्तन किसी डेटासेट को कैश कर सकता है, या तो मेमोरी में या लोकल स्टोरेज पर। यह कुछ संचालन (जैसे फ़ाइल खोलने और डेटा पढ़ने) को प्रत्येक युग के दौरान निष्पादित होने से बचाएगा।

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.36568501300007483

कैश्ड डेटासेट

जब आप एक डाटासेट को कैश करते हैं, तो cache एक (फ़ाइल खोलने और डेटा पढ़ने की तरह) से पहले परिवर्तन केवल पहले युग के दौरान निष्पादित होते हैं। अगले युग cache परिवर्तन द्वारा cache किए गए डेटा का पुन: उपयोग करेगा।

यदि उपयोगकर्ता-निर्धारित फ़ंक्शन map परिवर्तन में पारित महंगा है, तो map परिवर्तन के बाद cache रूपांतरण लागू करें जब तक कि डेटासेट अभी भी मेमोरी या स्थानीय भंडारण में फिट हो सकता है। यदि उपयोगकर्ता द्वारा परिभाषित फ़ंक्शन, कैश क्षमता से परे डेटासेट को संग्रहीत करने के लिए आवश्यक स्थान को बढ़ाता है, तो इसे या तो cache परिवर्तन के बाद लागू cache या संसाधन उपयोग को कम करने के लिए अपने प्रशिक्षण कार्य से पहले अपने डेटा को पूर्व-प्रसंस्करण पर विचार करें।

वेक्टरिंग मैपिंग

map परिवर्तन में पारित उपयोगकर्ता-परिभाषित फ़ंक्शन को लागू करने से उपयोगकर्ता-निर्धारित फ़ंक्शन को शेड्यूल करने और निष्पादित करने से संबंधित ओवरहेड होता है। हम उपयोगकर्ता-परिभाषित फ़ंक्शन को वेक्टर करने की सलाह देते हैं (अर्थात, एक बार में आदानों के एक बैच पर काम करते हैं) और map परिवर्तन से पहले batch परिवर्तन लागू करें।

इस अच्छे अभ्यास का वर्णन करने के लिए, आपका कृत्रिम डेटासेट उपयुक्त नहीं है। शेड्यूलिंग में देरी 10 माइक्रोसेकंड (10e-6 सेकंड) के आसपास होती है, जो ArtificialDataset में इस्तेमाल किए गए दसियों मिलीसेकंड से भी कम है, और इस तरह इसका असर देखना मुश्किल है।

इस उदाहरण के लिए, आधार tf.data.Dataset.range फ़ंक्शन का उपयोग करें और प्रशिक्षण लूप को उसके सरलतम रूप में सरल करें।

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)
    
def increment(x):
    return x+1

स्केलर मैपिंग

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.8861004689999845

स्केलर का नक्शा

ऊपर दिए गए कथानक से पता चलता है कि क्या चल रहा है (कम नमूनों के साथ)। आप देख सकते हैं कि प्रत्येक नमूने के लिए मैप किए गए फ़ंक्शन को लागू किया गया है। जबकि यह फ़ंक्शन बहुत तेज़ है, इसमें कुछ ओवरहेड हैं जो समय प्रदर्शन को प्रभावित करते हैं।

सदिश मानचित्रण

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
5663abbbfd

सदिश मानचित्र

इस बार, मैप किए गए फ़ंक्शन को एक बार बुलाया जाता है और नमूने के एक बैच पर लागू होता है। जबकि फ़ंक्शन को निष्पादित करने में अधिक समय लग सकता है, ओवरहेड केवल एक बार दिखाई देता है, समग्र समय प्रदर्शन में सुधार करता है।

स्मृति पदचिह्न कम करना

interleave , prefetch और shuffle सहित कई रूपांतरण, तत्वों के आंतरिक बफर को बनाए रखते हैं। यदि उपयोगकर्ता-परिभाषित फ़ंक्शन map परिवर्तन में पारित हो जाता है, तो तत्वों का आकार बदल जाता है, फिर मानचित्र परिवर्तन का क्रम और रूपांतरण जो बफर तत्वों को स्मृति उपयोग को प्रभावित करते हैं। सामान्य तौर पर, हम उस आदेश को चुनने की सलाह देते हैं, जिसके परिणामस्वरूप कम मेमोरी फ़ुटप्रिंट होता है, जब तक कि अलग-अलग ऑर्डर प्रदर्शन के लिए वांछनीय न हों।

आंशिक अभिकलन कैशिंग

map परिवर्तन के बाद डेटासेट को कैश करने की सिफारिश की जाती है, यदि यह परिवर्तन डेटा को स्मृति में फिट करने के लिए बहुत बड़ा बनाता है। यदि आपके मैप किए गए फ़ंक्शन को दो भागों में विभाजित किया जा सकता है तो एक ट्रेड-ऑफ प्राप्त किया जा सकता है: एक समय लेने वाला और एक मेमोरी लेने वाला हिस्सा। इस स्थिति में, आप नीचे दिए गए परिवर्तनों की श्रृंखला बना सकते हैं:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

इस तरह, समय लेने वाला हिस्सा केवल पहले युग के दौरान निष्पादित किया जाता है, और आप बहुत अधिक कैश स्थान का उपयोग करने से बचते हैं।

सबसे अच्छा अभ्यास सारांश

यहाँ प्रस्तुतकर्ता TensorFlow इनपुट पाइपलाइनों के डिजाइन के लिए सर्वोत्तम प्रथाओं का सारांश दिया गया है:

आंकड़ों को फिर से प्रस्तुत करना

tf.data.Dataset एपीआई समझ में गहराई तक जाने के लिए, आप अपनी खुद की पाइपलाइन के साथ खेल सकते हैं। नीचे इस गाइड से छवियों को प्लॉट करने के लिए उपयोग किया गया कोड है। यह एक अच्छा शुरुआती बिंदु हो सकता है, जैसे कि सामान्य कठिनाइयों के लिए कुछ वर्कअराउंड दिखा रहा है:

  • निष्पादन समय प्रतिलिपि प्रस्तुत करने योग्यता;
  • मैप किए गए कार्य उत्सुक निष्पादन;
  • interleave परिवर्तन कॉल करने योग्य।
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

डेटासेट

ArtificialDataset समान आप प्रत्येक चरण में बिताए समय को लौटाने वाला डेटासेट बना सकते हैं।

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
    
    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset
    
    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
        
        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter
            
            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered
            
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

यह डेटासेट आकार [[2, 1], [2, 2], [2, 3]] और प्रकार के [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] । प्रत्येक नमूना है:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

कहाँ पे:

  • Open एंड Read चरण पहचानकर्ता हैं
  • t0 , टाइमस्टैम्प है जब संगत कदम शुरू हुआ
  • d इसी चरण में बिताया गया समय है
  • i उदाहरण सूचकांक हूँ
  • e युगीन सूचकांक है (कई बार डेटासेट को पुनरावृत्त किया गया है)
  • s नमूना सूचकांक है

पुनरावृति पाश

सभी समयों को एकत्र करने के लिए पुनरावृति लूप को थोड़ा अधिक जटिल बनाएं। यह केवल ऊपर विस्तृत रूप में नमूने उत्पन्न करने वाले डेटासेट के साथ काम करेगा।

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
    
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)
            
            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter
            
            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
        
        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)
    
    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

प्लॉटिंग विधि

अंत में, एक समयरेखा को प्लॉट करने में सक्षम फ़ंक्शन को परिभाषित करें, जो टाइमलाइन किए गए मान को वापस timelined_benchmark है।

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()
    
    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
    
    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)
    
    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")
        
        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]
        
        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

मैप किए गए फ़ंक्शन के लिए रैपर का उपयोग करें

एक उत्सुक संदर्भ में मैप किए गए फ़ंक्शन को चलाने के लिए, आपको उन्हें एक tf.py_function कॉल के अंदर लपेटना होगा।

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

पाइपलाइनों की तुलना

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

अनुभवहीन

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
Execution time: 12.436093607999965

अनुकूलित

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.experimental.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.303204500999982

draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png