ช่วยปกป้อง Great Barrier Reef กับ TensorFlow บน Kaggle เข้าร่วมท้าทาย

อินพุตแบบกระจาย

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

tf.distribute API ช่วยให้ผู้ใช้สามารถปรับขนาดการฝึกอบรมจากเครื่องเดียวไปยังหลายเครื่องได้ เมื่อปรับขนาดโมเดล ผู้ใช้ยังต้องกระจายข้อมูลเข้าในอุปกรณ์หลายเครื่อง tf.distribute จัดเตรียม API ซึ่งคุณสามารถแจกจ่ายอินพุตของคุณผ่านอุปกรณ์ต่างๆ ได้โดยอัตโนมัติ

คู่มือนี้จะแสดงวิธีต่างๆ ที่คุณสามารถสร้างชุดข้อมูลแบบกระจายและตัววนซ้ำโดยใช้ tf.distribute API นอกจากนี้ หัวข้อต่อไปนี้จะครอบคลุม:

คู่มือนี้ไม่ครอบคลุมการใช้อินพุตแบบกระจายกับ Keras API

ชุดข้อมูลแบบกระจาย

หากต้องการใช้ tf.distribute APIs เพื่อปรับขนาด ขอแนะนำให้ผู้ใช้ใช้ tf.data.Dataset เพื่อเป็นตัวแทนของอินพุต tf.distribute ถูกสร้างมาให้ทำงานอย่างมีประสิทธิภาพด้วย tf.data.Dataset (เช่น การดึงข้อมูลล่วงหน้าอัตโนมัติไปยังอุปกรณ์เร่งความเร็วแต่ละเครื่อง) โดยมีการเพิ่มประสิทธิภาพการทำงานอย่างสม่ำเสมอในการนำไปใช้งาน หากคุณมีกรณีการใช้งานอย่างอื่นที่ไม่ใช่ tf.data.Dataset โปรดดู ส่วน หลังในคู่มือนี้ ในการฝึกอบรมแบบไม่กระจาย ผู้ใช้จะสร้างอินสแตนซ์ tf.data.Dataset ก่อน แล้วจึงวนซ้ำองค์ประกอบ ตัวอย่างเช่น:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

เพื่อให้ผู้ใช้สามารถใช้กลยุทธ์ tf.distribute โดยมีการเปลี่ยนแปลงเล็กน้อยในโค้ดที่มีอยู่ของผู้ใช้ จึงมีการแนะนำ API สองรายการซึ่งจะแจกจ่ายอินสแตนซ์ tf.data.Dataset และส่งคืนออบเจ็กต์ชุดข้อมูลแบบกระจาย ผู้ใช้สามารถวนซ้ำบนอินสแตนซ์ชุดข้อมูลแบบกระจายนี้และฝึกโมเดลของตนเหมือนเมื่อก่อน ให้เราดูที่ API ทั้งสอง - tf.distribute.Strategy.experimental_distribute_dataset และ tf.distribute.Strategy.distribute_datasets_from_function ในรายละเอียดเพิ่มเติม:

tf.distribute.Strategy.experimental_distribute_dataset

การใช้งาน

API นี้ใช้อินสแตนซ์ tf.data.Dataset เป็นอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset คุณควรแบทช์ชุดข้อมูลอินพุตด้วยค่าที่เท่ากับขนาดแบทช์ส่วนกลาง ขนาดแบทช์ส่วนกลางนี้คือจำนวนตัวอย่างที่คุณต้องการประมวลผลในอุปกรณ์ทั้งหมดใน 1 ขั้นตอน คุณสามารถวนซ้ำชุดข้อมูลที่กระจายนี้ในรูปแบบ Pythonic หรือสร้างตัววนซ้ำโดยใช้ iter ออบเจ็กต์ที่ส่งคืนไม่ใช่อินสแตนซ์ tf.data.Dataset และไม่รองรับ API อื่นใดที่แปลงหรือตรวจสอบชุดข้อมูลไม่ว่าในทางใด นี่คือ API ที่แนะนำ หากคุณไม่มีวิธีเฉพาะเจาะจงที่คุณต้องการแบ่งข้อมูลอินพุตของคุณบนแบบจำลองต่างๆ

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

