หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

อินพุตแบบกระจาย

ดูใน TensorFlow.org เรียกใช้ใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

tf.distribute API เป็นวิธีง่ายๆสำหรับผู้ใช้ในการปรับขนาดการฝึกอบรมจากเครื่องเดียวไปยังหลายเครื่อง เมื่อปรับขนาดโมเดลผู้ใช้จะต้องแจกจ่ายข้อมูลที่ป้อนไปยังอุปกรณ์ต่างๆ tf.distribute มี API ที่คุณสามารถแจกจ่ายอินพุตของคุณไปยังอุปกรณ์ต่างๆโดยอัตโนมัติ

คู่มือนี้จะแสดงวิธีต่างๆในการสร้างชุดข้อมูลและตัววนซ้ำแบบกระจายโดยใช้ tf.distribute API นอกจากนี้หัวข้อต่อไปนี้จะครอบคลุม:

คู่มือนี้ไม่ครอบคลุมถึงการใช้อินพุตแบบกระจายกับ Keras API

ชุดข้อมูลแบบกระจาย

ในการใช้ tf.distribute API ในการปรับขนาดขอแนะนำให้ผู้ใช้ใช้tf.data.Dataset เพื่อแสดงอินพุตของตน tf.distribute ถูกสร้างขึ้นเพื่อให้ทำงานได้อย่างมีประสิทธิภาพกับtf.data.Dataset (ตัวอย่างเช่นการดึงข้อมูลล่วงหน้าอัตโนมัติไปยังอุปกรณ์เร่งความเร็วแต่ละเครื่อง) พร้อมกับการเพิ่มประสิทธิภาพการทำงานที่รวมอยู่ในการใช้งานอย่างสม่ำเสมอ หากคุณมีกรณีการใช้งานสำหรับการใช้อย่างอื่นที่ไม่ใช่tf.data.Dataset โปรดดู ส่วน ต่อไปในคู่มือนี้ ในลูปการฝึกอบรมที่ไม่กระจายผู้ใช้ต้องสร้างอินสแตนซ์tf.data.Dataset ก่อนแล้วจึงวนซ้ำองค์ประกอบ ตัวอย่างเช่น:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ในการอนุญาตให้ผู้ใช้ใช้กลยุทธ์ tf.distribute โดยมีการเปลี่ยนแปลงเล็กน้อยกับโค้ดที่มีอยู่ของผู้ใช้จึงมีการแนะนำ API สองตัวซึ่งจะแจกจ่ายอินสแตนซ์tf.data.Dataset และส่งคืนอ็อบเจ็กต์ชุดข้อมูลแบบกระจาย จากนั้นผู้ใช้สามารถทำซ้ำบนอินสแตนซ์ชุดข้อมูลแบบกระจายนี้และฝึกโมเดลได้เหมือนเดิม ตอนนี้ให้เราดูสอง APIs - tf.distribute.Strategy.experimental_distribute_dataset และ tf.distribute.Strategy.distribute_datasets_from_function โดยละเอียด:

tf.distribute.Strategy.experimental_distribute_dataset

การใช้งาน

API นี้ใช้อินสแตนซ์tf.data.Dataset เป็นอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset คุณควรแบตช์ชุดข้อมูลอินพุตด้วยค่าที่เท่ากับขนาดแบทช์ส่วนกลาง ขนาดแบทช์ส่วนกลางนี้คือจำนวนตัวอย่างที่คุณต้องการประมวลผลในอุปกรณ์ทั้งหมดใน 1 ขั้นตอน คุณสามารถทำซ้ำชุดข้อมูลแบบกระจายนี้ในรูปแบบ Pythonic หรือสร้างตัววนซ้ำโดยใช้ iter ออบเจ็กต์ที่ส่งคืนไม่ใช่อินสแตนซ์tf.data.Dataset และไม่สนับสนุน API อื่น ๆ ที่แปลงหรือตรวจสอบชุดข้อมูลในทางใดทางหนึ่ง นี่คือ API ที่แนะนำหากคุณไม่มีวิธีเฉพาะเจาะจงที่คุณต้องการแบ่งข้อมูลที่คุณป้อนผ่านแบบจำลองต่างๆ

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

