דף זה תורגם על ידי Cloud Translation API.
Switch to English

קלט מבוזר

צפה ב TensorFlow.org הפעל ב- Google Colab צפה במקור ב- GitHub הורד מחברת

ממשקי ה- API של tf.distribute מספקים דרך קלה למשתמשים להרחיב את ההדרכה שלהם ממכונה יחידה למספר מכונות. בעת קנה המידה של הדגם שלהם, המשתמשים צריכים גם להפיץ את הקלט שלהם על פני מכשירים מרובים. tf.distribute מספק ממשקי API המשתמשים בהם תוכלו להפיץ אוטומטית את הקלט בין מכשירים.

מדריך זה יציג בפניכם את הדרכים השונות בהן תוכלו ליצור מערך נתונים tf.distribute באמצעות ממשקי API של tf.distribute . בנוסף, הנושאים הבאים יכוסו:

מדריך זה אינו מכסה שימוש בתשומות מבוזרות באמצעות ממשקי API של Keras.

ערכות נתונים מופצות

כדי להשתמש בממשקי API של tf.distribute גודל, מומלץ למשתמשים להשתמש ב- tf.data.Dataset כדי לייצג את הקלט שלהם. tf.distribute נעשה כדי לעבוד ביעילות עם tf.data.Dataset (לדוגמה, tf.data.Dataset נתונים אוטומטית מראש על כל מכשיר מאיץ) כאשר אופטימיזציות ביצועים משולבות באופן קבוע ביישום. אם יש לך מקרה שימוש לשימוש במשהו שאינו tf.data.Dataset , אנא עיין בסעיף מאוחר יותר במדריך זה. בלולאת אימונים שלא מופצת, המשתמשים יוצרים תחילה מופע tf.data.Dataset ואז tf.data.Dataset על האלמנטים. לדוגמה:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

כדי לאפשר למשתמשים להשתמש באסטרטגיית tf.distribute עם שינויים מינימליים בקוד הקיים של המשתמש, הוצגו שני APIs tf.data.Dataset מופע tf.data.Dataset ויחזיר אובייקט נתונים מופץ. לאחר מכן המשתמש יכול לחזור על מופע הנתונים המופץ הזה ולהכשיר את המודל שלו כמו קודם. הבה נבחן כעת את שני ה- API - tf.distribute.Strategy.experimental_distribute_dataset ו- tf.distribute.Strategy.experimental_distribute_datasets_from_function בפירוט רב יותר:

tf.distribute.Strategy.experimental_distribute_dataset

נוֹהָג

ממשק API זה לוקח מופע tf.data.Dataset כקלט ומחזיר מופע tf.distribute.DistributedDataset . עליכם לערוך את מערך הנתונים הקלט בערך השווה לגודל האצווה הגלובלי. גודל אצווה גלובלי זה הוא מספר הדגימות שברצונך לעבד בכל המכשירים בשלב אחד. אתה יכול לחזור על מערך נתונים מבוזר זה בצורה פיתונית או ליצור איטרטור באמצעות iter . האובייקט המוחזר אינו מופע tf.data.Dataset ואינו תומך בממשקי API אחרים tf.data.Dataset או בודקים את מערך הנתונים בשום דרך. זהו ממשק ה- API המומלץ אם אין לך דרכים ספציפיות בהן ברצונך לחתוך את הקלט שלך על גבי העתקים שונים.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

נכסים

אצווה

tf.distribute מבצע מחדש את המופע tf.data.Dataset הקלט עם גודל אצווה חדש השווה לגודל האצווה הגלובלי חלקי במספר העותקים המשוכפלים בסנכרון. מספר העותקים המשוכפלים בסנכרון שווה למספר המכשירים שלוקחים חלק בהדרגה המופחתת במהלך האימונים. כאשר משתמש מתקשר next האיטרטור המופץ, מוחזר גודל אצווה לפי העתק של נתונים בכל העתק. קרדינליות הנתונים המשודרת שתמיד תהיה כפולה למספר העותקים המשוכפלים. להלן מספר דוגמאות:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    ללא הפצה:

    אצווה 1: [0, 1, 2, 3]

    אצווה 2: [4, 5]

    עם חלוקה של מעל 2 העתקים:

    אצווה 1: העתק 1: [0, 1] העתק 2: [2, 3]

    אצווה 2: העתק 2: [4] העתק 2: [5]

    האצווה האחרונה ([4, 5]) מפוצלת בין שני העתקים.

  • tf.data.Dataset.range(4).batch(4)

    ללא הפצה:

    אצווה 1: [[0], [1], [2], [3]]

    עם חלוקה של מעל 5 העתקים:

    אצווה 1: העתק 1: [0] העתק 2: [1] העתק 3: [2] העתק 4: [3] העתק 5: []

  • tf.data.Dataset.range(8).batch(4)

    ללא הפצה:

    אצווה 1: [0, 1, 2, 3]

    אצווה 2: [4, 5, 6, 7]

    עם חלוקה של מעל 3 העתקים:

    אצווה 1: העתק 1: [0, 1] העתק 2: [2, 3] העתק 3: []

    אצווה 2: העתק 1: [4, 5] העתק 2: [6, 7] העתק 3: []

