![]() | ![]() | ![]() | ![]() |
ממשקי ה- API של tf.distribute מספקים דרך קלה למשתמשים להגדיל את האימונים שלהם ממכונה אחת למספר מכונות. בעת שינוי גודל המודל שלהם, המשתמשים צריכים גם להפיץ את הקלט שלהם על פני מספר מכשירים. tf.distribute
מספק ממשקי API בעזרתם תוכלו להפיץ באופן אוטומטי את הקלט שלכם בין מכשירים.
מדריך זה יציג בפניך את הדרכים השונות בהן תוכל ליצור מערך נתונים ומופצים מבוזרים באמצעות ממשקי API של tf.distribute
. בנוסף, הנושאים הבאים יוסקרו:
- אפשרויות שימוש, שיתוף ואצווה בעת שימוש ב-
tf.distribute.Strategy.experimental_distribute_dataset
ו-tf.distribute.Strategy.distribute_datasets_from_function
. - דרכים שונות בהן ניתן לחזור על מערך הנתונים המבוזר.
- ההבדלים בין
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
APIs ו-tf.data
APIs וכן מגבלות שמשתמשים עשויים להיתקל בהם בשימוש.
מדריך זה אינו מכסה שימוש בקלט מבוזר באמצעות ממשקי API של Keras.
מערכי נתונים מבוזרים
כדי להשתמש בקנה מידה של ממשקי ה- API של tf.distribute
, מומלץ למשתמשים להשתמש ב-tf.data.Dataset
כדי לייצג את הקלט שלהם. tf.distribute
נועד לעבוד ביעילות עםtf.data.Dataset
(למשל, איסוף אוטומטי מראש של נתונים לכל מכשיר מאיץ) כאשר אופטימיזציות ביצועים משולבות באופן קבוע ביישום. אם יש לך מקרה שימוש לשימוש במשהו אחר שאינוtf.data.Dataset
, עיין בסעיף מאוחר יותר במדריך זה. בלולאת אימונים לא מבוזרת, משתמשים יוצרים תחילה מופעtf.data.Dataset
ואזtf.data.Dataset
על האלמנטים. לדוגמה:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
כדי לאפשר למשתמשים להשתמש באסטרטגיית tf.distribute
עם שינויים מינימליים בקוד הקיים של המשתמש, הוצגו שני ממשקי API אשרtf.data.Dataset
מופעtf.data.Dataset
ויחזירו אובייקט מערך נתונים מבוזר. לאחר מכן יכול משתמש לחזור על מופע מערך נתונים מבוזר זה ולהכשיר את המודל שלו כמו קודם. בואו נסתכל על שני ממשקי ה- API - tf.distribute.Strategy.experimental_distribute_dataset
ו- tf.distribute.Strategy.distribute_datasets_from_function
בפירוט רב יותר:
tf.distribute.Strategy.experimental_distribute_dataset
נוֹהָג
ממשק API זה לוקח מופעtf.data.Dataset
כקלט ומחזיר מופע tf.distribute.DistributedDataset
. עליך לאגד את מערך הקלט עם ערך השווה לגודל האצווה הגלובלי. גודל אצווה עולמי זה הוא מספר הדוגמאות שברצונך לעבד בכל המכשירים בשלב אחד. אתה יכול לחזור על מערך הנתונים המבוזר הזה בצורה פיתונית או ליצור איטרטור באמצעות iter
. האובייקט שהוחזר אינו מופעtf.data.Dataset
ואינו תומך בשום ממשק API אחר שמשנה או בודק את מערך הנתונים בשום צורה שהיא. זהו ה- API המומלץ אם אין לך דרכים ספציפיות בהן ברצונך לשבור את הקלט שלך על פני עותקים שונים.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
נכסים
אצווה
tf.distribute
מחזיר מחדש את מופעtf.data.Dataset
הקלט עם גודל אצווה חדש השווה לגודל האצווה הגלובלי חלקי מספר העותקים המסונכרנים. מספר ההעתקים המסונכרנים שווה למספר המכשירים שלוקחים חלק בצמצום השיפוע במהלך האימון. כאשר משתמש מתקשר next
באיטרטור המבוזר, מוחזר גודל אצווה של נתונים לכל עותק משוכפל. הקרדינליות של מערך הנתונים המחודש תהיה תמיד מכפלה של מספר העתקים. להלן מספר דוגמאות:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- ללא הפצה:
- אצווה 1: [0, 1, 2, 3]
- אצווה 2: [4, 5]
עם הפצה על פני 2 העתקים. האצווה האחרונה ([4, 5]) מחולקת בין 2 העתקים.
אצווה 1:
- העתק 1: [0, 1]
- העתק 2: [2, 3]
אצווה 2:
- העתק 2: [4]
- העתק 2: [5]
tf.data.Dataset.range(4).batch(4)
- ללא הפצה:
- אצווה 1: [[0], [1], [2], [3]]
- עם הפצה על 5 העתקים:
- אצווה 1:
- העתק 1: [0]
- העתק 2: [1]
- העתק 3: [2]
- העתק 4: [3]
- העתק 5: []
tf.data.Dataset.range(8).batch(4)
- ללא הפצה:
- אצווה 1: [0, 1, 2, 3]
- אצווה 2: [4, 5, 6, 7]
- עם הפצה על 3 העתקים:
- אצווה 1:
- העתק 1: [0, 1]
- העתק 2: [2, 3]
- העתק 3: []
- אצווה 2:
- העתק 1: [4, 5]
- העתק 2: [6, 7]
- העתק 3: []
לתיקון מחדש של מערך הנתונים יש מורכבות שטח הגדלה באופן ליניארי עם מספר העתקים. המשמעות היא שבמקרה לשימוש בהכשרה מרובת עובדים צינור הקלט יכול להיתקל בשגיאות OOM.
רסיסים
tf.distribute
את מערך הקלט בהכשרה מרובת עובדים עם MultiWorkerMirroredStrategy
ו- TPUStrategy
. כל מערך נתונים נוצר במכשיר המעבד של העובד. שמירה אוטומטית של מערך נתונים על קבוצה של עובדים פירושה שכל עובד מוקצה למערכת משנה כולה (אם tf.data.experimental.AutoShardPolicy
). זאת על מנת להבטיח כי בכל אחד מהעבודות יעובד גודל אצווה עולמי של רכיבי מערך נתונים שאינם חופפים. tf.data.experimental.DistributeOptions
יש כמה אפשרויות שונות שניתן לציין באמצעות tf.data.experimental.DistributeOptions
. שים לב כי אין שמירה אוטומטית בהכשרה מרובת עובדים עם ParameterServerStrategy
, ומידע נוסף על יצירת מערך נתונים באמצעות אסטרטגיה זו ניתן למצוא במדריך לאסטרטגיית שרת פרמטרים .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
ישנן שלוש אפשרויות שונות שתוכל להגדיר עבור tf.data.experimental.AutoShardPolicy
:
- AUTO: זוהי אפשרות ברירת המחדל שמשמעותה נסיון להתגרש באמצעות FILE. הניסיון להתחלק על ידי FILE נכשל אם לא מתגלה מערך נתונים מבוסס קבצים.
tf.distribute
יחזור לאחר מכן להתמכר על ידי DATA. שים לב שאם מערך הקלט מבוסס על קבצים אך מספר הקבצים קטן ממספר העובדים,InvalidArgumentError
. אם זה קורה, הגדירו את המדיניות באופן מפורש ל-AutoShardPolicy.DATA
, או חלקו את מקור הקלט לקבצים קטנים יותר, כך שמספר הקבצים גדול ממספר העובדים. FILE: זוהי האפשרות אם ברצונך לשבור את קבצי הקלט על פני כל העובדים. עליך להשתמש באפשרות זו אם מספר קבצי הקלט גדול בהרבה ממספר העובדים והנתונים בקבצים מופצים באופן שווה. החיסרון של אפשרות זו הוא שיש עובדים לא פעילים אם הנתונים בקבצים לא מופצים באופן שווה. אם מספר הקבצים קטן ממספר העובדים,
InvalidArgumentError
תעלה. אם זה קורה, הגדר במפורש את המדיניות ל-AutoShardPolicy.DATA
. לדוגמה, הבה נפיץ 2 קבצים על פני 2 עובדים עם עותק משוכפל אחד. קובץ 1 מכיל [0, 1, 2, 3, 4, 5] וקובץ 2 מכיל [6, 7, 8, 9, 10, 11]. תן למספר ההעתקים המסונכרן להיות 2 ולגודל האצווה העולמי להיות 4.- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [2, 3]
- אצווה 3 = העתק 1: [4]
- אצווה 4 = העתק 1: [5]
- עובד 1:
- אצווה 1 = העתק 2: [6, 7]
- אצווה 2 = העתק 2: [8, 9]
- אצווה 3 = העתק 2: [10]
- אצווה 4 = העתק 2: [11]
נתונים: פעולה זו תשמור אוטומטית את האלמנטים בכל העובדים. כל אחד מהעובדים יקרא את כל מערך הנתונים ויעבד רק את הרסיס שהוקצה לו. כל שאר השברים ייזרקו. זה משמש בדרך כלל אם מספר קבצי הקלט קטן ממספר העובדים ואתה מעוניין בשבירת נתונים טובה יותר בכל העובדים. החיסרון הוא שכל מערך הנתונים ייקרא על כל עובד. לדוגמא, הבה נפיץ קבצים על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2.
- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [4, 5]
- אצווה 3 = העתק 1: [8, 9]
- עובד 1:
- אצווה 1 = העתק 2: [2, 3]
- אצווה 2 = העתק 2: [6, 7]
- אצווה 3 = העתק 2: [10, 11]
כבוי: אם תכבה את השמירה האוטומטית, כל עובד יעבד את כל הנתונים. לדוגמא, בואו נפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2. ואז כל עובד יראה את ההתפלגות הבאה:
- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [2, 3]
- אצווה 3 = העתק 1: [4, 5]
- אצווה 4 = העתק 1: [6, 7]
- אצווה 5 = העתק 1: [8, 9]
אצווה 6 = העתק 1: [10, 11]
עובד 1:
אצווה 1 = העתק 2: [0, 1]
אצווה 2 = העתק 2: [2, 3]
אצווה 3 = העתק 2: [4, 5]
אצווה 4 = העתק 2: [6, 7]
אצווה 5 = העתק 2: [8, 9]
אצווה 6 = העתק 2: [10, 11]
איסוף מראש
כברירת מחדל, tf.distribute
מוסיף טרנספורמציה tf.distribute
מראש בסוף המשתמש שסופק על ידיtf.data.Dataset
. הארגומנט לשינוי טרנספורמציה מראש שהוא buffer_size
שווה למספר העתקים המסונכרנים.
tf.distribute.Strategy.distribute_datasets_from_function
נוֹהָג
ממשק API זה לוקח פונקציית קלט ומחזיר מופע tf.distribute.DistributedDataset
. לפונקציית הקלט שמשתמשים מעבירים יש ארגומנט tf.distribute.InputContext
והיא צריכה להחזיר מופעtf.data.Dataset
. באמצעות ממשק API זה, tf.distribute
לא מבצע שינויים נוספים במופעtf.data.Dataset
של המשתמשtf.data.Dataset
הקלט. באחריות המשתמש לאחסן ולגרוס את מערך הנתונים. tf.distribute
קורא לפונקציית הקלט במכשיר המעבד של כל אחד מהעובדים. מלבד המאפשר למשתמשים לציין את לוגיקת האצווה והשבירה שלהם, ממשק API זה גם מדגים יכולת הרחבה וביצועים טובים יותר בהשוואה ל- tf.distribute.Strategy.experimental_distribute_dataset
כאשר משתמשים בו להכשרה מרובת עובדים.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
נכסים
אצווה
ישtf.data.Dataset
המופעtf.data.Dataset
שהוא ערך ההחזרה של פונקציית הקלט באמצעות גודל אצווה לפי עותק משוכפל. גודל האצווה לפי עותק משוכפל הוא גודל האצווה הגלובלי חלקי מספר העותקים המשוכפלים שלוקחים חלק באימון הסנכרון. הסיבה לכך היא ש- tf.distribute
קורא לפונקציית הקלט במכשיר המעבד של כל אחד מהעובדים. מערך הנתונים שנוצר על עובד מסוים צריך להיות מוכן לשימוש על ידי כל העתקים של אותו עובד.
רסיסים
האובייקט tf.distribute.InputContext
המועבר באופן מרומז tf.distribute.InputContext
לפונקציית הקלט של המשתמש נוצר על ידי tf.distribute
מתחת למכסה המנוע. יש לו מידע על מספר העובדים, מזהה העובד הנוכחי וכו '. פונקציית קלט זו יכולה להתמודד עם התפלגות בהתאם למדיניות שהגדיר המשתמש באמצעות מאפיינים אלה שהם חלק מהאובייקט tf.distribute.InputContext
.
איסוף מראש
tf.distribute
לא מוסיף טרנספורמציה מראש בהמשךtf.data.Dataset
שהוחזר על ידי פונקציית הקלט של המשתמש.
מחזיקים מחולקים
בדומהtf.data.Dataset
שאינם מופצים, יהיה עליך ליצור איטרטור tf.distribute.DistributedDataset
כדי לחזור עליו ולגשת לאלמנטים ב- tf.distribute.DistributedDataset
. להלן הדרכים בהן אתה יכול ליצור tf.distribute.DistributedIterator
ולהשתמש בו לאימון המודל שלך:
שימושים
השתמש ב- Pythonic for construct loop
אתה יכול להשתמש בלולאה פיתונית ידידותית למשתמש כדי לחזור על tf.distribute.DistributedDataset
. האלמנטים המוחזרים מ- tf.distribute.DistributedIterator
יכולים להיות tf.Tensor
יחיד או tf.distribute.DistributedValues
המכיל ערך לכל עותק משוכפל. הצבת הלולאה בתוך tf.function
תתן דחיפה לביצועים. עם זאת, כרגע לא נתמכת break
return
עבור לולאה מעל tf.distribute.DistributedDataset
שממוקם בתוך tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
השתמש iter
ליצירת איטרטר מפורש
כדי לחזר על אלמנטים בתוך tf.distribute.DistributedDataset
למשל, אתה יכול ליצור tf.distribute.DistributedIterator
באמצעות iter
API עליו. באמצעות איטרטור מפורש, אתה יכול לחזור על מספר קבוע של צעדים. על מנת לקבל את האלמנט הבא tf.distribute.DistributedIterator
dist_iterator
, אתה יכול להתקשר next(dist_iterator)
, dist_iterator.get_next()
או dist_iterator.get_next_as_optional()
. שני הראשונים זהים למעשה:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
עם next()
או tf.distribute.DistributedIterator.get_next()
, אם tf.distribute.DistributedIterator
הגיע לסיומו, תושלך שגיאת OutOfRange. הלקוח יכול לתפוס את השגיאה בצד הפיתון ולהמשיך לבצע עבודות אחרות כמו בדיקת בדיקה והערכה. עם זאת, זה לא יעבוד אם אתה משתמש בלולאת אימון מארחת (כלומר, הפעל שלבים מרובים לכל tf.function
), שנראית כמו:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
מכיל שלבים מרובים על ידי גלישת גוף המדרגות בתוך tf.range
. במקרה זה, איטרציות שונות בלולאה ללא תלות יכולות להתחיל במקביל, כך שניתן להפעיל שגיאת OutOfRange באיטרציות מאוחרות יותר לפני חישוב האיטרציות הקודמות. ברגע שנזרקת שגיאת OutOfRange, כל האופציות בפונקציה יופסקו מיד. אם מדובר במקרה כלשהו שתרצה להימנע ממנו, חלופה שלא זורקת שגיאת tf.distribute.DistributedIterator.get_next_as_optional()
היא tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
מחזיר tf.experimental.Optional
שמכיל את האלמנט הבא או ללא ערך אם tf.distribute.DistributedIterator
הגיע לסיומו.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
שימוש במאפיין element_spec
אם אתה מעביר את האלמנטים של מערך נתונים מבוזר ל- tf.function
ורוצה להבטיח tf.TypeSpec
, תוכל לציין את ארגומנט input_signature
של tf.function
. הפלט של מערך נתונים מבוזר הוא tf.distribute.DistributedValues
שיכולים לייצג את הקלט למכשיר יחיד או למספר מכשירים. כדי לקבל את ה- tf.TypeSpec
המתאים לערך מבוזר זה תוכלו להשתמש במאפיין element_spec
של מערך הנתונים המבוזר או אובייקט iterator מבוזר.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
אצוותים חלקיים
קבוצות חלקיות נתקלות כאשר מופעיtf.data.Dataset
שמשתמשים יוצרים עשויים להכיל גדלי אצווה שאינם מתחלקים באופן שווה במספר העתקים או כאשר הקרדינליות של מופע מערך הנתונים אינה ניתנת לחלוקה לפי גודל האצווה. המשמעות היא שכאשר מערך הנתונים מופץ על פני מספר עותקים משוכפלים, הקריאה next
לאיתורים חוזרים מסוימת תביא ל- OutOfRangeError. כדי לטפל במקרה שימוש זה, tf.distribute
מחזיר קבוצות דמה בגודל אצווה 0 על עותקים משוכפלים שאין להם עוד נתונים לעיבוד.
במקרה של עובד יחיד, אם הנתונים לא מוחזרים על ידי השיחה next
באיטורציה, נוצרות קבוצות דמה בגודל אצווה 0 ומשמשות יחד עם הנתונים האמיתיים במערך הנתונים. במקרה של קבוצות חלקיות, קבוצת הנתונים הגלובלית האחרונה תכלול נתונים אמיתיים לצד קבוצות דמה של נתונים. תנאי ההפסקה לעיבוד נתונים בודק כעת אם לאחד מהעתקים יש נתונים. אם אין נתונים על אף העתק, נזרקת שגיאת OutOfRange.
במקרה של מספר רב של עובדים, הערך הבוליאני המייצג נוכחות של נתונים על כל אחד מהעובדים נצבר באמצעות תקשורת רפליקות צולבת וזה משמש לזיהוי אם כל העובדים סיימו לעבד את מערך הנתונים המבוזר. מכיוון שהדבר כרוך בתקשורת צולבת בין העובדים, קיים עונש ביצוע כלשהו.
אזהרות
בעת שימוש בממשקי API של
tf.distribute.Strategy.experimental_distribute_dataset
עם הגדרת עובד מרובה, משתמשים מעביריםtf.data.Dataset
שקורא מקבצים. אםtf.data.experimental.AutoShardPolicy
מוגדר כ-AUTO
אוFILE
, גודל האצווה בפועל לפי שלב עשוי להיות קטן יותר מגודל האצווה הגלובלי שהוגדר על ידי המשתמש. זה יכול לקרות כאשר שאר האלמנטים בקובץ הם פחות מגודל האצווה הגלובלי. משתמשים יכולים למצות את מערך הנתונים ללא תלות במספר השלבים להפעלה או להגדיר אתtf.data.experimental.AutoShardPolicy
ל-DATA
כדי לעקוף אותו.כרגע לא ניתן לתמוך
tf.distribute
באמצעותtf.distribute
וכלtf.distribute
שישtf.distribute
הנתונים מתעלמות כרגע. לדוגמא, אםmap_fn
הנתונים שלך ישmap_fn
שמשתמש ב-tf.random.uniform
לסיבוב תמונה, יש לך גרף מערך נתונים תלוי במצב (כלומר הזרע האקראי) במכונה המקומית בה מתבצע תהליך הפיתון.ניסיוני
tf.data.experimental.OptimizationOptions
כברירת מחדל יכולות בהקשרים מסוימים - כמו למשל בשימוש יחד עםtf.distribute
- לגרום לירידת ביצועים. עליך לאפשר אותם רק לאחר שתאמת שהם מועילים לביצוע עומס העבודה שלך במסגרת הגדרת הפצה.אנא עיין במדריך זה כיצד לייעל את צינור הקלט שלך עם
tf.data
באופן כללי. כמה טיפים נוספים:אם יש לך מספר עובדים ומשתמשים ב-
tf.data.Dataset.list_files
ליצירת מערך נתונים מכל הקבצים התואמיםtf.data.Dataset.list_files
גלוב אחת או יותר, זכור להגדיר את טיעוןseed
או להגדירshuffle=False
כך שכל עובד יגרוס את הקובץ באופן עקבי.אם צינור הקלט שלך כולל גם דשדוש הנתונים ברמת הרשומה וגם ניתוח הנתונים, אלא אם כן הנתונים הלא מעובדים גדולים משמעותית מהנתונים המנותחים (וזה בדרך כלל לא המקרה), ערבב תחילה ואז ניתוח, כפי שמוצג בדוגמה הבאה. זה עשוי להועיל לשימוש בזיכרון וביצועיו.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
שומרים על מאגר פנימי של רכיביbuffer_size
, וכך הפחתתbuffer_size
יכולה להעלות את הבעיה ב- OOM.לא ניתן להבטיח את סדר עיבוד הנתונים על ידי העובדים בעת שימוש ב-
tf.distribute.experimental_distribute_dataset
אוtf.distribute.distribute_datasets_from_function
. בדרך כלל זה נדרש אם אתה משתמש ב-tf.distribute
כדי לחזות את קנה המידה. עם זאת ניתן להוסיף אינדקס עבור כל רכיב באצווה ולהזמין פלטים בהתאם. הקטע הבא הוא דוגמה לאופן הזמנת תפוקות.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
כיצד אוכל להפיץ את הנתונים שלי אם אינני משתמש במופע קנוני tf.data.Dataset?
לפעמים משתמשים אינם יכולים להשתמש ב-tf.data.Dataset
כדי לייצג את הקלט שלהם, ולאחר מכן את ה- API המוזכרים לעיל כדי להפיץ את מערך הנתונים למספר מכשירים. במקרים כאלה אתה יכול להשתמש בטורסרים גולמיים או בקלטים מגנרטור.
השתמש בתפקוד_הפצה_ערכי_אופן ניסיוני לתשומות טנזור שרירותיות
strategy.run
מקבל את tf.distribute.DistributedValues
שהיא הפלט של next(iterator)
. כדי להעביר את ערכי הטנסור, השתמש ב- experimental_distribute_values_from_function
כדי לבנות tf.distribute.DistributedValues
מ- tf.distribute.DistributedValues
raw.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
השתמש ב- tf.data.Dataset.from_generator אם הקלט שלך הוא מחולל
אם יש לך פונקציית גנרטור שבה אתה רוצה להשתמש, אתה יכול ליצור מופעtf.data.Dataset
באמצעות ממשק ה- API של from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)