דף זה תורגם על ידי Cloud Translation API.
Switch to English

קלט מבוזר

צפה ב- TensorFlow.org הפעל בגוגל קולאב צפה במקור ב- GitHub הורד מחברת

ממשקי ה- API של tf.distribute מספקים דרך קלה למשתמשים להגדיל את האימונים שלהם ממכונה אחת למספר מכונות. בעת שינוי גודל המודל שלהם, המשתמשים צריכים גם להפיץ את הקלט שלהם על פני מספר מכשירים. tf.distribute מספק ממשקי API בעזרתם תוכלו להפיץ באופן אוטומטי את הקלט שלכם בין מכשירים.

מדריך זה יציג בפניך את הדרכים השונות בהן תוכל ליצור מערך נתונים ומופצים מבוזרים באמצעות ממשקי API של tf.distribute . בנוסף, הנושאים הבאים יוסקרו:

מדריך זה אינו מכסה שימוש בקלט מבוזר באמצעות ממשקי API של Keras.

מערכי נתונים מבוזרים

כדי להשתמש בקנה מידה של ממשקי ה- API של tf.distribute , מומלץ למשתמשים להשתמש ב-tf.data.Dataset כדי לייצג את הקלט שלהם. tf.distribute נועד לעבוד ביעילות עםtf.data.Dataset (למשל, איסוף אוטומטי מראש של נתונים לכל מכשיר מאיץ) כאשר אופטימיזציות ביצועים משולבות באופן קבוע ביישום. אם יש לך מקרה שימוש לשימוש במשהו אחר שאינוtf.data.Dataset , עיין בסעיף מאוחר יותר במדריך זה. בלולאת אימונים לא מבוזרת, משתמשים יוצרים תחילה מופעtf.data.Dataset ואזtf.data.Dataset על האלמנטים. לדוגמה:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

כדי לאפשר למשתמשים להשתמש באסטרטגיית tf.distribute עם שינויים מינימליים בקוד הקיים של המשתמש, הוצגו שני ממשקי API אשרtf.data.Dataset מופעtf.data.Dataset ויחזירו אובייקט מערך נתונים מבוזר. לאחר מכן יכול משתמש לחזור על מופע מערך נתונים מבוזר זה ולהכשיר את המודל שלו כמו קודם. בואו נסתכל על שני ממשקי ה- API - tf.distribute.Strategy.experimental_distribute_dataset ו- tf.distribute.Strategy.distribute_datasets_from_function בפירוט רב יותר:

tf.distribute.Strategy.experimental_distribute_dataset

נוֹהָג

ממשק API זה לוקח מופעtf.data.Dataset כקלט ומחזיר מופע tf.distribute.DistributedDataset . עליך לאגד את מערך הקלט עם ערך השווה לגודל האצווה הגלובלי. גודל אצווה עולמי זה הוא מספר הדוגמאות שברצונך לעבד בכל המכשירים בשלב אחד. אתה יכול לחזור על מערך הנתונים המבוזר הזה בצורה פיתונית או ליצור איטרטור באמצעות iter . האובייקט שהוחזר אינו מופעtf.data.Dataset ואינו תומך בשום ממשק API אחר שמשנה או בודק את מערך הנתונים בשום צורה שהיא. זהו ה- API המומלץ אם אין לך דרכים ספציפיות בהן ברצונך לשבור את הקלט שלך על פני עותקים שונים.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

נכסים

אצווה

tf.distribute מחזיר מחדש את מופעtf.data.Dataset הקלט עם גודל אצווה חדש השווה לגודל האצווה הגלובלי חלקי מספר העותקים המסונכרנים. מספר ההעתקים המסונכרנים שווה למספר המכשירים שלוקחים חלק בצמצום השיפוע במהלך האימון. כאשר משתמש מתקשר next באיטרטור המבוזר, מוחזר גודל אצווה של נתונים לכל עותק משוכפל. הקרדינליות של מערך הנתונים המחודש תהיה תמיד מכפלה של מספר העתקים. להלן מספר דוגמאות:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • ללא הפצה:
    • אצווה 1: [0, 1, 2, 3]
    • אצווה 2: [4, 5]
    • עם הפצה על פני 2 העתקים. האצווה האחרונה ([4, 5]) מחולקת בין 2 העתקים.

    • אצווה 1:

      • העתק 1: [0, 1]
      • העתק 2: [2, 3]
    • אצווה 2:

      • העתק 2: [4]
      • העתק 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • ללא הפצה:
    • אצווה 1: [[0], [1], [2], [3]]
    • עם הפצה על 5 העתקים:
    • אצווה 1:
      • העתק 1: [0]
      • העתק 2: [1]
      • העתק 3: [2]
      • העתק 4: [3]
      • העתק 5: []
  • tf.data.Dataset.range(8).batch(4)

    • ללא הפצה:
    • אצווה 1: [0, 1, 2, 3]
    • אצווה 2: [4, 5, 6, 7]
    • עם הפצה על 3 העתקים:
    • אצווה 1:
      • העתק 1: [0, 1]
      • העתק 2: [2, 3]
      • העתק 3: []
    • אצווה 2:
      • העתק 1: [4, 5]
      • העתק 2: [6, 7]
      • העתק 3: []

