หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

อินพุตแบบกระจาย

ดูใน TensorFlow.org เรียกใช้ใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

tf.distribute API เป็นวิธีง่ายๆสำหรับผู้ใช้ในการปรับขนาดการฝึกอบรมจากเครื่องเดียวไปยังหลายเครื่อง เมื่อปรับขนาดโมเดลผู้ใช้จะต้องแจกจ่ายข้อมูลที่ป้อนไปยังอุปกรณ์ต่างๆ tf.distribute ให้ API ที่ใช้ซึ่งคุณสามารถกระจายการป้อนข้อมูลของคุณไปยังอุปกรณ์ต่างๆโดยอัตโนมัติ

คำแนะนำนี้จะแสดงให้คุณเห็นถึงวิธีต่างๆที่คุณสามารถสร้างชุดข้อมูลแบบกระจายและตัววนซ้ำโดยใช้ tf.distribute API นอกจากนี้หัวข้อต่อไปนี้จะครอบคลุม:

คู่มือนี้ไม่ครอบคลุมการใช้งานอินพุตแบบกระจายด้วย Keras API

ชุดข้อมูลแบบกระจาย

ในการใช้ tf.distribute API ในการปรับขนาดขอแนะนำให้ผู้ใช้ใช้ tf.data.Dataset เพื่อแสดงอินพุตของตน tf.distribute ถูกสร้างขึ้นเพื่อให้ทำงานได้อย่างมีประสิทธิภาพกับ tf.data.Dataset (ตัวอย่างเช่นการดึงข้อมูลล่วงหน้าอัตโนมัติไปยังอุปกรณ์เร่งความเร็วแต่ละเครื่อง) โดยมีการเพิ่มประสิทธิภาพการทำงานอย่างสม่ำเสมอในการนำไปใช้งาน หากคุณมีกรณีการใช้งานสำหรับการใช้งานอย่างอื่นนอกเหนือจาก tf.data.Dataset โปรดอ้างอิง ส่วน ภายหลังในคู่มือนี้ ในลูปการฝึกอบรมแบบไม่กระจายผู้ใช้สร้างอินสแตนซ์ tf.data.Dataset ก่อนแล้ววนซ้ำองค์ประกอบ ตัวอย่างเช่น:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ในการอนุญาตให้ผู้ใช้ใช้กลยุทธ์ tf.distribute โดยมีการเปลี่ยนแปลงเล็กน้อยกับโค้ดที่มีอยู่ของผู้ใช้จึงมีการแนะนำ API สองรายการซึ่งจะแจกจ่ายอินสแตนซ์ tf.data.Dataset และส่งคืนอ็อบเจ็กต์ชุดข้อมูลแบบกระจาย จากนั้นผู้ใช้สามารถทำซ้ำผ่านอินสแตนซ์ชุดข้อมูลที่แจกจ่ายนี้และฝึกอบรมโมเดลของพวกเขาเหมือนก่อนหน้านี้ ให้เราดูที่สอง APIs - tf.distribute.Strategy.experimental_distribute_dataset และ tf.distribute.Strategy.experimental_distribute_datasets_from_function ในรายละเอียดเพิ่มเติม:

tf.distribute.Strategy.experimental_distribute_dataset

การใช้

API นี้ใช้อินสแตนซ์ tf.data.Dataset เป็นอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset คุณควรแบตช์ชุดข้อมูลอินพุตด้วยค่าที่เท่ากับขนาดแบทช์ส่วนกลาง ขนาดแบตช์ส่วนกลางนี้คือจำนวนตัวอย่างที่คุณต้องการดำเนินการกับอุปกรณ์ทั้งหมดใน 1 ขั้นตอน คุณสามารถทำซ้ำชุดข้อมูลแบบกระจายนี้ในรูปแบบ Pythonic หรือสร้างตัววนซ้ำโดยใช้ iter อ็อบเจ็กต์ที่ส่งคืนไม่ใช่อินสแตนซ์ tf.data.Dataset และไม่สนับสนุน API อื่นใดที่แปลงหรือตรวจสอบชุดข้อมูลในทางใดทางหนึ่ง นี่คือ API ที่แนะนำหากคุณไม่มีวิธีเฉพาะเจาะจงที่คุณต้องการแบ่งข้อมูลที่คุณป้อนผ่านแบบจำลองต่างๆ

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

