มีคำถาม? เชื่อมต่อกับชุมชนที่ฟอรัม TensorFlow เยี่ยมชมฟอรัม

ประสิทธิภาพที่ดีขึ้นด้วย tf.data API

ดูใน TensorFlow.org เรียกใช้ใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

ภาพรวม

GPU และ TPU สามารถลดเวลาที่ต้องใช้ในการฝึกขั้นตอนเดียวได้อย่างมาก การบรรลุประสิทธิภาพสูงสุดจำเป็นต้องมีท่อส่งข้อมูลที่มีประสิทธิภาพซึ่งส่งข้อมูลสำหรับขั้นตอนถัดไปก่อนที่ขั้นตอนปัจจุบันจะเสร็จสิ้น tf.data API ช่วยสร้างท่อส่งข้อมูลที่ยืดหยุ่นและมีประสิทธิภาพ เอกสารนี้สาธิตวิธีใช้ tf.data API เพื่อสร้างไปป์ไลน์อินพุต TensorFlow ที่มีประสิทธิภาพสูง

ก่อนที่คุณจะดำเนินการต่อให้ตรวจสอบคำแนะนำในการ สร้าง TensorFlow input pipelines เพื่อเรียนรู้วิธีใช้ tf.data API

ทรัพยากร

ติดตั้ง

import tensorflow as tf

import time

ในคู่มือนี้คุณจะทำซ้ำในชุดข้อมูลและวัดประสิทธิภาพ การสร้างเกณฑ์มาตรฐานประสิทธิภาพที่ทำซ้ำได้อาจเป็นเรื่องยาก ปัจจัยต่างๆที่มีผลต่อความสามารถในการทำซ้ำ ได้แก่ :

  • โหลด CPU ปัจจุบัน
  • การรับส่งข้อมูลเครือข่าย
  • กลไกที่ซับซ้อนเช่นแคช

เพื่อให้ได้มาตรฐานที่ทำซ้ำได้คุณจะต้องสร้างตัวอย่างเทียมขึ้นมา

ชุดข้อมูล

เริ่มต้นด้วยการกำหนดคลาสที่สืบทอดมาจากtf.data.Dataset เรียกว่า ArtificialDataset ชุดข้อมูลนี้:

  • สร้างตัวอย่าง num_samples (ค่าเริ่มต้นคือ 3)
  • พักไว้ก่อนรายการแรกเพื่อจำลองการเปิดไฟล์
  • พักสักครู่ก่อนที่จะสร้างแต่ละรายการเพื่อจำลองการอ่านข้อมูลจากไฟล์
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

ชุดข้อมูลนี้คล้ายกับ tf.data.Dataset.range one โดยเพิ่มการหน่วงเวลาคงที่ที่จุดเริ่มต้นและระหว่างแต่ละตัวอย่าง

ห่วงการฝึกอบรม

จากนั้นเขียนลูปการฝึกแบบจำลองที่ใช้วัดระยะเวลาในการวนซ้ำชุดข้อมูล เวลาฝึกซ้อมเป็นแบบจำลอง

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

เพิ่มประสิทธิภาพ

หากต้องการแสดงวิธีเพิ่มประสิทธิภาพคุณจะปรับปรุงประสิทธิภาพของ ArtificialDataset

วิธีการที่ไร้เดียงสา

เริ่มต้นด้วยไปป์ไลน์ที่ไร้เดียงสาโดยไม่ใช้เทคนิคใด ๆ โดยทำซ้ำบนชุดข้อมูลตามที่เป็นอยู่

benchmark(ArtificialDataset())
Execution time: 0.2541472299999441

ภายใต้ประทุนนี่คือการใช้เวลาดำเนินการของคุณ:

พล็อตเวลาดำเนินการข้อมูล - วิธีการที่ไร้เดียงสา

พล็อตแสดงให้เห็นว่าการปฏิบัติตามขั้นตอนการฝึกอบรมเกี่ยวข้องกับ:

  • การเปิดไฟล์หากยังไม่ได้เปิด
  • ดึงรายการข้อมูลจากไฟล์
  • การใช้ข้อมูลสำหรับการฝึกอบรม