לתיקון מחדש של מערך הנתונים יש מורכבות שטח הגדלה באופן ליניארי עם מספר העתקים. המשמעות היא שבמקרה לשימוש בהכשרה מרובת עובדים צינור הקלט יכול להיתקל בשגיאות OOM.

רסיסים

tf.distribute את מערך הקלט בהכשרה מרובת עובדים עם MultiWorkerMirroredStrategy ו- TPUStrategy . כל מערך נתונים נוצר במכשיר המעבד של העובד. שמירה אוטומטית של מערך נתונים על קבוצה של עובדים פירושה שכל עובד מוקצה למערכת משנה כולה (אם tf.data.experimental.AutoShardPolicy ). זאת על מנת להבטיח כי בכל אחד מהעבודות יעובד גודל אצווה עולמי של רכיבי מערך נתונים שאינם חופפים. tf.data.experimental.DistributeOptions יש כמה אפשרויות שונות שניתן לציין באמצעות tf.data.experimental.DistributeOptions . שים לב כי אין שמירה אוטומטית בהכשרה מרובת עובדים עם ParameterServerStrategy , ומידע נוסף על יצירת מערך נתונים באמצעות אסטרטגיה זו ניתן למצוא במדריך לאסטרטגיית שרת פרמטרים .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

ישנן שלוש אפשרויות שונות שתוכל להגדיר עבור tf.data.experimental.AutoShardPolicy :

  • AUTO: זוהי אפשרות ברירת המחדל שמשמעותה נסיון להתגרש באמצעות FILE. הניסיון להתחלק על ידי FILE נכשל אם לא מתגלה מערך נתונים מבוסס קבצים. tf.distribute יחזור לאחר מכן להתמכר על ידי DATA. שים לב שאם מערך הקלט מבוסס על קבצים אך מספר הקבצים קטן ממספר העובדים, InvalidArgumentError . אם זה קורה, הגדירו את המדיניות באופן מפורש ל- AutoShardPolicy.DATA , או חלקו את מקור הקלט לקבצים קטנים יותר, כך שמספר הקבצים גדול ממספר העובדים.
  • FILE: זוהי האפשרות אם ברצונך לשבור את קבצי הקלט על פני כל העובדים. עליך להשתמש באפשרות זו אם מספר קבצי הקלט גדול בהרבה ממספר העובדים והנתונים בקבצים מופצים באופן שווה. החיסרון של אפשרות זו הוא שיש עובדים לא פעילים אם הנתונים בקבצים לא מופצים באופן שווה. אם מספר הקבצים קטן ממספר העובדים, InvalidArgumentError תעלה. אם זה קורה, הגדר במפורש את המדיניות ל- AutoShardPolicy.DATA . לדוגמה, הבה נפיץ 2 קבצים על פני 2 עובדים עם עותק משוכפל אחד. קובץ 1 מכיל [0, 1, 2, 3, 4, 5] וקובץ 2 מכיל [6, 7, 8, 9, 10, 11]. תן למספר ההעתקים המסונכרן להיות 2 ולגודל האצווה העולמי להיות 4.

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [2, 3]
    • אצווה 3 = העתק 1: [4]
    • אצווה 4 = העתק 1: [5]
    • עובד 1:
    • אצווה 1 = העתק 2: [6, 7]
    • אצווה 2 = העתק 2: [8, 9]
    • אצווה 3 = העתק 2: [10]
    • אצווה 4 = העתק 2: [11]
  • נתונים: פעולה זו תשמור אוטומטית את האלמנטים בכל העובדים. כל אחד מהעובדים יקרא את כל מערך הנתונים ויעבד רק את הרסיס שהוקצה לו. כל שאר השברים ייזרקו. זה משמש בדרך כלל אם מספר קבצי הקלט קטן ממספר העובדים ואתה מעוניין בשבירת נתונים טובה יותר בכל העובדים. החיסרון הוא שכל מערך הנתונים ייקרא על כל עובד. לדוגמא, הבה נפיץ קבצים על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2.

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [4, 5]
    • אצווה 3 = העתק 1: [8, 9]
    • עובד 1:
    • אצווה 1 = העתק 2: [2, 3]
    • אצווה 2 = העתק 2: [6, 7]
    • אצווה 3 = העתק 2: [10, 11]
  • כבוי: אם תכבה את השמירה האוטומטית, כל עובד יעבד את כל הנתונים. לדוגמא, בואו נפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2. ואז כל עובד יראה את ההתפלגות הבאה:

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [2, 3]
    • אצווה 3 = העתק 1: [4, 5]
    • אצווה 4 = העתק 1: [6, 7]
    • אצווה 5 = העתק 1: [8, 9]
    • אצווה 6 = העתק 1: [10, 11]

    • עובד 1:

    • אצווה 1 = העתק 2: [0, 1]

    • אצווה 2 = העתק 2: [2, 3]

    • אצווה 3 = העתק 2: [4, 5]

    • אצווה 4 = העתק 2: [6, 7]

    • אצווה 5 = העתק 2: [8, 9]

    • אצווה 6 = העתק 2: [10, 11]