คุณสมบัติ

เครื่องผสม

tf.distribute รี tf.data.Dataset อินสแตนซ์ tf.data.Dataset อินพุตด้วยขนาดแบตช์ใหม่ที่เท่ากับขนาดแบตช์ส่วนกลางหารด้วยจำนวนเรพลิกาในการซิงค์ จำนวนของแบบจำลองในการซิงค์เท่ากับจำนวนของอุปกรณ์ที่มีส่วนร่วมใน allreduce ไล่ระดับสีในระหว่างการฝึกอบรม เมื่อผู้ใช้เรียกตัว next บนตัววนซ้ำแบบกระจายขนาดต่อชุดข้อมูลจำลองจะถูกส่งกลับในแต่ละตัวจำลอง คาร์ดินาลลิตี้ของชุดข้อมูล rebatched จะเป็นจำนวนเต็มของจำนวนการจำลองเสมอ นี่คือตัวอย่างบางส่วน:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    ไม่มีการกระจาย:

    ชุดที่ 1: [0, 1, 2, 3]

    ชุดที่ 2: [4, 5]

    ด้วยการจัดจำหน่ายมากกว่า 2 แบบ:

    ชุดที่ 1: แบบจำลอง 1: [0, 1] แบบจำลอง 2: [2, 3]

    รุ่นที่ 2: แบบจำลอง 2: [4] แบบจำลอง 2: [5]

    ชุดสุดท้าย ([4, 5]) จะแบ่งระหว่าง 2 แบบจำลอง

  • tf.data.Dataset.range(4).batch(4)

    ไม่มีการกระจาย:

    กลุ่ม 1: [[0], [1], [2], [3]]

    ด้วยการจัดจำหน่ายมากกว่า 5 แบบ:

    ชุดที่ 1: แบบจำลอง 1: [0] แบบจำลอง 2: [1] แบบจำลอง 3: [2] แบบจำลอง 4: [3] แบบจำลอง 5: []

  • tf.data.Dataset.range(8).batch(4)

    ไม่มีการกระจาย:

    กลุ่ม 1: [0, 1, 2, 3]

    รุ่นที่ 2: [4, 5, 6, 7]

    ด้วยการกระจายมากกว่า 3 แบบจำลอง:

    ชุดที่ 1: จำลอง 1: [0, 1] จำลอง 2: [2, 3] จำลอง 3: []

    ชุดที่ 2: จำลอง 1: [4, 5] จำลอง 2: [6, 7] จำลอง 3: []

การจับคู่ชุดข้อมูลใหม่มีความซับซ้อนของช่องว่างที่เพิ่มขึ้นแบบเชิงเส้นด้วยจำนวนแบบจำลอง ซึ่งหมายความว่าสำหรับกรณีที่ใช้การฝึกอบรมผู้ปฏิบัติงานหลายกรณีช่องทางอินพุตสามารถเรียกใช้เป็นข้อผิดพลาดของ OOM

sharding

tf.distribute ยังจัดเก็บข้อมูลชุดข้อมูลอินพุตอัตโนมัติในการฝึกอบรมผู้ปฏิบัติงานหลายคน แต่ละชุดข้อมูลจะถูกสร้างขึ้นบนอุปกรณ์ซีพียูของผู้ปฏิบัติงาน การส่งชุดข้อมูลโดยอัตโนมัติผ่านชุดของคนงานหมายความว่าผู้ปฏิบัติงานแต่ละคนได้รับมอบหมายชุดย่อยของชุดข้อมูลทั้งหมด (ถ้าตั้งค่า tf.data.experimental.AutoShardPolicy ถูกต้อง) นี่คือเพื่อให้แน่ใจว่าในแต่ละขั้นตอนขนาดของแบทช์ทั่วโลกขององค์ประกอบชุดข้อมูลที่ไม่ทับซ้อนกันจะถูกประมวลผลโดยผู้ปฏิบัติงานแต่ละคน Autosharding มีสองตัวเลือกที่แตกต่างกันซึ่งสามารถระบุได้โดยใช้ tf.data.experimental.DistributeOptions

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