คุณสมบัติ

การผสม

tf.distribute รีแบทช์อินสแตนซ์tf.data.Dataset ใหม่ด้วยขนาดแบทช์ใหม่ที่เท่ากับขนาดแบทช์ส่วนกลางหารด้วยจำนวนแบบจำลองที่ซิงค์ จำนวนแบบจำลองที่ซิงค์จะเท่ากับจำนวนอุปกรณ์ที่มีส่วนร่วมในการไล่ระดับสีทั้งหมดในระหว่างการฝึกอบรม เมื่อผู้ใช้เรียกตัว next บนตัววนซ้ำแบบกระจายขนาดต่อชุดข้อมูลจำลองจะถูกส่งกลับในแต่ละตัวจำลอง คาร์ดินาลลิตี้ของชุดข้อมูล rebatched จะเป็นจำนวนเต็มของจำนวนการจำลองเสมอ นี่คือตัวอย่างสองสามตัวอย่าง:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • ไม่มีการแจกจ่าย:
    • ชุดที่ 1: [0, 1, 2, 3]
    • ชุดที่ 2: [4, 5]
    • ด้วยการจัดจำหน่ายมากกว่า 2 แบบ ชุดสุดท้าย ([4, 5]) ถูกแบ่งระหว่าง 2 แบบจำลอง

    • ชุดที่ 1:

      • แบบจำลอง 1: [0, 1]
      • แบบจำลอง 2: [2, 3]
    • ชุดที่ 2:

      • แบบจำลอง 2: [4]
      • แบบจำลอง 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • ไม่มีการแจกจ่าย:
    • ชุดที่ 1: [[0], [1], [2], [3]]
    • ด้วยการจัดจำหน่ายมากกว่า 5 แบบ:
    • ชุดที่ 1:
      • แบบจำลอง 1: [0]
      • แบบจำลอง 2: [1]
      • แบบจำลอง 3: [2]
      • แบบจำลอง 4: [3]
      • แบบจำลอง 5: []
  • tf.data.Dataset.range(8).batch(4)

    • ไม่มีการแจกจ่าย:
    • ชุดที่ 1: [0, 1, 2, 3]
    • ชุดที่ 2: [4, 5, 6, 7]
    • ด้วยการจัดจำหน่ายมากกว่า 3 แบบ:
    • ชุดที่ 1:
      • แบบจำลอง 1: [0, 1]
      • แบบจำลอง 2: [2, 3]
      • แบบจำลอง 3: []
    • ชุดที่ 2:
      • แบบจำลอง 1: [4, 5]
      • แบบจำลอง 2: [6, 7]
      • แบบจำลอง 3: []

การจับคู่ชุดข้อมูลซ้ำมีความซับซ้อนของช่องว่างที่เพิ่มขึ้นแบบเชิงเส้นด้วยจำนวนแบบจำลอง ซึ่งหมายความว่าสำหรับกรณีการใช้การฝึกอบรมผู้ปฏิบัติงานหลายคนไปป์ไลน์อินพุตอาจพบข้อผิดพลาด OOM

Sharding