איסוף מראש

כברירת מחדל, tf.distribute מוסיף טרנספורמציה tf.distribute מראש בסוף המשתמש שסופק על ידיtf.data.Dataset . הארגומנט לשינוי טרנספורמציה מראש שהוא buffer_size שווה למספר העתקים המסונכרנים.

tf.distribute.Strategy.distribute_datasets_from_function

נוֹהָג

ממשק API זה לוקח פונקציית קלט ומחזיר מופע tf.distribute.DistributedDataset . לפונקציית הקלט שמשתמשים מעבירים יש ארגומנט tf.distribute.InputContext והיא צריכה להחזיר מופעtf.data.Dataset . באמצעות ממשק API זה, tf.distribute לא מבצע שינויים נוספים במופעtf.data.Dataset של המשתמשtf.data.Dataset הקלט. באחריות המשתמש לאחסן ולגרוס את מערך הנתונים. tf.distribute קורא לפונקציית הקלט במכשיר המעבד של כל אחד מהעובדים. מלבד המאפשר למשתמשים לציין את לוגיקת האצווה והשבירה שלהם, ממשק API זה גם מדגים יכולת הרחבה וביצועים טובים יותר בהשוואה ל- tf.distribute.Strategy.experimental_distribute_dataset כאשר משתמשים בו להכשרה מרובת עובדים.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

נכסים

אצווה

ישtf.data.Dataset המופעtf.data.Dataset שהוא ערך ההחזרה של פונקציית הקלט באמצעות גודל אצווה לפי עותק משוכפל. גודל האצווה לפי עותק משוכפל הוא גודל האצווה הגלובלי חלקי מספר העותקים המשוכפלים שלוקחים חלק באימון הסנכרון. הסיבה לכך היא ש- tf.distribute קורא לפונקציית הקלט במכשיר המעבד של כל אחד מהעובדים. מערך הנתונים שנוצר על עובד מסוים צריך להיות מוכן לשימוש על ידי כל העתקים של אותו עובד.

רסיסים

האובייקט tf.distribute.InputContext המועבר באופן מרומז tf.distribute.InputContext לפונקציית הקלט של המשתמש נוצר על ידי tf.distribute מתחת למכסה המנוע. יש לו מידע על מספר העובדים, מזהה העובד הנוכחי וכו '. פונקציית קלט זו יכולה להתמודד עם התפלגות בהתאם למדיניות שהגדיר המשתמש באמצעות מאפיינים אלה שהם חלק מהאובייקט tf.distribute.InputContext .

איסוף מראש

tf.distribute לא מוסיף טרנספורמציה מראש בהמשךtf.data.Dataset שהוחזר על ידי פונקציית הקלט של המשתמש.

מחזיקים מחולקים

בדומהtf.data.Dataset שאינם מופצים, יהיה עליך ליצור איטרטור tf.distribute.DistributedDataset כדי לחזור עליו ולגשת לאלמנטים ב- tf.distribute.DistributedDataset . להלן הדרכים בהן אתה יכול ליצור tf.distribute.DistributedIterator ולהשתמש בו לאימון המודל שלך:

שימושים

השתמש ב- Pythonic for construct loop