มีสามตัวเลือกต่าง ๆ ที่คุณสามารถตั้งค่าสำหรับ tf.data.experimental.AutoShardPolicy :

  • AUTO: นี่คือตัวเลือกเริ่มต้นซึ่งหมายถึงความพยายามที่จะทำให้ไฟล์โดย FILE ความพยายามในการแบ่งโดย FILE ล้มเหลวหากตรวจไม่พบชุดข้อมูลที่ใช้ไฟล์ tf.distribute จะถอยกลับไปหา tf.distribute โปรดทราบว่าหากชุดข้อมูลอินพุตเป็นแบบไฟล์ แต่จำนวนไฟล์น้อยกว่าจำนวนคนทำงานข้อผิดพลาดจะเพิ่มขึ้น
  • ไฟล์: เป็นตัวเลือกนี้หากคุณต้องการแบ่งไฟล์อินพุตให้กับคนงานทั้งหมด หากจำนวนไฟล์น้อยกว่าจำนวนคนงานจะมีข้อผิดพลาดเกิดขึ้น คุณควรใช้ตัวเลือกนี้หากจำนวนไฟล์อินพุตมีขนาดใหญ่กว่าจำนวนคนงานและข้อมูลในไฟล์จะถูกกระจายอย่างเท่าเทียมกัน ข้อเสียของตัวเลือกนี้คือมีพนักงานที่ไม่ได้ใช้งานหากข้อมูลในไฟล์ไม่กระจายอย่างเท่าเทียมกัน ตัวอย่างเช่นให้เราแจกจ่าย 2 ไฟล์ใน 2 คนที่มี 1 แบบจำลอง ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5] และไฟล์ 2 ประกอบด้วย [6, 7, 8, 9, 10, 11] ปล่อยให้จำนวนเรพลิกาทั้งหมดที่ซิงค์เป็น 2 และขนาดแบตช์โกลบอลเท่ากับ 4

    • คนงาน 0:

    รุ่นที่ 1 = จำลอง 1: [0, 1]

    ชุดที่ 2 = ตัวจำลอง 1: [2, 3]

    ชุดที่ 3 = แบบจำลอง 1: [4]

    รุ่นที่ 4 = จำลอง 1: [5]

    • คนงาน 1:

    ชุดที่ 1 = ตัวจำลอง 2: [6, 7]

    ชุดที่ 2 = ตัวจำลอง 2: [8, 9]

    ชุดที่ 3 = ตัวจำลอง 2: [10]

    ชุดที่ 4 = ตัวจำลอง 2: [11]

  • ข้อมูล: สิ่งนี้จะทำการตรวจสอบองค์ประกอบของพนักงานทั้งหมด คนงานแต่ละคนจะอ่านชุดข้อมูลทั้งหมดและประมวลผลชิ้นส่วนที่กำหนดให้เท่านั้น ชิ้นส่วนอื่น ๆ ทั้งหมดจะถูกทิ้ง โดยทั่วไปจะใช้หากจำนวนไฟล์อินพุตน้อยกว่าจำนวนคนงานและคุณต้องการการแบ่งข้อมูลให้กับพนักงานทุกคนได้ดีขึ้น ข้อเสียคือชุดข้อมูลทั้งหมดจะถูกอ่านเกี่ยวกับคนงานแต่ละคน ตัวอย่างเช่นให้เราแจกจ่าย 1 ไฟล์ใน 2 คนงาน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2

    • คนงาน 0:

    ชุดที่ 1 = ตัวจำลอง 1: [0, 1]

    รุ่นที่ 2 = จำลอง 1: [4, 5]

    ชุดที่ 3 = ตัวจำลอง 1: [8, 9]

    • คนงาน 1:

    ชุดที่ 1 = ตัวจำลอง 2: [2, 3]

    ชุดที่ 2 = ตัวจำลอง 2: [6, 7]

    รุ่นที่ 3 = จำลอง 2: [10, 11]

  • OFF: หากคุณปิดการบันทึกอัตโนมัติผู้ปฏิบัติงานแต่ละคนจะประมวลผลข้อมูลทั้งหมด ตัวอย่างเช่นให้เราแจกจ่าย 1 ไฟล์กับพนักงาน 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2 จากนั้นผู้ปฏิบัติงานแต่ละคนจะเห็นการแจกจ่ายต่อไปนี้:

    • คนงาน 0:

    รุ่นที่ 1 = จำลอง 1: [0, 1]

    รุ่นที่ 2 = จำลอง 1: [2, 3]

    รุ่นที่ 3 = จำลอง 1: [4, 5]

    แบทช์ 4 = จำลอง 1: [6, 7]

    รุ่นที่ 5 = จำลอง 1: [8, 9]

    ชุดที่ 6 = แบบจำลอง 1: [10, 11]

    • คนงาน 1:

    รุ่นที่ 1 = จำลอง 2: [0, 1]

    รุ่นที่ 2 = จำลอง 2: [2, 3]

    รุ่นที่ 3 = จำลอง 2: [4, 5]

    แบทช์ 4 = จำลอง 2: [6, 7]

    รุ่นที่ 5 = จำลอง 2: [8, 9]

    แบทช์ 6 = จำลอง 2: [10, 11]