לביצוע מחדש של מערך הנתונים יש מורכבות שטח הגוברת באופן לינארי עם מספר העותקים המשוכפלים. משמעות הדבר היא שבמקרה השימוש בהכשרה מרובת עובדים, צינור הקלט יכול להיתקל בשגיאות OOM.

גידור

tf.distribute גם tf.distribute על מערך הקלט בהכשרה מרובת עובדים. כל מערך נתונים נוצר במכשיר ה- CPU של העובד. שמירה אוטומטית על מערך נתונים על מערכת עובדים פירושה שלכל עובד מוקצה מערך משנה של מערך הנתונים כולו (אם tf.data.experimental.AutoShardPolicy ימין tf.data.experimental.AutoShardPolicy ). זאת בכדי להבטיח כי בכל אחד מהצעדים, יעבד כל עובד גודל אצווה גלובלי של אלמנטים של מערכי נתונים שאינם חופפים. tf.data.experimental.DistributeOptions יש כמה אפשרויות שונות שניתן לציין באמצעות tf.data.experimental.DistributeOptions .

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

ישנן שלוש אפשרויות שונות שאתה יכול להגדיר עבור tf.data.experimental.AutoShardPolicy :

  • AUTO: זוהי אפשרות ברירת המחדל שמשמעותה ייעשה ניסיון לחתוך על ידי FILE. הניסיון לחתוך על ידי FILE נכשל אם לא מתגלה מערך נתונים מבוסס קבצים. לאחר מכן tf.distribute יחזור לריסון על ידי DATA. שים לב שאם מערך הנתונים של הקלט מבוסס קבצים אך מספר הקבצים פחות ממספר העובדים, תעלה שגיאה.
  • קובץ: זוהי האפשרות אם ברצונך לחתוך את קבצי הקלט על כל העובדים. אם מספר הקבצים נמוך ממספר העובדים תתרחש שגיאה. עליך להשתמש באפשרות זו אם מספר קבצי הקלט גדול בהרבה ממספר העובדים והנתונים בקבצים מופצים באופן שווה. החיסרון של אפשרות זו הוא להעסיק עובדים סרק אם הנתונים בקבצים לא מופצים באופן שווה. לדוגמה, הבה נפיץ 2 קבצים על פני 2 עובדים עם העתק אחד כל אחד. קובץ 1 מכיל [0, 1, 2, 3, 4, 5] וקובץ 2 מכיל [6, 7, 8, 9, 10, 11]. תן למספר המשכפלים הכולל בסנכרון להיות 2 וגודל האצווה העולמי יהיה 4.

    • עובד 0:

    אצווה 1 = העתק 1: [0, 1]

    אצווה 2 = העתק 1: [2, 3]

    אצווה 3 = העתק 1: [4]

    אצווה 4 = העתק 1: [5]

    • עובד 1:

    אצווה 1 = העתק 2: [6, 7]

    אצווה 2 = העתק 2: [8, 9]

    אצווה 3 = העתק 2: [10]

    אצווה 4 = העתק 2: [11]

  • נתונים: זה יפרוש אוטומטית את האלמנטים בכל העובדים. כל אחד מהעובדים יקרא את מערך הנתונים המלא ויעבד רק את הגרעין שהוקצה לו. כל שאר הגרעינים יושלכו. בדרך כלל משתמשים בזה אם מספר קבצי הקלט הוא פחות ממספר העובדים ואתה מעוניין לנגן נתונים טובים יותר על כל העובדים. החיסרון הוא שכל מערך הנתונים יקרא על כל עובד. לדוגמה, הבה נפיץ 1 קבצים על פני שני עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר המשכפלים הכולל בסנכרון להיות 2.

    • עובד 0:

    אצווה 1 = העתק 1: [0, 1]

    אצווה 2 = העתק 1: [4, 5]

    אצווה 3 = העתק 1: [8, 9]

    • עובד 1:

    אצווה 1 = העתק 2: [2, 3]

    אצווה 2 = העתק 2: [6, 7]

    אצווה 3 = העתק 2: [10, 11]

  • כבוי: אם תכבה את האבטחה באופן אוטומטי, כל עובד יעבד את כל הנתונים. לדוגמה, הבה נפיץ 1 קבצים על פני שני עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר המשכפלים הכולל בסנכרון להיות 2. אז כל עובד יראה את התפוצה הבאה:

    • עובד 0:

    אצווה 1 = העתק 1: [0, 1]

    אצווה 2 = העתק 1: [2, 3]

    אצווה 3 = העתק 1: [4, 5]

    אצווה 4 = העתק 1: [6, 7]

    אצווה 5 = העתק 1: [8, 9]

    אצווה 6 = העתק 1: [10, 11]

    • עובד 1:

    אצווה 1 = העתק 2: [0, 1]

    אצווה 2 = העתק 2: [2, 3]

    אצווה 3 = העתק 2: [4, 5]

    אצווה 4 = העתק 2: [6, 7]

    אצווה 5 = העתק 2: [8, 9]

    אצווה 6 = העתק 2: [10, 11]