คุณสมบัติ

แบทช์

tf.distribute ทำการรีแบทช์อินพุตอินสแตนซ์ tf.data.Dataset ด้วยขนาดแบทช์ใหม่ที่เท่ากับขนาดแบทช์ส่วนกลางหารด้วยจำนวนเรพลิกาในการซิงค์ จำนวนแบบจำลองที่ซิงค์จะเท่ากับจำนวนอุปกรณ์ที่มีส่วนร่วมในการไล่ระดับทั้งหมดลดระหว่างการฝึก เมื่อผู้ใช้เรียกใช้ตัววนซ้ำแบบกระจาย ข้อมูลขนาดแบทช์ next แบบจำลองจะถูกส่งคืนในแต่ละแบบจำลอง คาร์ดินาลลิตี้ชุดข้อมูลที่รีแบทช์จะเป็นจำนวนของเรพลิกาเสมอ ต่อไปนี้คือตัวอย่างบางส่วน:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • ไม่มีการแจกจ่าย:
    • รุ่นที่ 1: [0, 1, 2, 3]
    • รุ่นที่ 2: [4, 5]
    • ด้วยการแจกจ่ายมากกว่า 2 แบบจำลอง ชุดสุดท้าย ([4, 5]) ถูกแบ่งระหว่าง 2 แบบจำลอง

    • รุ่นที่ 1:

      • แบบจำลอง 1:[0, 1]
      • แบบจำลอง 2:[2, 3]
    • ชุดที่ 2:

      • แบบจำลอง 2: [4]
      • แบบจำลอง 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • ไม่มีการแจกจ่าย:
    • รุ่นที่ 1: [[0], [1], [2], [3]]
    • ด้วยการแจกจ่ายมากกว่า 5 แบบจำลอง:
    • รุ่นที่ 1:
      • แบบจำลอง 1: [0]
      • แบบจำลอง 2: [1]
      • แบบจำลอง 3: [2]
      • แบบจำลอง 4: [3]
      • แบบจำลอง 5: []
  • tf.data.Dataset.range(8).batch(4)

    • ไม่มีการแจกจ่าย:
    • รุ่นที่ 1: [0, 1, 2, 3]
    • รุ่นที่ 2: [4, 5, 6, 7]
    • ด้วยการแจกจ่ายมากกว่า 3 แบบจำลอง:
    • รุ่นที่ 1:
      • แบบจำลอง 1: [0, 1]
      • แบบจำลอง 2: [2, 3]
      • แบบจำลอง 3: []
    • ชุดที่ 2:
      • แบบจำลอง 1: [4, 5]
      • แบบจำลอง 2: [6, 7]
      • แบบจำลอง 3: []

การรีแบทช์ชุดข้อมูลมีความซับซ้อนของพื้นที่เพิ่มขึ้นตามจำนวนเรพลิกา ซึ่งหมายความว่าสำหรับกรณีการใช้งานการฝึกอบรมผู้ปฏิบัติงานหลายคน ไปป์ไลน์อินพุตสามารถเรียกใช้ข้อผิดพลาด OOM ได้

ชาร์ดิง