โหลดล่วงหน้า

ตามค่าเริ่มต้น tf.distribute จะเพิ่มการแปลงการดึงข้อมูลล่วงหน้าที่ส่วนท้ายของอินสแตนซ์ tf.data.Dataset ผู้ใช้ระบุ อาร์กิวเมนต์สำหรับการแปลงการดึงข้อมูลล่วงหน้าซึ่งเป็น buffer_size เท่ากับจำนวนแบบจำลองที่ซิงค์

tf.distribute.Strategy.experimental_distribute_datasets_from_function

การใช้

API นี้รับฟังก์ชันอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset ฟังก์ชันอินพุตที่ผู้ใช้ส่งผ่านมีอาร์กิวเมนต์ tf.distribute.InputContext และควรส่งคืนอินสแตนซ์ tf.data.Dataset ด้วย API นี้ tf.distribute จะไม่ทำการเปลี่ยนแปลงใด ๆ กับอินสแตนซ์ tf.data.Dataset ของผู้ใช้ที่ส่งคืนจากฟังก์ชันอินพุต มันเป็นความรับผิดชอบของผู้ใช้ในการแบทช์และแบ่งชุดข้อมูล tf.distribute เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของพนักงานแต่ละคน นอกเหนือจากการอนุญาตให้ผู้ใช้ระบุแบตช์และการแบ่งส่วนตรรกะของตนเอง API นี้ยังแสดงให้เห็นถึงความสามารถในการปรับขนาดและประสิทธิภาพที่ดีขึ้นเมื่อเปรียบเทียบกับ tf.distribute.Strategy.experimental_distribute_dataset เมื่อใช้สำหรับการฝึกอบรมผู้ปฏิบัติงานหลายคน

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

คุณสมบัติ

เครื่องผสม

อินสแตนซ์ tf.data.Dataset ที่เป็นค่าส่งคืนของฟังก์ชันอินพุตควรเป็นแบตช์โดยใช้ขนาดแบตช์ต่อแบบจำลอง ขนาดแบทช์ต่อจำลองคือขนาดแบทช์ทั่วโลกหารด้วยจำนวนของเรพลิคาที่มีส่วนร่วมในการฝึกอบรมการซิงค์ นี่เป็นเพราะ tf.distribute เรียกฟังก์ชันอินพุตบนอุปกรณ์ CPU ของพนักงานแต่ละคน ชุดข้อมูลที่สร้างขึ้นในผู้ปฏิบัติงานที่กำหนดควรพร้อมที่จะใช้โดยแบบจำลองทั้งหมดในผู้ปฏิบัติงานนั้น

sharding

วัตถุ tf.distribute.InputContext ที่ส่งโดยปริยายเป็นอาร์กิวเมนต์ไปยังฟังก์ชันอินพุตของผู้ใช้ถูกสร้างขึ้นโดย tf.distribute ภายใต้ประทุน มีข้อมูลเกี่ยวกับจำนวนคนงานรหัสผู้ปฏิบัติงานปัจจุบัน ฯลฯ ฟังก์ชันอินพุตนี้สามารถจัดการการชาร์ดตามนโยบายที่กำหนดโดยผู้ใช้โดยใช้คุณสมบัติเหล่านี้ซึ่งเป็นส่วนหนึ่งของอ็อบเจ็กต์ tf.distribute.InputContext