tf.distribute ยังจัดเก็บข้อมูลชุดข้อมูลอินพุตโดยอัตโนมัติในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย MultiWorkerMirroredStrategy และ TPUStrategy ชุดข้อมูลแต่ละชุดถูกสร้างขึ้นบนอุปกรณ์ CPU ของผู้ปฏิบัติงาน การชาร์ตชุดข้อมูลโดยอัตโนมัติบนชุดของคนงานหมายความว่าผู้ปฏิบัติงานแต่ละคนได้รับมอบหมายชุดย่อยของชุดข้อมูลทั้งหมด (หากตั้งค่า tf.data.experimental.AutoShardPolicy ถูกต้อง) ทั้งนี้เพื่อให้แน่ใจว่าในแต่ละขั้นตอนผู้ปฏิบัติงานแต่ละคนจะประมวลผลขนาดแบตช์ขององค์ประกอบชุดข้อมูลที่ไม่ทับซ้อนกัน Autosharding มีสองตัวเลือกที่แตกต่างกันซึ่งสามารถระบุได้โดยใช้ tf.data.experimental.DistributeOptions โปรดทราบว่าไม่มีการชาร์ตอัตโนมัติในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย ParameterServerStrategy และข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูลด้วยกลยุทธ์นี้สามารถพบได้ใน บทแนะนำกลยุทธ์เซิร์ฟเวอร์พารามิเตอร์

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