อย่างไรก็ตามในการใช้งานซิงโครนัสที่ไร้เดียงสาเช่นที่นี่ในขณะที่ไปป์ไลน์ของคุณกำลังดึงข้อมูลโมเดลของคุณไม่ได้ใช้งาน ในทางกลับกันในขณะที่โมเดลของคุณกำลังฝึกอบรมไปป์ไลน์อินพุตไม่ได้ใช้งาน เวลาขั้นตอนการฝึกจึงเป็นผลรวมของเวลาเปิดอ่านและเวลาฝึกอบรม

ส่วนถัดไปสร้างบนท่อส่งข้อมูลนี้ซึ่งแสดงให้เห็นถึงแนวทางปฏิบัติที่ดีที่สุดสำหรับการออกแบบท่อส่งข้อมูล TensorFlow ที่มีประสิทธิภาพ

การกำหนดค่าล่วงหน้า

การดึงข้อมูลล่วงหน้าจะซ้อนทับระหว่างการประมวลผลล่วงหน้าและการดำเนินการแบบจำลองของขั้นตอนการฝึกอบรม ในขณะที่รูปแบบการดำเนินการฝึกอบรมขั้นตอน s ท่อป้อนข้อมูลที่มีการอ่านข้อมูลสำหรับขั้นตอน s+1 1 การทำเช่นนี้จะช่วยลดเวลาขั้นตอนให้เหลือมากที่สุด (ตรงข้ามกับผลรวม) ของการฝึกอบรมและเวลาที่ใช้ในการดึงข้อมูล

tf.data API จัดเตรียมการแปลง tf.data.Dataset.prefetch สามารถใช้เพื่อแยกเวลาที่สร้างข้อมูลจากเวลาที่ใช้ข้อมูล โดยเฉพาะอย่างยิ่งการแปลงจะใช้เธรดพื้นหลังและบัฟเฟอร์ภายในเพื่อดึงองค์ประกอบล่วงหน้าจากชุดข้อมูลอินพุตก่อนเวลาที่มีการร้องขอ จำนวนองค์ประกอบที่จะดึงข้อมูลล่วงหน้าควรเท่ากับ (หรืออาจมากกว่า) จำนวนชุดงานที่ใช้โดยขั้นตอนการฝึกอบรมเดียว คุณสามารถปรับค่านี้ด้วยตนเองหรือตั้งค่าเป็น tf.data.AUTOTUNE ซึ่งจะแจ้งให้รันไทม์ tf.data ปรับแต่งค่าแบบไดนามิกที่รันไทม์

โปรดทราบว่าการเปลี่ยนแปลงการดึงข้อมูลล่วงหน้าให้ประโยชน์ทุกครั้งที่มีโอกาสซ้อนทับงานของ "ผู้ผลิต" กับงานของ "ผู้บริโภค"

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.20805208699994182

พล็อตเวลาดำเนินการข้อมูล - วิธีการดึงข้อมูลล่วงหน้า

ตอนนี้ตามที่พล็อตเวลาดำเนินการข้อมูลแสดงให้เห็นในขณะที่ขั้นตอนการฝึกกำลังทำงานสำหรับตัวอย่าง 0 ไปป์ไลน์อินพุตกำลังอ่านข้อมูลสำหรับตัวอย่าง 1 และอื่น ๆ

การแยกข้อมูลแบบขนาน