โหลดล่วงหน้า

tf.distribute ไม่ได้เพิ่มการแปลง prefetch ที่ส่วนท้ายของ tf.data.Dataset ส่งคืนโดยฟังก์ชันอินพุตที่ผู้ใช้ระบุ

Iterators แบบกระจาย

คล้ายกับอินสแตนซ์ tf.data.Dataset ไม่กระจายคุณจะต้องสร้างตัววนซ้ำบนอินสแตนซ์ tf.distribute.DistributedDataset เพื่อวนซ้ำและเข้าถึงองค์ประกอบใน tf.distribute.DistributedDataset ต่อไปนี้เป็นวิธีที่คุณสามารถสร้าง tf.distribute.DistributedIterator และใช้เพื่อฝึกโมเดลของคุณ:

ประเพณี

ใช้ Pythonic สำหรับการสร้างลูป

คุณสามารถใช้ Pythonic loop ที่เป็นมิตรกับผู้ใช้เพื่อวนซ้ำบน tf.distribute.DistributedDataset องค์ประกอบที่ส่งคืนจาก tf.distribute.DistributedIterator สามารถเป็น tf.Tensor เดียวหรือ tf.distribute.DistributedValues ซึ่งมีค่าต่อแบบจำลอง การวางลูปภายใน tf.function จะช่วยเพิ่มประสิทธิภาพ อย่างไรก็ตามในขณะนี้ไม่รองรับการ break และการ return หากวางลูปไว้ใน tf.function นอกจากนี้เรายังไม่สนับสนุนการวางลูปใน tf.function เมื่อใช้กลยุทธ์หลายคนเช่น tf.distribute.experimental.MultiWorkerMirroredStrategy และ tf.distribute.TPUStrategy การวางลูปภายใน tf.function งานได้สำหรับผู้ปฏิบัติงานคนเดียว tf.distribute.TPUStrategy แต่ไม่ใช่เมื่อใช้ TPU tf.distribute.TPUStrategy

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ใช้ iter เพื่อสร้าง iterator ที่ชัดเจน

หากต้องการวนซ้ำองค์ประกอบในอินสแตนซ์ tf.distribute.DistributedDataset คุณสามารถสร้าง tf.distribute.DistributedIterator โดยใช้ iter API ด้วยตัววนซ้ำอย่างชัดเจนคุณสามารถวนซ้ำตามจำนวนขั้นตอนที่แน่นอน ในการรับองค์ประกอบถัดไปจาก tf.distribute.DistributedIterator อินสแตนซ์ dist_iterator คุณสามารถเรียก next(dist_iterator) , dist_iterator.get_next() หรือ dist_iterator.get_next_as_optional() อดีตทั้งสองมีความเหมือนกัน:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

ด้วย next() หรือ tf.distribute.DistributedIterator.get_next() ถ้า tf.distribute.DistributedIterator ถึงจุดสิ้นสุดข้อผิดพลาด OutOfRange จะถูกโยนทิ้ง ไคลเอนต์สามารถตรวจจับข้อผิดพลาดในด้าน python และทำงานอื่น ๆ ต่อไปเช่นการตรวจสอบและประเมินผล อย่างไรก็ตามวิธีนี้จะไม่ทำงานหากคุณใช้ลูปการฝึกอบรมโฮสต์ (เช่นเรียกใช้หลายขั้นตอนต่อ tf.function ) ซึ่งมีลักษณะดังนี้:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