มีสามตัวเลือกที่แตกต่างกันที่คุณสามารถตั้งค่าสำหรับ tf.data.experimental.AutoShardPolicy :

  • อัตโนมัติ: นี่คือตัวเลือกเริ่มต้นซึ่งหมายความว่าจะมีการพยายามสร้างเศษโดย FILE ความพยายามที่จะแบ่งโดย FILE ล้มเหลวหากตรวจไม่พบชุดข้อมูลที่ใช้ไฟล์ tf.distribute จะถอยกลับไปสู่การแตกโดย DATA โปรดทราบว่าหากชุดข้อมูลอินพุตเป็นแบบไฟล์ แต่จำนวนไฟล์น้อยกว่าจำนวนคนงานระบบจะเพิ่ม InvalidArgumentError ในกรณีนี้ให้ตั้งค่านโยบายเป็น AutoShardPolicy.DATA อย่างชัดเจนหรือแยกแหล่งอินพุตของคุณเป็นไฟล์ขนาดเล็กเช่นจำนวนไฟล์มากกว่าจำนวนคนทำงาน
  • FILE: นี่คือตัวเลือกหากคุณต้องการแบ่งไฟล์อินพุตให้กับคนงานทั้งหมด คุณควรใช้ตัวเลือกนี้หากจำนวนไฟล์อินพุตมากกว่าจำนวนคนงานมากและข้อมูลในไฟล์กระจายเท่า ๆ กัน ข้อเสียของตัวเลือกนี้คือมีพนักงานที่ไม่ได้ใช้งานหากข้อมูลในไฟล์ไม่กระจายอย่างเท่าเทียมกัน ถ้าจำนวนไฟล์น้อยกว่าจำนวนคนงานระบบจะเพิ่ม InvalidArgumentError ในกรณีนี้ให้ตั้งค่านโยบายเป็น AutoShardPolicy.DATA อย่างชัดเจน ตัวอย่างเช่นให้เราแจกจ่ายไฟล์ 2 ไฟล์กับคนงาน 2 คนโดยแต่ละคนจำลอง 1 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5] และไฟล์ 2 ประกอบด้วย [6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2 และขนาดชุดงานส่วนกลางเป็น 4

    • คนงาน 0:
    • ชุดที่ 1 = ตัวจำลอง 1: [0, 1]
    • ชุดที่ 2 = ตัวจำลอง 1: [2, 3]
    • ชุดที่ 3 = แบบจำลอง 1: [4]
    • ชุดที่ 4 = แบบจำลอง 1: [5]
    • คนงาน 1:
    • ชุดที่ 1 = ตัวจำลอง 2: [6, 7]
    • ชุดที่ 2 = ตัวจำลอง 2: [8, 9]
    • ชุดที่ 3 = ตัวจำลอง 2: [10]
    • ชุดที่ 4 = ตัวจำลอง 2: [11]
  • ข้อมูล: สิ่งนี้จะทำให้องค์ประกอบต่างๆโดยอัตโนมัติในคนงานทั้งหมด พนักงานแต่ละคนจะอ่านชุดข้อมูลทั้งหมดและประมวลผลเฉพาะส่วนที่กำหนดให้เท่านั้น ชิ้นส่วนอื่น ๆ ทั้งหมดจะถูกทิ้ง โดยทั่วไปจะใช้หากจำนวนไฟล์อินพุตน้อยกว่าจำนวนคนงานและคุณต้องการการแบ่งข้อมูลที่ดีขึ้นสำหรับคนงานทั้งหมด ข้อเสียคือชุดข้อมูลทั้งหมดจะถูกอ่านเกี่ยวกับคนงานแต่ละคน ตัวอย่างเช่นให้เราแจกจ่าย 1 ไฟล์กับพนักงาน 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2

    • คนงาน 0:
    • ชุดที่ 1 = ตัวจำลอง 1: [0, 1]
    • ชุดที่ 2 = ตัวจำลอง 1: [4, 5]
    • ชุดที่ 3 = แบบจำลอง 1: [8, 9]
    • คนงาน 1:
    • ชุดที่ 1 = ตัวจำลอง 2: [2, 3]
    • ชุดที่ 2 = ตัวจำลอง 2: [6, 7]
    • ชุดที่ 3 = ตัวจำลอง 2: [10, 11]
  • ปิด: หากคุณปิดการชาร์ตอัตโนมัติพนักงานแต่ละคนจะประมวลผลข้อมูลทั้งหมด ตัวอย่างเช่นให้เราแจกจ่าย 1 ไฟล์กับพนักงาน 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2 จากนั้นผู้ปฏิบัติงานแต่ละคนจะเห็นการแจกจ่ายต่อไปนี้:

    • คนงาน 0:
    • ชุดที่ 1 = ตัวจำลอง 1: [0, 1]
    • ชุดที่ 2 = ตัวจำลอง 1: [2, 3]
    • ชุดที่ 3 = แบบจำลอง 1: [4, 5]
    • ชุดที่ 4 = ตัวจำลอง 1: [6, 7]
    • ชุดที่ 5 = แบบจำลอง 1: [8, 9]
    • ชุดที่ 6 = แบบจำลอง 1: [10, 11]

    • คนงาน 1:

    • ชุดที่ 1 = ตัวจำลอง 2: [0, 1]

    • ชุดที่ 2 = ตัวจำลอง 2: [2, 3]

    • ชุดที่ 3 = ตัวจำลอง 2: [4, 5]

    • ชุดที่ 4 = ตัวจำลอง 2: [6, 7]

    • ชุดที่ 5 = ตัวจำลอง 2: [8, 9]

    • ชุดที่ 6 = ตัวจำลอง 2: [10, 11]

การกำหนดค่าล่วงหน้า

ตามค่าเริ่มต้น tf.distribute จะเพิ่มการแปลง prefetch ที่ส่วนท้ายของอินสแตนซ์tf.data.Dataset ผู้ใช้ระบุ อาร์กิวเมนต์สำหรับการแปลงการดึงข้อมูลล่วงหน้าซึ่งเป็น buffer_size เท่ากับจำนวนแบบจำลองที่ซิงค์

tf.distribute.Strategy.distribute_datasets_from_function

การใช้งาน

API นี้รับฟังก์ชันอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset ฟังก์ชันอินพุตที่ผู้ใช้ส่งผ่านมีอาร์กิวเมนต์ tf.distribute.InputContext และควรส่งคืนอินสแตนซ์tf.data.Dataset ด้วย API นี้ tf.distribute จะไม่ทำการเปลี่ยนแปลงใด ๆ เพิ่มเติมกับอินสแตนซ์tf.data.Dataset ของผู้ใช้ที่ส่งคืนจากฟังก์ชันอินพุต เป็นความรับผิดชอบของผู้ใช้ในการจัดกลุ่มและแบ่งชุดข้อมูล tf.distribute เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน นอกเหนือจากการอนุญาตให้ผู้ใช้ระบุ batching และ sharding logic ของตนเองแล้ว API นี้ยังแสดงให้เห็นถึงความสามารถในการปรับขนาดและประสิทธิภาพที่ดีขึ้นเมื่อเทียบกับ tf.distribute.Strategy.experimental_distribute_dataset เมื่อใช้สำหรับการฝึกอบรมพนักงานหลายคน

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

คุณสมบัติ

การผสม

อินสแตนซ์tf.data.Dataset ที่เป็นค่าส่งคืนของฟังก์ชันอินพุตควรเป็นแบตช์โดยใช้ขนาดแบตช์ต่อแบบจำลอง ขนาดแบตช์ต่อการจำลองคือขนาดแบทช์ส่วนกลางหารด้วยจำนวนของแบบจำลองที่เข้าร่วมในการฝึกการซิงค์ เนื่องจาก tf.distribute เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน ชุดข้อมูลที่สร้างขึ้นบนผู้ปฏิบัติงานที่กำหนดควรพร้อมใช้งานโดยตัวจำลองทั้งหมดของผู้ปฏิบัติงานนั้น

Sharding

วัตถุ tf.distribute.InputContext ที่ส่งโดยปริยายเป็นอาร์กิวเมนต์ไปยังฟังก์ชันอินพุตของผู้ใช้ถูกสร้างขึ้นโดย tf.distribute ภายใต้ประทุน มีข้อมูลเกี่ยวกับจำนวนคนงานรหัสผู้ปฏิบัติงานปัจจุบันเป็นต้นฟังก์ชันอินพุตนี้สามารถจัดการการชาร์ดตามนโยบายที่กำหนดโดยผู้ใช้โดยใช้คุณสมบัติเหล่านี้ซึ่งเป็นส่วนหนึ่งของอ็อบเจ็กต์ tf.distribute.InputContext

การกำหนดค่าล่วงหน้า

tf.distribute ไม่ได้เพิ่มการแปลง prefetch ที่ส่วนท้ายของtf.data.Dataset ส่งคืนโดยฟังก์ชันอินพุตที่ผู้ใช้ระบุ

Iterators แบบกระจาย

เช่นเดียวกับอินสแตนซ์tf.data.Dataset ไม่กระจายคุณจะต้องสร้างตัววนซ้ำบนอินสแตนซ์ tf.distribute.DistributedDataset เพื่อวนซ้ำและเข้าถึงองค์ประกอบใน tf.distribute.DistributedDataset ต่อไปนี้เป็นวิธีที่คุณสามารถสร้าง tf.distribute.DistributedIterator และใช้เพื่อฝึกโมเดลของคุณ:

ประเพณี

ใช้ Pythonic สำหรับการสร้างลูป

คุณสามารถใช้ Pythonic loop ที่เป็นมิตรกับผู้ใช้เพื่อวนซ้ำบน tf.distribute.DistributedDataset องค์ประกอบที่ส่งคืนจาก tf.distribute.DistributedIterator สามารถเป็น tf.Tensor เดียวหรือ tf.distribute.DistributedValues ซึ่งมีค่าต่อแบบจำลอง การวางลูปภายใน tf.function จะช่วยเพิ่มประสิทธิภาพ อย่างไรก็ตามขณะนี้ยังไม่รองรับการ break และ return สำหรับการวนซ้ำบน tf.distribute.DistributedDataset ที่อยู่ภายใน tf.function

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ใช้ iter เพื่อสร้าง iterator ที่ชัดเจน

หากต้องการวนซ้ำองค์ประกอบในอินสแตนซ์ tf.distribute.DistributedDataset คุณสามารถสร้าง tf.distribute.DistributedIterator โดยใช้ iter API ด้วยตัววนซ้ำที่ชัดเจนคุณสามารถทำซ้ำได้ตามจำนวนขั้นตอนที่กำหนด ในการรับองค์ประกอบถัดไปจาก tf.distribute.DistributedIterator อินสแตนซ์ dist_iterator คุณสามารถเรียก next(dist_iterator) , dist_iterator.get_next() หรือ dist_iterator.get_next_as_optional() อดีตทั้งสองนั้นเหมือนกันเป็นหลัก:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

ด้วย next() หรือ tf.distribute.DistributedIterator.get_next() หาก tf.distribute.DistributedIterator ถึงจุดสิ้นสุดข้อผิดพลาด OutOfRange จะถูกโยนทิ้ง ไคลเอนต์สามารถตรวจจับข้อผิดพลาดในด้าน python และทำงานอื่น ๆ ต่อไปเช่นการตรวจสอบและประเมินผล อย่างไรก็ตามสิ่งนี้จะไม่ได้ผลหากคุณใช้โฮสต์การฝึกวนซ้ำ (เช่นรันหลายขั้นตอนต่อ tf.function ) ซึ่งมีลักษณะดังนี้:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn มีหลายขั้นตอนโดยการห่อเนื้อหาขั้นตอนไว้ใน tf.range ในกรณีนี้การวนซ้ำที่แตกต่างกันในลูปที่ไม่มีการพึ่งพาอาจเริ่มต้นควบคู่กันได้ดังนั้นข้อผิดพลาด OutOfRange สามารถถูกทริกเกอร์ในการทำซ้ำในภายหลังก่อนที่การคำนวณการทำซ้ำก่อนหน้านี้จะเสร็จสิ้น เมื่อเกิดข้อผิดพลาด OutOfRange การดำเนินการทั้งหมดในฟังก์ชันจะสิ้นสุดลงทันที หากเป็นบางกรณีที่คุณต้องการหลีกเลี่ยงทางเลือกอื่นที่ไม่ทำให้เกิดข้อผิดพลาด tf.distribute.DistributedIterator.get_next_as_optional() คือ tf.distribute.DistributedIterator.get_next_as_optional() get_next_as_optional ส่งคืน tf.experimental.Optional ซึ่งมีองค์ประกอบถัดไปหรือไม่มีค่าถ้า tf.distribute.DistributedIterator ถึงจุดสิ้นสุด

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

ใช้คุณสมบัติ element_spec

หากคุณส่งผ่านองค์ประกอบของชุดข้อมูลแบบกระจายไปยัง tf.function และต้องการการรับประกัน tf.TypeSpec คุณสามารถระบุอาร์กิวเมนต์ input_signature ของ tf.function เอาต์พุตของชุดข้อมูลแบบกระจายคือ tf.distribute.DistributedValues ซึ่งสามารถแสดงอินพุตไปยังอุปกรณ์เดียวหรือหลายอุปกรณ์ หากต้องการรับ tf.TypeSpec สอดคล้องกับค่าแบบกระจายนี้คุณสามารถใช้คุณสมบัติ element_spec ของชุดข้อมูลแบบกระจายหรืออ็อบเจ็กต์ตัววนซ้ำแบบกระจาย

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

แบทช์บางส่วน

พบแบตช์บางส่วนเมื่อtf.data.Dataset อินสแตนซ์ชุดข้อมูลที่ผู้ใช้สร้างขึ้นอาจมีขนาดแบตช์ที่หารด้วยจำนวนแบบจำลองไม่ได้เท่า ๆ กันหรือเมื่อความสำคัญของอินสแตนซ์ชุดข้อมูลหารด้วยขนาดชุดงานไม่ได้ ซึ่งหมายความว่าเมื่อชุดข้อมูลถูกกระจายไปบนตัวจำลองหลายตัวการเรียก next ซ้ำบางตัวจะทำให้เกิด OutOfRangeError ในการจัดการกรณีการใช้งานนี้ tf.distribute จะส่งคืนชุดดัมมี่ของขนาดแบตช์ 0 บนแบบจำลองที่ไม่มีข้อมูลให้ประมวลผลอีกต่อไป

สำหรับกรณีของผู้ปฏิบัติงานรายเดียวหากไม่มีการส่งคืนข้อมูลโดยการเรียก next บนตัววนซ้ำระบบจะสร้างและใช้ชุดข้อมูลจำลองที่มีขนาดชุดงาน 0 พร้อมกับข้อมูลจริงในชุดข้อมูล ในกรณีของแบทช์บางส่วนชุดข้อมูลส่วนกลางชุดสุดท้ายจะมีข้อมูลจริงควบคู่ไปกับชุดข้อมูลจำลอง ขณะนี้เงื่อนไขการหยุดการประมวลผลข้อมูลจะตรวจสอบว่าข้อมูลจำลองมีข้อมูลหรือไม่ หากไม่มีข้อมูลบนแบบจำลองใด ๆ แสดงว่าเกิดข้อผิดพลาด OutOfRange

สำหรับกรณีของผู้ปฏิบัติงานหลายคนค่าบูลีนที่แสดงถึงการมีอยู่ของข้อมูลในพนักงานแต่ละคนจะถูกรวมโดยใช้การสื่อสารแบบจำลองข้ามและใช้เพื่อระบุว่าคนงานทั้งหมดประมวลผลชุดข้อมูลแบบกระจายเสร็จแล้วหรือไม่ เนื่องจากสิ่งนี้เกี่ยวข้องกับการสื่อสารข้ามผู้ปฏิบัติงานจึงมีบทลงโทษด้านประสิทธิภาพบางประการ

ข้อควรระวัง

  • เมื่อใช้ tf.distribute.Strategy.experimental_distribute_dataset API ที่มีการตั้งค่าผู้ปฏิบัติงานหลายคนผู้ใช้จะส่งtf.data.Dataset ที่อ่านจากไฟล์ ถ้า tf.data.experimental.AutoShardPolicy ถูกตั้งค่าเป็น AUTO หรือ FILE ขนาดชุดงานจริงต่อขั้นตอนอาจมีขนาดเล็กกว่าที่ผู้ใช้กำหนดขนาดชุดงานส่วนกลาง สิ่งนี้สามารถเกิดขึ้นได้เมื่อองค์ประกอบที่เหลือในไฟล์มีขนาดน้อยกว่าขนาดแบทช์ส่วนกลาง ผู้ใช้สามารถใช้ชุดข้อมูลโดยไม่ต้องขึ้นอยู่กับจำนวนขั้นตอนในการรันหรือตั้งค่า tf.data.experimental.AutoShardPolicy เป็น DATA เพื่อแก้ไขปัญหา

  • ขณะนี้การแปลงชุดข้อมูลที่มีสถานะไม่ได้รับการสนับสนุนกับ tf.distribute และการดำเนินการแบบ stateful ใด ๆ ที่ชุดข้อมูลอาจถูกละเว้นในขณะนี้ ตัวอย่างเช่นหากชุดข้อมูลของคุณมี map_fn ที่ใช้ tf.random.uniform ในการหมุนรูปภาพคุณจะมีกราฟชุดข้อมูลที่ขึ้นอยู่กับสถานะ (เช่นเมล็ดพันธุ์แบบสุ่ม) บนเครื่องโลคัลที่กำลังดำเนินการกระบวนการ python

  • Experimental tf.data.experimental.OptimizationOptions ที่ปิดใช้งานโดยค่าเริ่มต้นสามารถในบริบทบางอย่างเช่นเมื่อใช้ร่วมกับ tf.distribute - ทำให้ประสิทธิภาพลดลง คุณควรเปิดใช้งานหลังจากที่คุณตรวจสอบแล้วว่ามีประโยชน์ต่อประสิทธิภาพของปริมาณงานของคุณในการตั้งค่าการแจกจ่าย

  • โปรดดู คู่มือนี้ สำหรับวิธีเพิ่มประสิทธิภาพไปป์ไลน์อินพุตของคุณด้วย tf.data โดยทั่วไป เคล็ดลับเพิ่มเติมบางประการ:

    • หากคุณมีพนักงานหลายคนและใช้ tf.data.Dataset.list_files เพื่อสร้างชุดข้อมูลจากไฟล์ทั้งหมดที่ตรงกับรูปแบบ glob อย่างน้อยหนึ่งรูปแบบอย่าลืมตั้งค่าอาร์กิวเมนต์ seed หรือ set shuffle=False เพื่อให้ผู้ปฏิบัติงานแต่ละคนแบ่งไฟล์อย่างสม่ำเสมอ

    • หากไปป์ไลน์อินพุตของคุณมีทั้งการสลับข้อมูลในระดับเร็กคอร์ดและการแยกวิเคราะห์ข้อมูลเว้นแต่ว่าข้อมูลที่ไม่ได้แยกวิเคราะห์จะมีขนาดใหญ่กว่าข้อมูลที่แยกวิเคราะห์อย่างมีนัยสำคัญ (ซึ่งโดยปกติไม่เป็นเช่นนั้น) ให้สับเปลี่ยนก่อนแล้วจึงแยกวิเคราะห์ดังที่แสดงในตัวอย่างต่อไปนี้ ซึ่งอาจเป็นประโยชน์ต่อการใช้งานหน่วยความจำและประสิทธิภาพ

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) รักษาบัฟเฟอร์ภายในขององค์ประกอบ buffer_size และการลด buffer_size อาจช่วยลดปัญหา OOM ได้

  • ไม่รับประกันลำดับที่ผู้ปฏิบัติงานประมวลผลข้อมูลเมื่อใช้ tf.distribute.experimental_distribute_dataset หรือ tf.distribute.distribute_datasets_from_function โดยทั่วไปจำเป็นต้องใช้หากคุณใช้ tf.distribute เพื่อปรับขนาดการคาดการณ์ อย่างไรก็ตามคุณสามารถแทรกดัชนีสำหรับแต่ละองค์ประกอบในชุดงานและสั่งซื้อผลลัพธ์ตามนั้น ตัวอย่างต่อไปนี้เป็นตัวอย่างวิธีการสั่งซื้อเอาต์พุต

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