ในสภาพแวดล้อมจริงข้อมูลอินพุตอาจถูกจัดเก็บจากระยะไกล (เช่นบน Google Cloud Storage หรือ HDFS) ไปป์ไลน์ชุดข้อมูลที่ทำงานได้ดีเมื่ออ่านข้อมูลภายในเครื่องอาจเกิดปัญหาคอขวดบน I / O เมื่ออ่านข้อมูลจากระยะไกลเนื่องจากความแตกต่างดังต่อไปนี้ระหว่างที่เก็บข้อมูลภายในและระยะไกล:

  • เวลาต่อไบต์แรก : การอ่านไบต์แรกของไฟล์จากที่เก็บข้อมูลระยะไกลสามารถสั่งขนาดได้นานกว่าจากที่จัดเก็บในตัวเครื่อง
  • ปริมาณการอ่าน : ในขณะที่พื้นที่จัดเก็บข้อมูลระยะไกลมักมีแบนด์วิดท์รวมขนาดใหญ่การอ่านไฟล์เดียวอาจใช้ประโยชน์จากแบนด์วิดท์นี้เพียงเล็กน้อยเท่านั้น

นอกจากนี้เมื่อโหลดไบต์ดิบลงในหน่วยความจำแล้วยังอาจจำเป็นต้องแยกส่วนและ / หรือถอดรหัสข้อมูล (เช่น โปรโตบัฟ ) ซึ่งต้องมีการคำนวณเพิ่มเติม ค่าโสหุ้ยนี้จะปรากฏขึ้นโดยไม่คำนึงว่าข้อมูลจะถูกเก็บไว้ในเครื่องหรือจากระยะไกล แต่อาจแย่กว่านั้นในกรณีระยะไกลหากข้อมูลไม่ได้รับการดึงข้อมูลไว้ล่วงหน้าอย่างมีประสิทธิภาพ

เพื่อลดผลกระทบของค่าโสหุ้ยในการแยกข้อมูลต่างๆการแปลง tf.data.Dataset.interleave สามารถใช้เพื่อทำขั้นตอนการโหลดข้อมูลแบบขนานโดยสอดแทรกเนื้อหาของชุดข้อมูลอื่น ๆ (เช่นโปรแกรมอ่านไฟล์ข้อมูล) จำนวนชุดข้อมูลที่จะทับซ้อนกันสามารถระบุได้โดยอาร์กิวเมนต์ cycle_length ในขณะที่ระดับของความขนานสามารถระบุได้โดยอาร์กิวเมนต์ num_parallel_calls เช่นเดียวกับการแปลงแบบ prefetch แปลง interleave สนับสนุน tf.data.AUTOTUNE ซึ่งจะมอบหมายการตัดสินใจเกี่ยวกับระดับของความขนานที่จะใช้กับรันไทม์ tf.data

แทรกสลับตามลำดับ

อาร์กิวเมนต์เริ่มต้นของการแปลง tf.data.Dataset.interleave ทำให้แทรกตัวอย่างเดียวจากชุดข้อมูลสองชุดตามลำดับ

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4883518669998921

พล็อตเวลาดำเนินการข้อมูล - การแทรกสลับตามลำดับ

พล็อตเวลาดำเนินการข้อมูลนี้ช่วยให้สามารถแสดงพฤติกรรมของการแปลง interleave โดยดึงตัวอย่างหรือจากชุดข้อมูลสองชุดที่มีอยู่ อย่างไรก็ตามไม่มีการปรับปรุงประสิทธิภาพที่เกี่ยวข้องที่นี่

interleave แบบขนาน

ตอนนี้ใช้อาร์กิวเมนต์ num_parallel_calls ของการแปลง interleave ซึ่งจะโหลดชุดข้อมูลหลายชุดพร้อมกันช่วยลดเวลาในการรอเปิดไฟล์

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.26920967700016263

พล็อตเวลาดำเนินการข้อมูล - วิธีการแทรกสลับแบบขนาน

คราวนี้ตามที่แสดงพล็อตเวลาดำเนินการข้อมูลการอ่านชุดข้อมูลทั้งสองจะขนานกันซึ่งจะช่วยลดเวลาในการประมวลผลข้อมูลทั่วโลก

การแปลงข้อมูลแบบขนาน