train_fn มีหลายขั้นตอนโดยการรวมส่วนของขั้นตอนไว้ใน tf.range ในกรณีนี้การวนซ้ำที่แตกต่างกันในลูปที่ไม่มีการพึ่งพาอาจเริ่มต้นควบคู่กันได้ดังนั้นข้อผิดพลาด OutOfRange สามารถถูกทริกเกอร์ในการทำซ้ำในภายหลังก่อนที่การคำนวณการทำซ้ำก่อนหน้านี้จะเสร็จสิ้น เมื่อโยนข้อผิดพลาด OutOfRange แล้ว ops ทั้งหมดในฟังก์ชั่นจะถูกยกเลิกทันที หากเป็นกรณีที่คุณต้องการหลีกเลี่ยงทางเลือกอื่นที่ไม่ทิ้งข้อผิดพลาด tf.distribute.DistributedIterator.get_next_as_optional() คือ tf.distribute.DistributedIterator.get_next_as_optional() get_next_as_optional ส่งคืน tf.experimental.Optional ซึ่งมีองค์ประกอบถัดไปหรือไม่มีค่าถ้า tf.distribute.DistributedIterator ถึงจุดสิ้นสุด

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

ใช้คุณสมบัติ element_spec

ถ้าคุณส่งผ่านองค์ประกอบของชุดข้อมูลแบบกระจายไปยัง tf.function และต้องการการรับประกัน tf.TypeSpec คุณสามารถระบุอาร์กิวเมนต์ input_signature ของ tf.function เอาต์พุตของชุดข้อมูลแบบกระจายคือ tf.distribute.DistributedValues ซึ่งสามารถแสดงถึงอินพุตไปยังอุปกรณ์เดียวหรือหลายอุปกรณ์ หากต้องการรับ tf.TypeSpec สอดคล้องกับค่าแบบกระจายนี้คุณสามารถใช้คุณสมบัติ element_spec ของชุดข้อมูลแบบกระจายหรืออ็อบเจ็กต์ตัววนซ้ำแบบกระจาย

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

แบทช์บางส่วน

พบชุดงานบางส่วนเมื่ออินสแตนซ์ tf.data.Dataset ที่ผู้ใช้สร้างอาจมีขนาดแบทช์ที่ไม่สามารถหารด้วยจำนวนแบบจำลองหรือเมื่อความสำคัญของอินสแตนซ์ชุดข้อมูลไม่สามารถหารด้วยขนาดแบทช์ ซึ่งหมายความว่าเมื่อชุดข้อมูลถูกกระจายไปบนตัวจำลองหลายตัวการเรียก next สำหรับตัววนซ้ำบางตัวจะส่งผลให้เกิด OutOfRangeError ในการจัดการกรณีการใช้งานนี้ tf.distribute จะส่งคืนชุดดัมมี่ของขนาดแบตช์ 0 บนแบบจำลองที่ไม่มีข้อมูลให้ประมวลผลอีกต่อไป

สำหรับกรณีของผู้ปฏิบัติงานรายเดียวหากไม่มีการส่งคืนข้อมูลโดยการเรียก next บนตัววนซ้ำระบบจะสร้างและใช้ชุดข้อมูลจำลองที่มีขนาดชุดงาน 0 พร้อมกับข้อมูลจริงในชุดข้อมูล ในกรณีของแบทช์บางส่วนแบทช์ทั่วโลกชุดสุดท้ายจะมีข้อมูลจริงควบคู่กับชุดข้อมูล ตอนนี้เงื่อนไขการหยุดสำหรับการประมวลผลข้อมูลจะตรวจสอบว่ามีแบบจำลองใดมีข้อมูลหรือไม่ ถ้าไม่มีข้อมูลเกี่ยวกับแบบจำลองใด ๆ ข้อผิดพลาด OutOfRange จะถูกส่งออกไป

สำหรับกรณีของผู้ปฏิบัติงานหลายคนค่าบูลีนที่แสดงการมีอยู่ของข้อมูลในแต่ละคนงานจะถูกรวมโดยใช้การสื่อสารข้ามแบบจำลองและสิ่งนี้จะใช้เพื่อระบุว่าคนงานทั้งหมดได้เสร็จสิ้นการประมวลผลชุดข้อมูลแบบกระจาย เนื่องจากสิ่งนี้เกี่ยวข้องกับการสื่อสารข้ามคนงานจึงมีบทลงโทษที่เกี่ยวข้องกับการปฏิบัติงาน