tf.distribute ยังสร้างชาร์ดอัตโนมัติชุดข้อมูลอินพุตในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย MultiWorkerMirroredStrategy และ TPUStrategy แต่ละชุดข้อมูลถูกสร้างขึ้นบนอุปกรณ์ CPU ของผู้ปฏิบัติงาน การแบ่งกลุ่มข้อมูลอัตโนมัติบนชุดของผู้ปฏิบัติงานหมายความว่าผู้ปฏิบัติงานแต่ละคนได้รับมอบหมายชุดย่อยของชุดข้อมูลทั้งหมด (หากตั้งค่า tf.data.experimental.AutoShardPolicy ที่ถูกต้อง) ทั้งนี้เพื่อให้แน่ใจว่าในแต่ละขั้นตอน ขนาดแบทช์โกลบอลขององค์ประกอบชุดข้อมูลที่ไม่ทับซ้อนกันจะถูกประมวลผลโดยผู้ปฏิบัติงานแต่ละคน Autosharding มีตัวเลือกที่แตกต่างกันสองสามตัวที่สามารถระบุได้โดยใช้ tf.data.experimental.DistributeOptions โปรดทราบว่าไม่มีการชาร์ดอัตโนมัติในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย ParameterServerStrategy และข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูลด้วยกลยุทธ์นี้ สามารถดูได้ในบทช่วย สอนกลยุทธ์เซิร์ฟเวอร์พารามิเตอร์

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

มีสามตัวเลือกที่แตกต่างกันซึ่งคุณสามารถตั้งค่าสำหรับ tf.data.experimental.AutoShardPolicy :

  • อัตโนมัติ: นี่เป็นตัวเลือกเริ่มต้นซึ่งหมายความว่าจะมีการพยายามแบ่งส่วนข้อมูลโดย FILE ความพยายามที่จะชาร์ดโดย FILE ล้มเหลวหากตรวจไม่พบชุดข้อมูลแบบไฟล์ tf.distribute จะถอยกลับไปใช้การแบ่งกลุ่มตาม DATA โปรดทราบว่าหากชุดข้อมูลอินพุตเป็นแบบไฟล์ แต่จำนวนไฟล์น้อยกว่าจำนวนผู้ปฏิบัติงาน InvalidArgumentError จะเพิ่มขึ้น หากเกิดกรณีนี้ขึ้น ให้ตั้งค่านโยบายเป็น AutoShardPolicy.DATA อย่างชัดเจน หรือแยกแหล่งที่มาอินพุตของคุณเป็นไฟล์ขนาดเล็กลง เพื่อให้จำนวนไฟล์มากกว่าจำนวนผู้ปฏิบัติงาน
  • ไฟล์: นี่คือตัวเลือกถ้าคุณต้องการแบ่งไฟล์อินพุตให้กับผู้ปฏิบัติงานทั้งหมด คุณควรใช้ตัวเลือกนี้หากจำนวนไฟล์อินพุตมากกว่าจำนวนผู้ปฏิบัติงานมาก และข้อมูลในไฟล์มีการกระจายอย่างเท่าเทียมกัน ข้อเสียของตัวเลือกนี้คือมีผู้ปฏิบัติงานที่ไม่ได้ใช้งานหากข้อมูลในไฟล์ไม่กระจายอย่างเท่าเทียมกัน หากจำนวนไฟล์น้อยกว่าจำนวนผู้ปฏิบัติงาน InvalidArgumentError จะเพิ่มขึ้น หากเป็นเช่นนี้ ให้ตั้งค่านโยบายเป็น AutoShardPolicy.DATA อย่างชัดเจน ตัวอย่างเช่น ให้เราแจกจ่ายไฟล์ 2 ไฟล์ให้กับผู้ปฏิบัติงาน 2 คนโดยแต่ละไฟล์จำลอง 1 รายการ ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5] และไฟล์ 2 ประกอบด้วย [6, 7, 8, 9, 10, 11] ให้จำนวนเรพลิกาทั้งหมดในการซิงค์เป็น 2 และขนาดแบตช์ทั่วโลกคือ 4

    • คนทำงาน 0:
    • แบทช์ 1 = แบบจำลอง 1: [0, 1]
    • แบทช์ 2 = แบบจำลอง 1: [2, 3]
    • แบทช์ 3 = แบบจำลอง 1: [4]
    • แบทช์ 4 = แบบจำลอง 1: [5]
    • คนงาน 1:
    • แบทช์ 1 = แบบจำลอง 2: [6, 7]
    • แบทช์ 2 = แบบจำลอง 2: [8, 9]
    • แบทช์ 3 = แบบจำลอง 2: [10]
    • แบทช์ 4 = แบบจำลอง 2: [11]
  • ข้อมูล: สิ่งนี้จะทำการชาร์ดอัตโนมัติขององค์ประกอบในผู้ปฏิบัติงานทั้งหมด ผู้ปฏิบัติงานแต่ละคนจะอ่านชุดข้อมูลทั้งหมดและประมวลผลเฉพาะชาร์ดที่ได้รับมอบหมายเท่านั้น ชาร์ดอื่นๆ ทั้งหมดจะถูกยกเลิก โดยทั่วไปจะใช้สิ่งนี้หากจำนวนไฟล์อินพุตน้อยกว่าจำนวนผู้ปฏิบัติงาน และคุณต้องการการแบ่งส่วนข้อมูลที่ดีขึ้นในผู้ปฏิบัติงานทั้งหมด ข้อเสียคือชุดข้อมูลทั้งหมดจะถูกอ่านสำหรับผู้ปฏิบัติงานแต่ละคน ตัวอย่างเช่น ให้เราแจกจ่าย 1 ไฟล์มากกว่า 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2

    • คนทำงาน 0:
    • แบทช์ 1 = แบบจำลอง 1: [0, 1]
    • แบทช์ 2 = แบบจำลอง 1: [4, 5]
    • แบทช์ 3 = แบบจำลอง 1: [8, 9]
    • คนงาน 1:
    • แบทช์ 1 = แบบจำลอง 2: [2, 3]
    • แบทช์ 2 = แบบจำลอง 2: [6, 7]
    • แบทช์ 3 = แบบจำลอง 2: [10, 11]
  • ปิด: หากคุณปิดการชาร์ทอัตโนมัติ พนักงานแต่ละคนจะประมวลผลข้อมูลทั้งหมด ตัวอย่างเช่น ให้เราแจกจ่าย 1 ไฟล์มากกว่า 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2 จากนั้นผู้ปฏิบัติงานแต่ละคนจะเห็นการแจกจ่ายต่อไปนี้:

    • คนทำงาน 0:
    • แบทช์ 1 = แบบจำลอง 1: [0, 1]
    • แบทช์ 2 = แบบจำลอง 1: [2, 3]
    • แบทช์ 3 = แบบจำลอง 1: [4, 5]
    • แบทช์ 4 = แบบจำลอง 1: [6, 7]
    • แบทช์ 5 = แบบจำลอง 1: [8, 9]
    • แบทช์ 6 = แบบจำลอง 1: [10, 11]

    • คนงาน 1:

    • แบทช์ 1 = แบบจำลอง 2: [0, 1]

    • แบทช์ 2 = แบบจำลอง 2: [2, 3]

    • แบทช์ 3 = แบบจำลอง 2: [4, 5]

    • แบทช์ 4 = แบบจำลอง 2: [6, 7]

    • แบทช์ 5 = แบบจำลอง 2: [8, 9]

    • แบทช์ 6 = แบบจำลอง 2: [10, 11]