אתה יכול להשתמש בלולאה פיתונית ידידותית למשתמש כדי לחזור על tf.distribute.DistributedDataset . האלמנטים המוחזרים מ- tf.distribute.DistributedIterator יכולים להיות tf.Tensor יחיד או tf.distribute.DistributedValues המכיל ערך לכל עותק משוכפל. הצבת הלולאה בתוך tf.function תתן דחיפה לביצועים. עם זאת, כרגע לא נתמכת break return עבור לולאה מעל tf.distribute.DistributedDataset שממוקם בתוך tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

השתמש iter ליצירת איטרטר מפורש

כדי לחזר על אלמנטים בתוך tf.distribute.DistributedDataset למשל, אתה יכול ליצור tf.distribute.DistributedIterator באמצעות iter API עליו. באמצעות איטרטור מפורש, אתה יכול לחזור על מספר קבוע של צעדים. על מנת לקבל את האלמנט הבא tf.distribute.DistributedIterator dist_iterator , אתה יכול להתקשר next(dist_iterator) , dist_iterator.get_next() או dist_iterator.get_next_as_optional() . שני הראשונים זהים למעשה:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

עם next() או tf.distribute.DistributedIterator.get_next() , אם tf.distribute.DistributedIterator הגיע לסיומו, תושלך שגיאת OutOfRange. הלקוח יכול לתפוס את השגיאה בצד הפיתון ולהמשיך לבצע עבודות אחרות כמו בדיקת בדיקה והערכה. עם זאת, זה לא יעבוד אם אתה משתמש בלולאת אימון מארחת (כלומר, הפעל שלבים מרובים לכל tf.function ), שנראית כמו:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn מכיל שלבים מרובים על ידי גלישת גוף המדרגות בתוך tf.range . במקרה זה, איטרציות שונות בלולאה ללא תלות יכולות להתחיל במקביל, כך שניתן להפעיל שגיאת OutOfRange באיטרציות מאוחרות יותר לפני חישוב האיטרציות הקודמות. ברגע שנזרקת שגיאת OutOfRange, כל האופציות בפונקציה יופסקו מיד. אם מדובר במקרה כלשהו שתרצה להימנע ממנו, חלופה שלא זורקת שגיאת tf.distribute.DistributedIterator.get_next_as_optional() היא tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional מחזיר tf.experimental.Optional שמכיל את האלמנט הבא או ללא ערך אם tf.distribute.DistributedIterator הגיע לסיומו.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

שימוש במאפיין element_spec

אם אתה מעביר את האלמנטים של מערך נתונים מבוזר ל- tf.function ורוצה להבטיח tf.TypeSpec , תוכל לציין את ארגומנט input_signature של tf.function . הפלט של מערך נתונים מבוזר הוא tf.distribute.DistributedValues שיכולים לייצג את הקלט למכשיר יחיד או למספר מכשירים. כדי לקבל את ה- tf.TypeSpec המתאים לערך מבוזר זה תוכלו להשתמש במאפיין element_spec של מערך הנתונים המבוזר או אובייקט iterator מבוזר.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

אצוותים חלקיים

קבוצות חלקיות נתקלות כאשר מופעיtf.data.Dataset שמשתמשים יוצרים עשויים להכיל גדלי אצווה שאינם מתחלקים באופן שווה במספר העתקים או כאשר הקרדינליות של מופע מערך הנתונים אינה ניתנת לחלוקה לפי גודל האצווה. המשמעות היא שכאשר מערך הנתונים מופץ על פני מספר עותקים משוכפלים, הקריאה next לאיתורים חוזרים מסוימת תביא ל- OutOfRangeError. כדי לטפל במקרה שימוש זה, tf.distribute מחזיר קבוצות דמה בגודל אצווה 0 על עותקים משוכפלים שאין להם עוד נתונים לעיבוד.

במקרה של עובד יחיד, אם הנתונים לא מוחזרים על ידי השיחה next באיטורציה, נוצרות קבוצות דמה בגודל אצווה 0 ומשמשות יחד עם הנתונים האמיתיים במערך הנתונים. במקרה של קבוצות חלקיות, קבוצת הנתונים הגלובלית האחרונה תכלול נתונים אמיתיים לצד קבוצות דמה של נתונים. תנאי ההפסקה לעיבוד נתונים בודק כעת אם לאחד מהעתקים יש נתונים. אם אין נתונים על אף העתק, נזרקת שגיאת OutOfRange.

במקרה של מספר רב של עובדים, הערך הבוליאני המייצג נוכחות של נתונים על כל אחד מהעובדים נצבר באמצעות תקשורת רפליקות צולבת וזה משמש לזיהוי אם כל העובדים סיימו לעבד את מערך הנתונים המבוזר. מכיוון שהדבר כרוך בתקשורת צולבת בין העובדים, קיים עונש ביצוע כלשהו.