คำเตือน

  • เมื่อใช้ tf.distribute.Strategy.experimental_distribute_dataset API ที่มีการตั้งค่าผู้ปฏิบัติงานหลายผู้ใช้ผ่าน tf.data.Dataset ที่อ่านจากไฟล์ ถ้า tf.data.experimental.AutoShardPolicy ถูกตั้งค่าเป็น AUTO หรือ FILE ขนาดชุดงานจริงต่อขั้นตอนอาจเล็กกว่าขนาดแบทช์ส่วนกลางที่ผู้ใช้กำหนด สิ่งนี้สามารถเกิดขึ้นได้เมื่อองค์ประกอบที่เหลือในไฟล์น้อยกว่าขนาดแบทช์ทั่วโลก ผู้ใช้สามารถใช้ชุดข้อมูลโดยไม่ต้องขึ้นอยู่กับจำนวนขั้นตอนในการรันหรือตั้งค่า tf.data.experimental.AutoShardPolicy เป็น DATA เพื่อแก้ไขปัญหา

  • การแปลงชุดข้อมูลแบบ stateful ไม่รองรับ tf.distribute ในขณะนี้และ tf.distribute แบบไม่ระบุสถานะใด ๆ ที่ชุดข้อมูลนั้นอาจถูกละเว้นในปัจจุบัน ตัวอย่างเช่นหากชุดข้อมูลของคุณมี map_fn ที่ใช้ tf.random.uniform ในการหมุนรูปภาพคุณจะมีกราฟชุดข้อมูลที่ขึ้นอยู่กับสถานะ (เช่นเมล็ดพันธุ์แบบสุ่ม) บนเครื่องโลคัลที่กำลังดำเนินการกระบวนการ python

  • Experimental tf.data.experimental.OptimizationOptions ที่ปิดใช้งานโดยค่าเริ่มต้นสามารถในบริบทบางอย่างเช่นเมื่อใช้ร่วมกับ tf.distribute - ทำให้ประสิทธิภาพลดลง คุณควรเปิดใช้งานหลังจากที่คุณตรวจสอบแล้วว่ามีประโยชน์ต่อประสิทธิภาพของปริมาณงานของคุณในการตั้งค่าการแจกจ่าย

  • ลำดับที่ข้อมูลถูกประมวลผลโดยผู้ปฏิบัติงานเมื่อใช้ tf.distribute.experimental_distribute_dataset หรือ tf.distribute.experimental_distribute_datasets_from_function ไม่รับประกัน โดยทั่วไปจำเป็นต้องใช้สิ่งนี้หากคุณใช้ tf.distribute เพื่อปรับการคาดการณ์ อย่างไรก็ตามคุณสามารถแทรกดัชนีสำหรับแต่ละองค์ประกอบในชุดงานและสั่งซื้อผลลัพธ์ตามนั้น ตัวอย่างต่อไปนี้เป็นตัวอย่างของวิธีการสั่งซื้อเอาต์พุต

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

ฉันจะเผยแพร่ข้อมูลของฉันได้อย่างไรหากฉันไม่ได้ใช้อินสแตนซ์ของ canonical tf.data.Dataset?

บางครั้งผู้ใช้ไม่สามารถใช้ tf.data.Dataset เพื่อแสดงอินพุตของตนและต่อมา API ที่กล่าวถึงข้างต้นเพื่อแจกจ่ายชุดข้อมูลไปยังอุปกรณ์หลายเครื่อง ในกรณีเช่นนี้คุณสามารถใช้เทนเซอร์ดิบหรืออินพุตจากเครื่องกำเนิดไฟฟ้า

ใช้ Exper_distribute_values_from_function สำหรับอินพุตเทนเซอร์แบบสุ่ม

strategy.run ยอมรับ tf.distribute.DistributedValues ซึ่งเป็นผลลัพธ์ของ next(iterator) การส่งผ่านค่าเมตริกซ์ใช้ experimental_distribute_values_from_function เพื่อสร้าง tf.distribute.DistributedValues จากเทนเซอร์ดิบ

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

ใช้ tf.data.Dataset.from_generator ถ้าอินพุตของคุณมาจากเครื่องกำเนิด

ถ้าคุณมีฟังก์ชันตัวสร้างที่คุณต้องการใช้คุณสามารถสร้างอินสแตนซ์ tf.data.Dataset โดยใช้ from_generator API

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)