กำลังดึงข้อมูลล่วงหน้า

โดยค่าเริ่มต้น tf.distribute จะเพิ่มการแปลงการดึงข้อมูลล่วงหน้าที่ส่วนท้ายของอินสแตนซ์ tf.data.Dataset ที่ผู้ใช้ระบุ อาร์กิวเมนต์สำหรับการแปลงค่าล่วงหน้าซึ่งเป็น buffer_size เท่ากับจำนวนเรพลิกาที่ซิงค์

tf.distribute.Strategy.distribute_datasets_from_function

การใช้งาน

API นี้ใช้ฟังก์ชันอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset ฟังก์ชันอินพุตที่ผู้ใช้ส่งผ่านมีอาร์กิวเมนต์ tf.distribute.InputContext และควรส่งคืนอินสแตนซ์ tf.data.Dataset ด้วย API นี้ tf.distribute จะไม่ทำการเปลี่ยนแปลงใดๆ เพิ่มเติมกับอินสแตนซ์ tf.data.Dataset ของผู้ใช้ที่ส่งคืนจากฟังก์ชันอินพุต เป็นความรับผิดชอบของผู้ใช้ในการแบทช์และชาร์ดชุดข้อมูล tf.distribute เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน นอกเหนือจากการอนุญาตให้ผู้ใช้ระบุตรรกะการแบ่งกลุ่มและการแบ่งกลุ่มของตัวเองแล้ว API นี้ยังแสดงให้เห็นถึงความสามารถในการปรับขนาดและประสิทธิภาพที่ดีขึ้นเมื่อเทียบกับ tf.distribute.Strategy.experimental_distribute_dataset เมื่อใช้สำหรับการฝึกอบรมผู้ปฏิบัติงานหลายคน

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