เมื่อเตรียมข้อมูลองค์ประกอบอินพุตอาจต้องได้รับการประมวลผลล่วงหน้า ด้วยเหตุนี้ tf.data API จึงนำเสนอการแปลง tf.data.Dataset.map ซึ่งใช้ฟังก์ชันที่ผู้ใช้กำหนดเองกับแต่ละองค์ประกอบของชุดข้อมูลอินพุต เนื่องจากองค์ประกอบอินพุตเป็นอิสระจากกันการประมวลผลล่วงหน้าจึงสามารถขนานกันระหว่างแกน CPU หลายตัวได้ เพื่อให้เป็นไปได้เช่นเดียวกับการ prefetch และการแปลง interleave num_parallel_calls การแปลง map จะจัดเตรียมอาร์กิวเมนต์ num_parallel_calls เพื่อระบุระดับของความขนาน

การเลือกค่าที่ดีที่สุดสำหรับอาร์กิวเมนต์ num_parallel_calls ขึ้นอยู่กับฮาร์ดแวร์ของคุณลักษณะของข้อมูลการฝึกของคุณ (เช่นขนาดและรูปร่าง) ต้นทุนของฟังก์ชันแผนที่ของคุณและการประมวลผลอื่น ๆ ที่เกิดขึ้นกับ CPU ในเวลาเดียวกัน ฮิวริสติกง่ายๆคือการใช้จำนวนแกน CPU ที่มีอยู่ อย่างไรก็ตามสำหรับการ prefetch และการแปลง interleave tf.data.AUTOTUNE การแปลง map รองรับ tf.data.AUTOTUNE ซึ่งจะมอบหมายการตัดสินใจเกี่ยวกับระดับของความขนานที่จะใช้กับรันไทม์ tf.data

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

การแมปตามลำดับ

เริ่มต้นด้วยการใช้การแปลง map โดยไม่มีความขนานเป็นตัวอย่างพื้นฐาน

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4379127629999857

พล็อตเวลาดำเนินการข้อมูล - วิธีการแมปตามลำดับ

สำหรับ วิธีการที่ไร้เดียงสา ตามที่แสดงในที่นี้เวลาที่ใช้ในการเปิดการอ่านการประมวลผลล่วงหน้า (การทำแผนที่) และขั้นตอนการฝึกอบรมจะรวมเข้าด้วยกันสำหรับการทำซ้ำเพียงครั้งเดียว

การทำแผนที่แบบขนาน

ตอนนี้ใช้ฟังก์ชันก่อนการประมวลผลเดียวกัน แต่ใช้ควบคู่กันกับหลายตัวอย่าง

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2747970279999663

เวลาดำเนินการข้อมูล - การทำแผนที่แบบขนาน

ตามที่แสดงให้เห็นถึงการลงจุดข้อมูลขั้นตอนก่อนการประมวลผลจะซ้อนทับกันซึ่งจะช่วยลดเวลาโดยรวมสำหรับการทำซ้ำเพียงครั้งเดียว

เก็บเอาไว้

การแปลง tf.data.Dataset.cache สามารถแคชชุดข้อมูลทั้งในหน่วยความจำหรือที่เก็บข้อมูลในเครื่อง การดำเนินการนี้จะช่วยประหยัดการดำเนินการบางอย่าง (เช่นการเปิดไฟล์และการอ่านข้อมูล) จากการดำเนินการในแต่ละยุค

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3715158390000397

เวลาดำเนินการข้อมูล - วิธีการแคชชุดข้อมูล

ที่นี่พล็อตเวลาดำเนินการข้อมูลแสดงให้เห็นว่าเมื่อคุณแคชชุดข้อมูลการแปลงก่อน cache หนึ่ง (เช่นการเปิดไฟล์และการอ่านข้อมูล) จะดำเนินการในช่วงยุคแรกเท่านั้น ยุคถัดไปจะนำข้อมูลที่แคชไว้มาใช้ใหม่โดยการแปลง cache