אזהרות

  • בעת שימוש בממשקי API של tf.distribute.Strategy.experimental_distribute_dataset עם הגדרת עובד מרובה, משתמשים מעביריםtf.data.Dataset שקורא מקבצים. אם tf.data.experimental.AutoShardPolicy מוגדר כ- AUTO או FILE , גודל האצווה בפועל לפי שלב עשוי להיות קטן יותר מגודל האצווה הגלובלי שהוגדר על ידי המשתמש. זה יכול לקרות כאשר שאר האלמנטים בקובץ הם פחות מגודל האצווה הגלובלי. משתמשים יכולים למצות את מערך הנתונים ללא תלות במספר השלבים להפעלה או להגדיר את tf.data.experimental.AutoShardPolicy ל- DATA כדי לעקוף אותו.

  • כרגע לא ניתן לתמוך tf.distribute באמצעות tf.distribute וכל tf.distribute שיש tf.distribute הנתונים מתעלמות כרגע. לדוגמא, אם map_fn הנתונים שלך יש map_fn שמשתמש ב- tf.random.uniform לסיבוב תמונה, יש לך גרף מערך נתונים תלוי במצב (כלומר הזרע האקראי) במכונה המקומית בה מתבצע תהליך הפיתון.

  • ניסיוני tf.data.experimental.OptimizationOptions כברירת מחדל יכולות בהקשרים מסוימים - כמו למשל בשימוש יחד עם tf.distribute - לגרום לירידת ביצועים. עליך לאפשר אותם רק לאחר שתאמת שהם מועילים לביצוע עומס העבודה שלך במסגרת הגדרת הפצה.

  • אנא עיין במדריך זה כיצד לייעל את צינור הקלט שלך עם tf.data באופן כללי. כמה טיפים נוספים:

    • אם יש לך מספר עובדים ומשתמשים ב- tf.data.Dataset.list_files ליצירת מערך נתונים מכל הקבצים התואמים tf.data.Dataset.list_files גלוב אחת או יותר, זכור להגדיר את טיעון seed או להגדיר shuffle=False כך שכל עובד יגרוס את הקובץ באופן עקבי.

    • אם צינור הקלט שלך כולל גם דשדוש הנתונים ברמת הרשומה וגם ניתוח הנתונים, אלא אם כן הנתונים הלא מעובדים גדולים משמעותית מהנתונים המנותחים (וזה בדרך כלל לא המקרה), ערבב תחילה ואז ניתוח, כפי שמוצג בדוגמה הבאה. זה עשוי להועיל לשימוש בזיכרון וביצועיו.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) שומרים על מאגר פנימי של רכיבי buffer_size , וכך הפחתת buffer_size יכולה להעלות את הבעיה ב- OOM.

  • לא ניתן להבטיח את סדר עיבוד הנתונים על ידי העובדים בעת שימוש ב- tf.distribute.experimental_distribute_dataset או tf.distribute.distribute_datasets_from_function . בדרך כלל זה נדרש אם אתה משתמש ב- tf.distribute כדי לחזות את קנה המידה. עם זאת ניתן להוסיף אינדקס עבור כל רכיב באצווה ולהזמין פלטים בהתאם. הקטע הבא הוא דוגמה לאופן הזמנת תפוקות.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

כיצד אוכל להפיץ את הנתונים שלי אם אינני משתמש במופע קנוני tf.data.Dataset?

לפעמים משתמשים אינם יכולים להשתמש ב-tf.data.Dataset כדי לייצג את הקלט שלהם, ולאחר מכן את ה- API המוזכרים לעיל כדי להפיץ את מערך הנתונים למספר מכשירים. במקרים כאלה אתה יכול להשתמש בטורסרים גולמיים או בקלטים מגנרטור.

השתמש בתפקוד_הפצה_ערכי_אופן ניסיוני לתשומות טנזור שרירותיות

strategy.run מקבל את tf.distribute.DistributedValues שהיא הפלט של next(iterator) . כדי להעביר את ערכי הטנסור, השתמש ב- experimental_distribute_values_from_function כדי לבנות tf.distribute.DistributedValues מ- tf.distribute.DistributedValues raw.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

השתמש ב- tf.data.Dataset.from_generator אם הקלט שלך הוא מחולל

אם יש לך פונקציית גנרטור שבה אתה רוצה להשתמש, אתה יכול ליצור מופעtf.data.Dataset באמצעות ממשק ה- API של from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)