คุณสมบัติ

แบทช์

อินสแตนซ์ tf.data.Dataset ที่เป็นค่าส่งคืนของฟังก์ชันอินพุตควรถูกแบทช์โดยใช้ขนาดแบทช์ต่อแบบจำลอง ขนาดชุดงานต่อแบบจำลองคือขนาดชุดงานส่วนกลางหารด้วยจำนวนแบบจำลองที่เข้าร่วมในการฝึกการซิงค์ นี่เป็นเพราะ tf.distribute เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน ชุดข้อมูลที่สร้างขึ้นจากผู้ปฏิบัติงานที่กำหนดควรพร้อมใช้งานโดยแบบจำลองทั้งหมดของผู้ปฏิบัติงานนั้น

ชาร์ดิง

ออบเจ็กต์ tf.distribute.InputContext ที่ส่งผ่านโดยปริยายเป็นอาร์กิวเมนต์ไปยังฟังก์ชันอินพุตของผู้ใช้ ถูกสร้างขึ้นโดย tf.distribute ภายใต้ประทุน มีข้อมูลเกี่ยวกับจำนวนผู้ปฏิบัติงาน รหัสผู้ปฏิบัติงานปัจจุบัน ฯลฯ ฟังก์ชันอินพุตนี้สามารถจัดการการแบ่งส่วนข้อมูลตามนโยบายที่กำหนดโดยผู้ใช้โดยใช้คุณสมบัติเหล่านี้ซึ่งเป็นส่วนหนึ่งของอ็อบเจ็กต์ tf.distribute.InputContext

กำลังดึงข้อมูลล่วงหน้า

tf.distribute ไม่ได้เพิ่มการแปลง prefetch ที่ส่วนท้ายของ tf.data.Dataset ที่ส่งคืนโดยผู้ใช้ที่ให้ฟังก์ชันอินพุต

Iterators แบบกระจาย

คล้ายกับอินสแตนซ์ tf.data.Dataset ที่ไม่ได้แจกจ่าย คุณจะต้องสร้างตัววนซ้ำบนอินสแตนซ์ tf.distribute.DistributedDataset เพื่อวนซ้ำและเข้าถึงองค์ประกอบใน tf.distribute.DistributedDataset ต่อไปนี้เป็นวิธีที่คุณสามารถสร้าง tf.distribute.DistributedIterator และใช้เพื่อฝึกโมเดลของคุณ:

ประเพณี

ใช้ Pythonic for loop construct

คุณสามารถใช้ลูป Pythonic ที่เป็นมิตรกับผู้ใช้เพื่อวนซ้ำบน tf.distribute.DistributedDataset อิลิเมนต์ที่ส่งคืนจาก tf.distribute.DistributedIterator สามารถเป็น tf.Tensor เดียวหรือ tf.distribute.DistributedValues ซึ่งมีค่าต่อการจำลอง การวางลูปใน tf.function จะช่วยเพิ่มประสิทธิภาพ อย่างไรก็ตาม ขณะนี้ไม่รองรับการ break และ return สำหรับการวนซ้ำ tf.distribute.DistributedDataset ที่วางอยู่ภายใน tf.function

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

ใช้ iter เพื่อสร้างตัววนซ้ำที่ชัดเจน