หากฟังก์ชันที่ผู้ใช้กำหนดเองที่ส่งผ่านไปยังการแปลง map มีราคาแพงให้ใช้การแปลง cache หลังจากการแปลง map ตราบเท่าที่ชุดข้อมูลผลลัพธ์ยังสามารถใส่ลงในหน่วยความจำหรือที่จัดเก็บในตัวเครื่องได้ หากฟังก์ชันที่ผู้ใช้กำหนดเองเพิ่มพื้นที่ที่จำเป็นในการจัดเก็บชุดข้อมูลเกินความจุแคชให้ใช้หลังจากการแปลง cache หรือพิจารณาการประมวลผลข้อมูลของคุณล่วงหน้าก่อนงานฝึกอบรมของคุณเพื่อลดการใช้ทรัพยากร

การทำแผนที่แบบ Vectorizing

การเรียกใช้ฟังก์ชันที่ผู้ใช้กำหนดเองที่ส่งผ่านไปยังการแปลง map มีค่าใช้จ่ายที่เกี่ยวข้องกับการตั้งเวลาและการเรียกใช้ฟังก์ชันที่ผู้ใช้กำหนดเอง กำหนดฟังก์ชันที่ผู้ใช้กำหนดเป็นเวกเตอร์ (นั่นคือให้มันทำงานผ่านชุดอินพุตพร้อมกัน) และใช้การแปลง batch ก่อน การแปลง map

เพื่อแสดงแนวทางปฏิบัติที่ดีชุดข้อมูลเทียมของคุณไม่เหมาะสม ความล่าช้าในการตั้งเวลาอยู่ที่ประมาณ 10 ไมโครวินาที (10e-6 วินาที) ซึ่งน้อยกว่าหลายสิบมิลลิวินาทีที่ใช้ใน ArtificialDataset ดังนั้นจึงยากที่จะเห็นผลกระทบ

สำหรับตัวอย่างนี้ใช้ฟังก์ชัน base tf.data.Dataset.range และลดความซับซ้อนของลูปการฝึกให้เป็นรูปแบบที่ง่ายที่สุด

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

การทำแผนที่สเกลาร์

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.9082538790000854

เวลาดำเนินการข้อมูล - วิธีแผนที่สเกลาร์

พล็อตด้านบนแสดงให้เห็นถึงสิ่งที่เกิดขึ้น (มีตัวอย่างน้อยกว่า) โดยใช้วิธีการแมปสเกลาร์ แสดงว่ามีการใช้ฟังก์ชันที่แมปสำหรับแต่ละตัวอย่าง แม้ว่าฟังก์ชันนี้จะเร็วมาก แต่ก็มีค่าใช้จ่ายบางส่วนที่ส่งผลต่อประสิทธิภาพของเวลา

การทำแผนที่แบบเวกเตอร์

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.03624614399996062

เวลาดำเนินการข้อมูล - วิธีการแผนที่แบบเวกเตอร์

คราวนี้ฟังก์ชันที่แมปถูกเรียกใช้ครั้งเดียวและใช้กับกลุ่มตัวอย่าง ตามที่แสดงพล็อตเวลาดำเนินการข้อมูลในขณะที่ฟังก์ชันอาจใช้เวลาในการดำเนินการมากขึ้นค่าใช้จ่ายจะปรากฏเพียงครั้งเดียวซึ่งช่วยปรับปรุงประสิทธิภาพของเวลาโดยรวม

ลดการใช้หน่วยความจำ

การเปลี่ยนแปลงหลายอย่างรวมถึงการ interleave prefetch และการ shuffle รักษาบัฟเฟอร์ภายในขององค์ประกอบ หากฟังก์ชันที่ผู้ใช้กำหนดเองผ่านเข้าไปในการแปลง map เปลี่ยนขนาดขององค์ประกอบลำดับของการแปลงแผนที่และการแปลงที่องค์ประกอบบัฟเฟอร์จะมีผลต่อการใช้หน่วยความจำ โดยทั่วไปให้เลือกลำดับที่ส่งผลให้มีหน่วยความจำลดลงเว้นแต่การจัดลำดับที่แตกต่างกันจะเป็นที่ต้องการสำหรับประสิทธิภาพ