קדימה

כברירת מחדל, tf.distribute מוסיף טרנספורמציה של prefetch בסוף המשתמש סיפק מופע tf.data.Dataset . הוויכוח לשינוי buffer_size שהוא buffer_size שווה למספר העותקים המשוכפלים בסנכרון.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

נוֹהָג

ממשק API זה לוקח פונקציית קלט ומחזיר מופע tf.distribute.DistributedDataset . לפונקציית הקלט שמשתמשים עוברים יש ארגומנט tf.distribute.InputContext והיא צריכה להחזיר מופע tf.data.Dataset . עם ממשק API זה, tf.distribute אינו מבצע שינויים נוספים במופע tf.data.Dataset של המשתמש tf.data.Dataset הקלט. באחריות המשתמש לבצע אצווה ולגרד את מערך הנתונים. tf.distribute קורא לפונקציית הקלט במכשיר ה- CPU של כל אחד מהעובדים. מלבד מתן אפשרות למשתמשים לציין את היגיון האצווה והגריסה שלהם, ממשק API זה מדגים מדרגיות וביצועים טובים יותר בהשוואה ל- tf.distribute.Strategy.experimental_distribute_dataset כאשר הוא משמש לאימוני עובדים רבים.

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

נכסים

אצווה

יש tf.data.Dataset המופע tf.data.Dataset שהוא ערך החזרה של פונקציית הקלט באמצעות גודל אצווה לפי העתק. גודל האצווה לפי העתק הוא גודל האצווה העולמי המחולק במספר העותקים המשכפלים שלוקחים חלק באימוני סנכרון. הסיבה לכך היא ש- tf.distribute קורא לפונקציית הקלט במכשיר ה- CPU של כל אחד מהעובדים. מערך הנתונים שנוצר על ידי עובד נתון אמור להיות מוכן לשימוש על ידי כל העותקים המשוכפלים על אותו עובד.

גידור

האובייקט tf.distribute.InputContext המועבר באופן מרומז tf.distribute.InputContext לפונקציית הקלט של המשתמש נוצר על ידי tf.distribute מתחת למכסה המנוע. יש לו מידע על מספר העובדים, מזהה עובד נוכחי וכו '. פונקציית קלט זו יכולה להתמודד עם גניזות לפי המדיניות שקבע המשתמש באמצעות מאפיינים אלה המהווים חלק מאובייקט tf.distribute.InputContext .

קדימה

tf.distribute אינו מוסיף טרנספורמציה של prefetch בסוף tf.data.Dataset שהוחזר על ידי פונקציית הקלט שסופקה על ידי המשתמש.

איטטורים מבוזרים

בדומה tf.data.Dataset שאינם מופצים, תצטרך ליצור איטרטור tf.distribute.DistributedDataset כדי לחזור עליו ולגשת לאלמנטים ב- tf.distribute.DistributedDataset . להלן הדרכים בהן אתה יכול ליצור tf.distribute.DistributedIterator ולהשתמש בו כדי להכשיר את המודל שלך:

שימושים

השתמש בפיתוניק לבניית לולאה

אתה יכול להשתמש בלולאה פיתונית ידידותית למשתמש כדי לחזור על tf.distribute.DistributedDataset . tf.distribute.DistributedDataset . האלמנטים המוחזרים מ- tf.distribute.DistributedIterator יכולים להיות tf.Tensor בודד או tf.distribute.DistributedValues המכיל ערך לכל העתק. הצבת הלולאה בתוך פונקציה tf.function תתן דחיפה לביצועים. עם זאת, break return כרגע אינם נתמכים אם הלולאה ממוקמת בתוך פונקציה tf.function . אנו גם לא תומכים בהצבת הלולאה בתוך tf.function כאשר אנו משתמשים באסטרטגיות מרובות עובדים כמו tf.distribute.experimental.MultiWorkerMirroredStrategy ו- tf.distribute.TPUStrategy . הצבת הלולאה בתוך tf.function עובדת עבור עובד יחיד tf.distribute.TPUStrategy אך לא בעת שימוש בתרמילי TPU.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

השתמש ב- iter כדי ליצור איטרטור מפורש

כדי לחזור על האלמנטים במופע tf.distribute.DistributedDataset , אתה יכול ליצור tf.distribute.DistributedIterator באמצעות ממשק ה- iter עליו. בעזרת איטרטור מפורש, באפשרותך לחזור על מספר קבוע של שלבים. על מנת לקבל את האלמנט הבא tf.distribute.DistributedIterator dist_iterator , אתה יכול להתקשר next(dist_iterator) , dist_iterator.get_next() או dist_iterator.get_next_as_optional() . השניים הקודמים זהים למעשה:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

עם next() או tf.distribute.DistributedIterator.get_next() , אם tf.distribute.DistributedIterator הגיע לקצהו, תושלך שגיאת OutOfRange. הלקוח יכול לתפוס את השגיאה בצד הפיתון ולהמשיך לבצע עבודות אחרות כמו בדיקת בדיקות והערכה. עם זאת, זה לא יעבוד אם אתה משתמש בלולאת אימונים של מארחים (כלומר, הפעל מספר צעדים לכל tf.function ), שנראה כמו:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

train_fn מכיל שלבים מרובים על ידי עטוף את גוף המדרגות בתוך tf.range . במקרה זה, איטרציות שונות בלולאה ללא תלות יכולות להתחיל במקביל, כך שניתן להפעיל שגיאת OutOfRange באיטרציות מאוחרות יותר לפני שהחישוב של איטרציות קודמות מסתיים. ברגע שתושלך שגיאת OutOfRange, כל האפשרויות בפונקציה יופסקו מייד. אם זה מקרה שאתה רוצה להימנע ממנו, אלטרנטיבה שאינה זורקת שגיאת tf.distribute.DistributedIterator.get_next_as_optional() היא tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional מחזיר tf.experimental.Optional שמכיל את האלמנט הבא או ללא ערך אם tf.distribute.DistributedIterator הגיע לסיומו.

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

שימוש במאפיין element_spec

אם אתה מעביר את האלמנטים של מערך נתונים מופץ ל- tf.function ורוצה אחריות tf.TypeSpec , אתה יכול לציין את טיעון input_signature של tf.function . הפלט של מערך נתונים מבוזר הוא tf.distribute.DistributedValues שיכולים לייצג את הקלט למכשיר יחיד או למספר התקנים. כדי להשיג את ה- tf.TypeSpec התואם לערך מבוזר זה אתה יכול להשתמש במאפיין element_spec של מערך הנתונים המופץ או אובייקט iterator מבוזר.

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

קבוצות חלקיות

tf.data.Dataset חלקיות נתקלות כאשר מקרי tf.data.Dataset שמשתמשים יוצרים עשויים להכיל גדלי אצווה שאינם מתחלקים באופן שווה במספר העותקים המשוכפלים או כאשר הקרדינליות של מופע הנתונים אינה מתחלקת בגודל האצווה. המשמעות היא שכאשר מערך הנתונים מופץ על גבי מספר עותקים מרובים, השיחה next בכמה איטרטורים תביא ל- OutOfRangeError. כדי לטפל במקרה שימוש זה, tf.distribute מחזיר קבוצות דמה בגודל אצווה בגודל העתקים שאין להם נתונים נוספים לעיבוד.