ในการวนซ้ำองค์ประกอบในอินสแตนซ์ tf.distribute.DistributedDataset คุณสามารถสร้าง tf.distribute.DistributedIterator โดยใช้ iter API ได้ ด้วยตัววนซ้ำที่ชัดเจน คุณสามารถทำซ้ำสำหรับจำนวนขั้นตอนที่แน่นอนได้ เพื่อรับองค์ประกอบถัดไปจากอินสแตนซ์ tf.distribute.DistributedIterator dist_iterator คุณสามารถเรียก next(dist_iterator) , dist_iterator.get_next() หรือ dist_iterator.get_next_as_optional() สองอันแรกนั้นเหมือนกันโดยพื้นฐานแล้ว:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

ด้วย next() หรือ tf.distribute.DistributedIterator.get_next() หาก tf.distribute.DistributedIterator ถึงจุดสิ้นสุด ข้อผิดพลาด OutOfRange จะถูกส่งออกไป ลูกค้าสามารถตรวจจับข้อผิดพลาดทางฝั่งหลามและทำงานอื่นๆ ต่อไปได้ เช่น ด่านตรวจและประเมินผล อย่างไรก็ตาม สิ่งนี้จะไม่ทำงานหากคุณใช้ลูปการฝึกโฮสต์ (เช่น รันหลายขั้นตอนต่อ tf.function ) ซึ่งมีลักษณะดังนี้:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn มีหลายขั้นตอนโดยการห่อตัวขั้นตอนภายใน tf.range ในกรณีนี้ การวนซ้ำที่แตกต่างกันในลูปที่ไม่มีการขึ้นต่อกันสามารถเริ่มต้นพร้อมกันได้ ดังนั้นจึงสามารถทริกเกอร์ข้อผิดพลาด OutOfRange ในการวนซ้ำในภายหลังก่อนที่การคำนวณการวนซ้ำก่อนหน้าจะเสร็จสิ้น เมื่อเกิดข้อผิดพลาด OutOfRange การดำเนินการทั้งหมดในฟังก์ชันจะถูกยกเลิกทันที หากเป็นบางกรณีที่คุณต้องการหลีกเลี่ยง ทางเลือกอื่นที่ไม่เกิดข้อผิดพลาด OutOfRange คือ tf.distribute.DistributedIterator.get_next_as_optional() get_next_as_optional ส่งคืน tf.experimental.Optional ซึ่งมีองค์ประกอบถัดไปหรือไม่มีค่าหาก tf.distribute.DistributedIterator ถึงจุดสิ้นสุด

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

การใช้คุณสมบัติ element_spec

หากคุณส่งผ่านองค์ประกอบของชุดข้อมูลที่แจกจ่ายไปยัง tf.function และต้องการการรับประกัน tf.TypeSpec คุณสามารถระบุอาร์กิวเมนต์ input_signature ของ tf.function ได้ เอาต์พุตของชุดข้อมูลแบบกระจายคือ tf.distribute.DistributedValues ซึ่งสามารถแสดงอินพุตไปยังอุปกรณ์เดียวหรือหลายอุปกรณ์ ในการรับ tf.TypeSpec ที่สอดคล้องกับค่าแบบกระจายนี้ คุณสามารถใช้คุณสมบัติ element_spec ของชุดข้อมูลแบบกระจายหรืออ็อบเจ็กต์ iterator แบบกระจาย

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

แบทช์บางส่วน

พบแบทช์บางส่วนเมื่ออินสแตนซ์ tf.data.Dataset ที่ผู้ใช้สร้างอาจมีขนาดแบทช์ที่ไม่แบ่งเท่า ๆ กันด้วยจำนวนของเรพพลิกา หรือเมื่อคาร์ดินาลลิตี้ของอินสแตนซ์ชุดข้อมูลไม่หารด้วยขนาดแบทช์ ซึ่งหมายความว่าเมื่อมีการแจกจ่ายชุดข้อมูลบนแบบจำลองหลายตัว การเรียก next บนตัววนซ้ำบางตัวจะส่งผลให้เกิด OutOfRangeError เพื่อจัดการกับกรณีการใช้งานนี้ tf.distribute จะส่งคืนแบตช์จำลองที่มีขนาดแบตช์ 0 บนเรพลิกาที่ไม่มีข้อมูลในการประมวลผลอีกต่อไป