การแคชการคำนวณบางส่วน

ขอแนะนำให้แคชชุดข้อมูลหลังจากการแปลง map ยกเว้นว่าการเปลี่ยนแปลงนี้ทำให้ข้อมูลมีขนาดใหญ่เกินไปที่จะใส่ลงในหน่วยความจำ การแลกเปลี่ยนสามารถทำได้หากฟังก์ชันการแมปของคุณสามารถแบ่งออกเป็นสองส่วน: ส่วนที่ใช้เวลานานและส่วนที่ใช้หน่วยความจำ ในกรณีนี้คุณสามารถเชื่อมโยงการเปลี่ยนแปลงของคุณได้ดังต่อไปนี้:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

ด้วยวิธีนี้ส่วนที่เสียเวลาจะดำเนินการในช่วงยุคแรกเท่านั้นและคุณหลีกเลี่ยงการใช้พื้นที่แคชมากเกินไป

สรุปแนวทางปฏิบัติที่ดีที่สุด

นี่คือบทสรุปของแนวทางปฏิบัติที่ดีที่สุดสำหรับการออกแบบไปป์ไลน์อินพุต TensorFlow ที่มีประสิทธิภาพ:

การจำลองตัวเลข

หากต้องการทำความเข้าใจเกี่ยวกับtf.data.Dataset API ให้ลึกซึ้งยิ่งขึ้นคุณสามารถเล่นกับไปป์ไลน์ของคุณเองได้ ด้านล่างนี้คือรหัสที่ใช้ในการพล็อตภาพจากคู่มือนี้ อาจเป็นจุดเริ่มต้นที่ดีโดยแสดงวิธีแก้ปัญหาบางประการสำหรับปัญหาทั่วไปเช่น:

  • ความสามารถในการทำซ้ำเวลาดำเนินการ
  • ฟังก์ชั่นที่แมปดำเนินการอย่างกระตือรือร้น
  • สามารถเรียกการแปลง interleave
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

ชุดข้อมูล

เช่นเดียวกับ ArtificialDataset คุณสามารถสร้างชุดข้อมูลคืนเวลาที่ใช้ในแต่ละขั้นตอนได้

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

ชุดข้อมูลนี้แสดงตัวอย่างรูปร่าง [[2, 1], [2, 2], [2, 3]] และประเภท [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] แต่ละตัวอย่างคือ:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

ที่ไหน:

  • Open และ Read คือตัวระบุขั้นตอน
  • t0 คือการประทับเวลาเมื่อขั้นตอนที่เกี่ยวข้องเริ่มต้นขึ้น
  • d คือเวลาที่ใช้ในขั้นตอนที่เกี่ยวข้อง
  • i คือดัชนีอินสแตนซ์
  • e คือดัชนียุค (จำนวนครั้งที่มีการทำซ้ำชุดข้อมูล)
  • s คือดัชนีตัวอย่าง

วนซ้ำ

ทำให้การวนซ้ำซับซ้อนขึ้นเล็กน้อยเพื่อรวมการกำหนดเวลาทั้งหมด สิ่งนี้จะใช้ได้เฉพาะกับชุดข้อมูลที่สร้างตัวอย่างตามรายละเอียดด้านบน

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

วิธีการพล็อต

สุดท้ายกำหนดฟังก์ชันที่สามารถพล็อตไทม์ไลน์โดยให้ค่าที่ส่งคืนโดยฟังก์ชัน timelined_benchmark

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

ใช้ Wrapper สำหรับฟังก์ชันที่แมป

ในการเรียกใช้ฟังก์ชันที่แมปในบริบทที่กระตือรือร้นคุณต้องรวมฟังก์ชันเหล่านี้ไว้ในการเรียกใช้ tf.py_function

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

การเปรียบเทียบท่อ

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

ไร้เดียงสา

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From <ipython-input-1-c85330a00c6e>:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.445692234000035

ปรับให้เหมาะสม

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.326935971000012
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png