למקרה עובד יחיד, אם לא יוחזרו נתונים על ידי השיחה next באיטטור, נוצרות אצוות דמה בגודל אצווה 0 ומשמשות יחד עם הנתונים האמיתיים במערך הנתונים. במקרה של קבוצות חלקיות, קבוצת הנתונים הגלובלית האחרונה תכיל נתונים אמיתיים לצד אצוות נתונים דמה. תנאי העצירה לעיבוד נתונים בודק כעת אם לאחד מהעתקים יש נתונים. אם אין נתונים על אף אחד מהעתקים, שגיאת OutOfRange נזרקת.

במקרה רב העובדים, הערך הבוליאני המייצג נוכחות של נתונים על כל אחד מהעובדים מצטבר באמצעות תקשורת העתק חוצה וזה משמש כדי לזהות אם כל העובדים סיימו לעבד את מערך הנתונים המופץ. מכיוון שהדבר כרוך בתקשורת בין עובדים יש מעורב מסוים בביצועים.

אזהרות

  • בעת שימוש בממשקי API של tf.distribute.Strategy.experimental_distribute_dataset עם מערך עובדים מרובים, משתמשים עוברים tf.data.Dataset שקורא מקבצים. אם tf.data.experimental.AutoShardPolicy מוגדר כ- AUTO או FILE , גודל האצווה בפועל לכל שלב עשוי להיות קטן מגודל האצווה הגלובלי המוגדר על ידי המשתמש. זה יכול לקרות כאשר האלמנטים הנותרים בקובץ הם פחות מגודל האצווה הגלובלי. משתמשים יכולים למצות את מערך הנתונים ללא tf.data.experimental.AutoShardPolicy במספר הצעדים להפעלה או הגדרת tf.data.experimental.AutoShardPolicy ל- DATA כדי לעקוף אותה.

  • טרנספורמציות tf.distribute לא נתמכות כרגע באמצעות tf.distribute וניתן tf.distribute מצבים tf.distribute . לדוגמה, אם map_fn הנתונים שלך יש map_fn שמשתמש tf.random.uniform כדי לסובב תמונה, יש לך גרף מערך נתונים שתלוי במצב (כלומר הזרע המקרי) במחשב המקומי בו מבוצע תהליך הפיתון.

  • ניסוי tf.data.experimental.OptimizationOptions כברירת מחדל עלולות בהקשרים מסוימים - כמו למשל בשימוש יחד עם tf.distribute - לגרום tf.distribute ביצועים. עליך להפעיל אותם רק לאחר שתאמת שהם יועילו לביצוע עומס העבודה שלך במסגרת תפוצה.

  • הסדר בו מעובדים הנתונים על ידי העובדים בעת השימוש tf.distribute.experimental_distribute_dataset או tf.distribute.experimental_distribute_datasets_from_function אינו מובטח. זה בדרך כלל נדרש אם אתה משתמש tf.distribute כדי חיזוי בקנה מידה. עם זאת, באפשרותך להוסיף אינדקס עבור כל אלמנט באצווה ולהזמין תפוקות בהתאם. הקטע הבא הוא דוגמה כיצד להזמין תפוקות.

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

כיצד אוכל להפיץ את הנתונים שלי אם אינני משתמש במופע tf.data.Dataset קנוני?

לפעמים משתמשים אינם יכולים להשתמש ב- tf.data.Dataset כדי לייצג את הקלט שלהם ובהמשך בממשקי ה- API שהוזכרו לעיל כדי להפיץ את מערך הנתונים למספר מכשירים. במקרים כאלה אתה יכול להשתמש בטנסורים גולמיים או בתשומות מגנרטור.

השתמש ב experimental_distribute_values_from_function לתשומות טנסור שרירותיות

strategy.run מקבל את tf.distribute.DistributedValues שהוא הפלט של next(iterator) . כדי להעביר את ערכי הטנזור, השתמש ב experimental_distribute_values_from_function כדי לבנות tf.distribute.DistributedValues של tf.distribute.DistributedValues גולמיים.

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

השתמש ב tf.data.Dataset.from_generator אם הקלט שלך מגנרטור

אם יש לך פונקציית גנרטור שאתה רוצה להשתמש בה, תוכל ליצור מופע tf.data.Dataset באמצעות ממשק ה- API של from_generator .

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)