สำหรับกรณีผู้ปฏิบัติงานคนเดียว หากไม่มีการส่งคืนข้อมูลโดยการโทร next บนตัววนซ้ำ แบตช์จำลองที่มีขนาดแบตช์ 0 จะถูกสร้างและใช้งานพร้อมกับข้อมูลจริงในชุดข้อมูล ในกรณีของแบทช์บางส่วน แบทช์โกลบอลชุดสุดท้ายจะมีข้อมูลจริงควบคู่ไปกับแบทช์จำลองของข้อมูล เงื่อนไขการหยุดสำหรับการประมวลผลข้อมูลในขณะนี้จะตรวจสอบว่าเรพลิกาใด ๆ มีข้อมูลหรือไม่ หากไม่มีข้อมูลบนแบบจำลองใดๆ ข้อผิดพลาด OutOfRange จะถูกส่งออกไป

สำหรับกรณีที่มีผู้ปฏิบัติงานหลายคน ค่าบูลีนที่แสดงการมีอยู่ของข้อมูลในแต่ละผู้ปฏิบัติงานจะถูกรวมโดยใช้การสื่อสารข้ามเรพลิกา และค่านี้ใช้เพื่อระบุว่าผู้ปฏิบัติงานทั้งหมดประมวลผลชุดข้อมูลที่แจกจ่ายเสร็จแล้วหรือไม่ เนื่องจากสิ่งนี้เกี่ยวข้องกับการสื่อสารระหว่างผู้ปฏิบัติงาน จึงมีบทลงโทษด้านประสิทธิภาพบางอย่างที่เกี่ยวข้อง