ฉันจะแจกจ่ายข้อมูลของฉันได้อย่างไรหากฉันไม่ได้ใช้อินสแตนซ์ tf.data.Dataset แบบบัญญัติ

บางครั้งผู้ใช้ไม่สามารถใช้tf.data.Dataset เพื่อแสดงอินพุตของตนและต่อมา API ที่กล่าวถึงข้างต้นเพื่อแจกจ่ายชุดข้อมูลไปยังอุปกรณ์หลายเครื่อง ในกรณีเช่นนี้คุณสามารถใช้เทนเซอร์ดิบหรืออินพุตจากเครื่องกำเนิดไฟฟ้า

ใช้ trial_distribute_values_from_function สำหรับอินพุตเทนเซอร์โดยพลการ

strategy.run ยอมรับ tf.distribute.DistributedValues ซึ่งเป็นผลลัพธ์ของ next(iterator) การส่งผ่านค่าเมตริกซ์ใช้ experimental_distribute_values_from_function เพื่อสร้าง tf.distribute.DistributedValues จากเทนเซอร์ดิบ

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

ใช้ tf.data.Dataset.from_generator หากอินพุตของคุณมาจากเครื่องกำเนิดไฟฟ้า

ถ้าคุณมีฟังก์ชันตัวสร้างที่คุณต้องการใช้คุณสามารถสร้างอินสแตนซ์tf.data.Dataset โดยใช้ from_generator API

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)