คำเตือน

  • เมื่อใช้ tf.distribute.Strategy.experimental_distribute_dataset APIs กับการตั้งค่าผู้ปฏิบัติงานหลายราย ผู้ใช้จะส่ง tf.data.Dataset ที่อ่านจากไฟล์ ถ้า tf.data.experimental.AutoShardPolicy ถูกตั้งค่าเป็น AUTO หรือ FILE ขนาดแบทช์ตามจริงต่อขั้นตอนอาจเล็กกว่าขนาดแบทช์ส่วนกลางที่ผู้ใช้กำหนด สิ่งนี้สามารถเกิดขึ้นได้เมื่อองค์ประกอบที่เหลือในไฟล์น้อยกว่าขนาดแบตช์ส่วนกลาง ผู้ใช้สามารถทำให้ชุดข้อมูลหมดโดยไม่ต้องขึ้นอยู่กับจำนวนขั้นตอนในการรัน หรือตั้งค่า tf.data.experimental.AutoShardPolicy เป็น DATA เพื่อแก้ไข

  • ขณะนี้ยังไม่รองรับการแปลงชุดข้อมูลแบบเก็บสถานะด้วย tf.distribute และการดำเนินการเก็บสถานะใดๆ ที่ชุดข้อมูลอาจมีจะถูกละเว้น ตัวอย่างเช่น หากชุดข้อมูลของคุณมี map_fn ที่ใช้ tf.random.uniform เพื่อหมุนภาพ แสดงว่าคุณมีกราฟชุดข้อมูลที่ขึ้นอยู่กับสถานะ (เช่น การสุ่มเมล็ด) บนเครื่องในพื้นที่ที่มีการดำเนินการกระบวนการหลาม

  • tf.data.experimental.OptimizationOptions ทดลองที่ถูกปิดใช้งานโดยค่าเริ่มต้นสามารถในบริบทบางอย่างได้ เช่น เมื่อใช้ร่วมกับ tf.distribute ทำให้ประสิทธิภาพลดลง คุณควรเปิดใช้งานหลังจากตรวจสอบว่ามีประโยชน์ต่อประสิทธิภาพของปริมาณงานของคุณในการตั้งค่าการกระจายเท่านั้น

  • โปรดดู คู่มือนี้ สำหรับวิธีเพิ่มประสิทธิภาพไพพ์ไลน์อินพุตของคุณด้วย tf.data โดยทั่วไป เคล็ดลับเพิ่มเติมบางประการ:

    • หากคุณมีผู้ปฏิบัติงานหลายคนและกำลังใช้ tf.data.Dataset.list_files เพื่อสร้างชุดข้อมูลจากไฟล์ทั้งหมดที่ตรงกับรูปแบบโกลบอลอย่างน้อยหนึ่งรูปแบบ อย่าลืมตั้งค่าอาร์กิวเมนต์ seed หรือตั้งค่า shuffle=False เพื่อให้ผู้ปฏิบัติงานแต่ละคนแบ่งไฟล์อย่างสม่ำเสมอ

    • หากไปป์ไลน์อินพุตของคุณมีทั้งการสับเปลี่ยนข้อมูลในระดับเร็กคอร์ดและการแยกวิเคราะห์ข้อมูล เว้นแต่ข้อมูลที่ไม่ถูกแยกวิเคราะห์จะมีขนาดใหญ่กว่าข้อมูลที่แยกวิเคราะห์อย่างมีนัยสำคัญ (ซึ่งโดยปกติไม่ใช่กรณี) สับเปลี่ยนก่อนแล้วจึงแยกวิเคราะห์ ดังที่แสดงในตัวอย่างต่อไปนี้ ซึ่งอาจเป็นประโยชน์ต่อการใช้งานหน่วยความจำและประสิทธิภาพ

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) รักษาบัฟเฟอร์ภายในขององค์ประกอบ buffer_size และด้วยเหตุนี้การลด buffer_size อาจช่วยบรรเทาปัญหา OOM ได้

  • ไม่รับประกันลำดับที่ข้อมูลถูกประมวลผลโดยผู้ปฏิบัติงานเมื่อใช้ tf.distribute.experimental_distribute_dataset หรือ tf.distribute.distribute_datasets_from_function โดยทั่วไปจำเป็นต้องใช้หากคุณใช้ tf.distribute เพื่อคาดการณ์มาตราส่วน อย่างไรก็ตาม คุณสามารถแทรกดัชนีสำหรับแต่ละองค์ประกอบในแบทช์และเอาต์พุตของคำสั่งซื้อได้ ตัวอย่างต่อไปนี้คือตัวอย่างวิธีการสั่งซื้อเอาต์พุต

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

ฉันจะแจกจ่ายข้อมูลของฉันได้อย่างไรหากฉันไม่ได้ใช้อินสแตนซ์ tf.data.Dataset ที่เป็นที่ยอมรับ

ในบางครั้ง ผู้ใช้ไม่สามารถใช้ tf.data.Dataset เพื่อแทนอินพุตของตน และต่อมาใช้ API ที่กล่าวถึงข้างต้นเพื่อแจกจ่ายชุดข้อมูลไปยังอุปกรณ์หลายเครื่อง ในกรณีเช่นนี้ คุณสามารถใช้เมตริกซ์ดิบหรืออินพุตจากเครื่องกำเนิดไฟฟ้าได้

ใช้ Experimental_distribute_values_from_function สำหรับอินพุตเทนเซอร์ตามอำเภอใจ

strategy.run ยอมรับ tf.distribute.DistributedValues ซึ่งเป็นผลลัพธ์ของ next(iterator) ในการส่งผ่านค่าเทนเซอร์ ให้ใช้ experimental_distribute_values_from_function เพื่อสร้าง tf.distribute.DistributedValues จากเมตริกซ์ดิบ

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
ตัวยึดตำแหน่ง23

ใช้ tf.data.Dataset.from_generator หากอินพุตของคุณมาจากตัวสร้าง

หากคุณมีฟังก์ชันตัวสร้างที่คุณต้องการใช้ คุณสามารถสร้างอินสแตนซ์ tf.data.Dataset โดยใช้ from_generator API

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.