קלט מבוזר

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

ממשקי ה-API של tf.distribute מספקים דרך קלה למשתמשים לשנות את ההכשרה שלהם ממכונה בודדת למספר מכונות. כאשר מרחיבים את המודל שלהם, המשתמשים צריכים גם להפיץ את הקלט שלהם על פני מספר מכשירים. tf.distribute מספק ממשקי API שבאמצעותם תוכל להפיץ את הקלט שלך באופן אוטומטי בין מכשירים.

מדריך זה יראה לך את הדרכים השונות שבהן תוכל ליצור מערך נתונים ואיטרטורים מבוזרים באמצעות ממשקי API tf.distribute . בנוסף, הנושאים הבאים יידונו:

מדריך זה אינו מכסה שימוש בקלט מבוזר עם ממשקי API של Keras.

מערכי נתונים מבוזרים

כדי להשתמש בממשקי API של tf.distribute מידה, מומלץ למשתמשים להשתמש ב- tf.data.Dataset כדי לייצג את הקלט שלהם. tf.distribute נעשתה לעבוד ביעילות עם tf.data.Dataset (לדוגמה, שליפה אוטומטית מראש של נתונים לכל מכשיר מאיץ) עם אופטימיזציות של ביצועים שמשולבות באופן קבוע בהטמעה. אם יש לך מקרה שימוש לשימוש במשהו אחר מלבד tf.data.Dataset , עיין בסעיף מאוחר יותר במדריך זה. בלולאת אימון לא מבוזרת, משתמשים יוצרים תחילה מופע tf.data.Dataset ולאחר מכן חוזרים על האלמנטים. לדוגמה:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

כדי לאפשר למשתמשים להשתמש באסטרטגיית tf.distribute עם שינויים מינימליים בקוד הקיים של המשתמש, הוצגו שני ממשקי API אשר יפיצו מופע tf.data.Dataset ויחזירו אובייקט מערך נתונים מבוזר. לאחר מכן, משתמש יוכל לחזור על מופע הנתונים המבוזר הזה ולאמן את המודל שלו כמו קודם. הבה נסתכל כעת על שני ממשקי ה-API - tf.distribute.Strategy.experimental_distribute_dataset ו- tf.distribute.Strategy.distribute_datasets_from_function בפירוט רב יותר:

tf.distribute.Strategy.experimental_distribute_dataset

נוֹהָג

ממשק API זה לוקח מופע tf.data.Dataset כקלט ומחזיר מופע tf.distribute.DistributedDataset . עליך לאסוף את מערך הנתונים של הקלט עם ערך השווה לגודל האצווה הגלובלי. גודל אצווה גלובלי זה הוא מספר הדגימות שברצונך לעבד בכל המכשירים בשלב אחד. אתה יכול לחזור על מערך הנתונים המבוזר הזה בצורה פייתונית או ליצור איטרטור באמצעות iter . האובייקט המוחזר אינו מופע tf.data.Dataset ואינו תומך באף ממשק API אחר אשר ממיר או בודק את מערך הנתונים בשום אופן. זהו ה-API המומלץ אם אין לך דרכים ספציפיות שבהן ברצונך לחלק את הקלט שלך על פני העתקים שונים.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

נכסים

אצווה

tf.distribute משחזרת את מופע הקלט tf.data.Dataset עם גודל אצווה חדש השווה לגודל האצווה העולמי חלקי מספר העתקים המסונכרנים. מספר ההעתקים המסונכרנים שווה למספר המכשירים שלוקחים חלק בשיפוע כל הפחתה במהלך האימון. כאשר משתמש קורא ל- next באיטרטור המבוזר, גודל אצווה לכל עותק מוחזר על כל עותק. הקרדינליות של מערך הנתונים המחודשים תהיה תמיד כפולה של מספר ההעתקים. הנה כמה דוגמאות:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • ללא הפצה:
    • אצווה 1: [0, 1, 2, 3]
    • אצווה 2: [4, 5]
    • עם הפצה מעל 2 העתקים. האצווה האחרונה ([4, 5]) מחולקת בין 2 העתקים.

    • אצווה 1:

      • העתק 1:[0, 1]
      • העתק 2:[2, 3]
    • אצווה 2:

      • העתק 2: [4]
      • העתק 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • ללא הפצה:
    • אצווה 1: [[0], [1], [2], [3]]
    • עם הפצה מעל 5 העתקים:
    • אצווה 1:
      • העתק 1: [0]
      • העתק 2: [1]
      • העתק 3: [2]
      • העתק 4: [3]
      • העתק 5: []
  • tf.data.Dataset.range(8).batch(4)

    • ללא הפצה:
    • אצווה 1: [0, 1, 2, 3]
    • אצווה 2: [4, 5, 6, 7]
    • עם הפצה על פני 3 העתקים:
    • אצווה 1:
      • העתק 1: [0, 1]
      • העתק 2: [2, 3]
      • העתק 3: []
    • אצווה 2:
      • העתק 1: [4, 5]
      • העתק 2: [6, 7]
      • העתק 3: []

לחלוקה חוזרת של מערך הנתונים יש מורכבות שטח שגדלה באופן ליניארי עם מספר ההעתקים. משמעות הדבר היא שבמקרה השימוש בהכשרה מרובת עובדים, צינור הקלט יכול להיתקל בשגיאות OOM.

ריסוק

tf.distribute גם משבצת אוטומטית את מערך הנתונים של הקלט בהדרכה מרובת עובדים עם MultiWorkerMirroredStrategy ו- TPUStrategy . כל מערך נתונים נוצר במכשיר ה-CPU של העובד. פיצול אוטומטי של מערך נתונים על קבוצה של עובדים פירושה שלכל עובד מוקצית תת-קבוצה של מערך הנתונים כולו (אם מוגדרת ה- tf.data.experimental.AutoShardPolicy הנכון). זאת כדי להבטיח שבכל שלב, גודל אצווה גלובלי של רכיבי נתונים שאינם חופפים יעובד על ידי כל עובד. לריסוק אוטומטי יש כמה אפשרויות שונות שניתן לציין באמצעות tf.data.experimental.DistributeOptions . שימו לב שאין פיצול אוטומטי בהכשרה מרובה עובדים עם ParameterServerStrategy , ומידע נוסף על יצירת מערכי נתונים עם אסטרטגיה זו ניתן למצוא במדריך אסטרטגיית שרת פרמטרים .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

ישנן שלוש אפשרויות שונות שתוכל להגדיר עבור tf.data.experimental.AutoShardPolicy :

  • AUTO: זוהי אפשרות ברירת המחדל, כלומר יתבצע ניסיון לרסיס על ידי FILE. הניסיון לרסיס על ידי FILE נכשל אם לא זוהה מערך נתונים מבוסס קבצים. לאחר מכן tf.distribute יחזור לריסוק על ידי DATA. שים לב שאם מערך הנתונים של הקלט מבוסס על קבצים אך מספר הקבצים קטן ממספר העובדים, InvalidArgumentError . אם זה קורה, הגדר במפורש את המדיניות ל- AutoShardPolicy.DATA , או פצל את מקור הקלט שלך לקבצים קטנים יותר כך שמספר הקבצים גדול ממספר העובדים.
  • קובץ: זו האפשרות אם ברצונך לגזור את קבצי הקלט על כל העובדים. עליך להשתמש באפשרות זו אם מספר קבצי הקלט גדול בהרבה ממספר העובדים והנתונים בקבצים מחולקים באופן שווה. החיסרון של אפשרות זו הוא שיש עובדים סרק אם הנתונים בקבצים אינם מופצים באופן שווה. אם מספר הקבצים קטן ממספר העובדים, InvalidArgumentError . אם זה קורה, הגדר במפורש את המדיניות ל- AutoShardPolicy.DATA . לדוגמה, תן לנו להפיץ 2 קבצים על פני 2 עובדים עם עותק אחד כל אחד. קובץ 1 מכיל [0, 1, 2, 3, 4, 5] וקובץ 2 מכיל [6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2 וגודל האצווה הגלובלי להיות 4.

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [2, 3]
    • אצווה 3 = העתק 1: [4]
    • אצווה 4 = העתק 1: [5]
    • עובד 1:
    • אצווה 1 = העתק 2: [6, 7]
    • אצווה 2 = העתק 2: [8, 9]
    • אצווה 3 = העתק 2: [10]
    • אצווה 4 = העתק 2: [11]
  • נתונים: זה יגזור אוטומטית את האלמנטים על פני כל העובדים. כל אחד מהעובדים יקרא את כל מערך הנתונים ויעבד רק את הרסיס שהוקצה לו. כל שאר הרסיסים יושלכו. זה משמש בדרך כלל אם מספר קבצי הקלט קטן ממספר העובדים ואתה רוצה פיצול טוב יותר של נתונים בין כל העובדים. החיסרון הוא שכל מערך הנתונים ייקרא על כל עובד. לדוגמה, תן לנו להפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2.

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [4, 5]
    • אצווה 3 = העתק 1: [8, 9]
    • עובד 1:
    • אצווה 1 = העתק 2: [2, 3]
    • אצווה 2 = העתק 2: [6, 7]
    • אצווה 3 = העתק 2: [10, 11]
  • כבוי: אם תבטל את הפיצול האוטומטי, כל עובד יעבד את כל הנתונים. לדוגמה, תן לנו להפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2. אז כל עובד יראה את ההתפלגות הבאה:

    • עובד 0:
    • אצווה 1 = העתק 1: [0, 1]
    • אצווה 2 = העתק 1: [2, 3]
    • אצווה 3 = העתק 1: [4, 5]
    • אצווה 4 = העתק 1: [6, 7]
    • אצווה 5 = העתק 1: [8, 9]
    • אצווה 6 = העתק 1: [10, 11]

    • עובד 1:

    • אצווה 1 = העתק 2: [0, 1]

    • אצווה 2 = העתק 2: [2, 3]

    • אצווה 3 = העתק 2: [4, 5]

    • אצווה 4 = העתק 2: [6, 7]

    • אצווה 5 = העתק 2: [8, 9]

    • אצווה 6 = העתק 2: [10, 11]

אחזור מראש

כברירת מחדל, tf.distribute מוסיף טרנספורמציה של אחזור מראש בסוף מופע tf.data.Dataset שסופק על ידי המשתמש. הארגומנט לטרנספורמציה של אחזור מראש שהוא buffer_size שווה למספר העתקים המסונכרנים.

tf.distribute.Strategy.distribute_datasets_from_function

נוֹהָג

API זה לוקח פונקציית קלט ומחזיר מופע tf.distribute.DistributedDataset . לפונקציית הקלט שמשתמשים מעבירים יש ארגומנט tf.distribute.InputContext והיא אמורה להחזיר מופע tf.data.Dataset . עם API זה, tf.distribute אינו מבצע שינויים נוספים במופע tf.data.Dataset של המשתמש המוחזר מפונקציית הקלט. באחריות המשתמש לקבץ ולפצל את מערך הנתונים. tf.distribute קורא לפונקציית הקלט בהתקן המעבד של כל אחד מהעובדים. מלבד לאפשר למשתמשים לציין את היגיון האצווה והפיצול שלהם, ממשק API זה גם מדגים מדרגיות וביצועים טובים יותר בהשוואה ל- tf.distribute.Strategy.experimental_distribute_dataset כאשר הוא משמש להכשרה מרובה עובדים.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

נכסים

אצווה

המופע tf.data.Dataset שהוא ערך ההחזרה של פונקציית הקלט צריך להיות באצווה באמצעות גודל אצווה לכל עותק. גודל האצווה לכל עותק הוא גודל האצווה העולמי חלקי מספר העתקים שלוקחים חלק באימון הסנכרון. הסיבה לכך היא ש- tf.distribute קורא לפונקציית הקלט בהתקן המעבד של כל אחד מהעובדים. מערך הנתונים שנוצר על עובד נתון צריך להיות מוכן לשימוש על ידי כל העתקים באותו עובד.

ריסוק

האובייקט tf.distribute.InputContext שמועבר באופן מרומז כארגומנט לפונקציית הקלט של המשתמש נוצר על ידי tf.distribute מתחת למכסה המנוע. יש לו מידע על מספר העובדים, מזהה עובד נוכחי וכו'. פונקציית קלט זו יכולה להתמודד עם פיצול לפי מדיניות שנקבעה על ידי המשתמש באמצעות מאפיינים אלה שהם חלק מהאובייקט tf.distribute.InputContext .

אחזור מראש

tf.distribute אינו מוסיף טרנספורמציה של אחזור מראש בסוף מערך הנתונים tf.data.Dataset המוחזר על ידי פונקציית הקלט שסופקה על ידי המשתמש.

איטרטורים מבוזרים

בדומה tf.data.Dataset שאינם מבוזרים, תצטרך ליצור איטרטור tf.distribute.DistributedDataset כדי לחזור עליו ולגשת לרכיבים ב- tf.distribute.DistributedDataset . להלן הדרכים שבהן תוכל ליצור tf.distribute.DistributedIterator ולהשתמש בו כדי לאמן את המודל שלך:

שימושים

השתמש בבניית לולאה Pythonic

אתה יכול להשתמש בלולאה Pythonic ידידותית למשתמש כדי לחזור על ה- tf.distribute.DistributedDataset . האלמנטים המוחזרים מה- tf.distribute.DistributedIterator יכולים להיות tf.Tensor בודד או tf.distribute.DistributedValues שמכיל ערך לכל העתק. מיקום הלולאה בתוך tf.function .תעניק דחיפה לביצועים. עם זאת, break return אינם נתמכים כרגע עבור לולאה מעל tf.distribute.DistributedDataset המוצב בתוך tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

השתמש iter כדי ליצור איטרטור מפורש

כדי לחזור על האלמנטים במופע tf.distribute.DistributedDataset , אתה יכול ליצור tf.distribute.DistributedIterator באמצעות ה-API של iter שעליו. עם איטרטור מפורש, אתה יכול לבצע איטרציה עבור מספר קבוע של שלבים. על מנת לקבל את האלמנט הבא tf.distribute.DistributedIterator dist_iterator , אתה יכול לקרוא next(dist_iterator) , dist_iterator.get_next() או dist_iterator.get_next_as_optional() . שני הראשונים זהים בעצם:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

עם next() או tf.distribute.DistributedIterator.get_next() , אם ה- tf.distribute.DistributedIterator הגיע לקצה שלו, תיזרק שגיאת OutOfRange. הלקוח יכול לתפוס את השגיאה בצד הפיתון ולהמשיך בעבודות אחרות כגון בדיקה והערכה. עם זאת, זה לא יעבוד אם אתה משתמש בלולאת אימון מארח (כלומר, הרץ מספר שלבים לכל tf.function ), שנראה כך:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn מכיל שלבים מרובים על ידי גלישת גוף הצעד בתוך tf.range . במקרה זה, איטרציות שונות בלולאה ללא תלות יכולות להתחיל במקביל, כך שניתן להפעיל שגיאת OutOfRange באיטרציות מאוחרות יותר לפני שהחישוב של האיטרציות הקודמות מסתיים. ברגע שנזרק שגיאת OutOfRange, כל האופציות בפונקציה יסתיימו מיד. אם זה מקרה שהיית רוצה להימנע ממנו, חלופה שאינה זורקת שגיאת OutOfRange היא tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional מחזירה tf.experimental.Optional שמכילה את הרכיב הבא או ללא ערך אם ה- tf.distribute.DistributedIterator הגיע לסיומו.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

שימוש במאפיין element_spec

אם אתה מעביר את הרכיבים של מערך נתונים מבוזר ל- tf.function ורוצה להבטיח tf.TypeSpec , תוכל לציין את הארגומנט input_signature של ה- tf.function . הפלט של מערך נתונים מבוזר הוא tf.distribute.DistributedValues שיכול לייצג את הקלט למכשיר בודד או למספר מכשירים. כדי לקבל את tf.TypeSpec המתאים לערך מבוזר זה, אתה יכול להשתמש במאפיין element_spec של מערך הנתונים המבוזר או אובייקט האיטרטור המבוזר.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

אצוות חלקיות

נתקלים באצוות חלקיות כאשר מופעי tf.data.Dataset שמשתמשים יוצרים עשויים להכיל גדלי אצווה שאינם ניתנים לחלוקה שווה במספר העתקים או כאשר הקרדינליות של מופע מערך הנתונים אינה מתחלקת בגודל האצווה. משמעות הדבר היא שכאשר מערך הנתונים מופץ על פני מספר העתקים, הקריאה next בכמה איטרטורים תגרום ל-OutOfRangeError. כדי לטפל במקרה השימוש הזה, tf.distribute מחזיר אצווה דמה בגודל אצווה 0 על העתקים שאין להם יותר נתונים לעיבוד.

במקרה של עובד יחיד, אם הנתונים אינם מוחזרים בקריאה next באיטרטור, נוצרות אצווה דמה בגודל אצווה 0 ומשמשות יחד עם הנתונים האמיתיים במערך הנתונים. במקרה של אצווה חלקית, אצווה הנתונים הגלובלית האחרונה תכיל נתונים אמיתיים לצד אצווה דמה של נתונים. תנאי העצירה לעיבוד נתונים בודק כעת אם לאחד מהעותקים יש נתונים. אם אין נתונים על אף אחת מהעותקים העתקים, נגרמת שגיאת OutOfRange.

במקרה של ריבוי עובדים, הערך הבוליאני המייצג נוכחות של נתונים על כל אחד מהעובדים נצבר באמצעות תקשורת צולבת העתק זה משמש כדי לזהות אם כל העובדים סיימו לעבד את מערך הנתונים המבוזר. מכיוון שזה כרוך בתקשורת צולבת בין עובדים, קיים עונש מסוים על ביצועים.

אזהרות

  • בעת שימוש בממשקי API של tf.distribute.Strategy.experimental_distribute_dataset עם הגדרת מספר עובדים, המשתמשים מעבירים tf.data.Dataset שקורא מקבצים. אם tf.data.experimental.AutoShardPolicy מוגדר ל- AUTO או FILE , גודל האצווה בפועל לכל שלב עשוי להיות קטן יותר מגודל האצווה הגלובלי שהוגדר על ידי המשתמש. זה יכול לקרות כאשר הרכיבים הנותרים בקובץ קטנים מגודל האצווה העולמי. משתמשים יכולים למצות את מערך הנתונים מבלי להיות תלוי במספר השלבים להפעלה או להגדיר את tf.data.experimental.AutoShardPolicy ל- DATA כדי לעקוף אותו.

  • טרנספורמציות סטטיסטיות של מערך נתונים אינן נתמכות כעת עם tf.distribute , וכרגע מתעלמים מכל אופציה סטטיסטית שייתכן שיש למערך הנתונים. לדוגמה, אם למערך הנתונים שלך יש map_fn שמשתמש ב- tf.random.uniform כדי לסובב תמונה, אז יש לך גרף נתונים שתלוי במצב (כלומר הזרע האקראי) במחשב המקומי שבו מתבצע תהליך הפיתון.

  • tf.data.experimental.OptimizationOptions ניסיוני אפשרויות המושבתות כברירת מחדל יכולות בהקשרים מסוימים -- כגון בשימוש יחד עם tf.distribute -- לגרום לירידה בביצועים. עליך להפעיל אותם רק לאחר שתאמת שהם מועילים לביצועי עומס העבודה שלך בהגדרת הפצה.

  • אנא עיין במדריך זה כיצד לייעל את צינור הקלט שלך עם tf.data באופן כללי. כמה טיפים נוספים:

    • אם יש לך מספר עובדים ואתה משתמש ב- tf.data.Dataset.list_files כדי ליצור מערך נתונים מכל הקבצים התואמים דפוס גלוב אחד או יותר, זכור להגדיר את ארגומנט ה- seed או להגדיר shuffle=False כך שכל עובד ירסק את הקובץ באופן עקבי.

    • אם צינור הקלט שלך כולל ערבוב של הנתונים ברמת הרשומה וגם ניתוח הנתונים, אלא אם כן הנתונים שלא מנותחו גדולים משמעותית מהנתונים המנותחים (מה שבדרך כלל לא המקרה), תחילה ערבב ואז ניתוח, כפי שמוצג בדוגמה הבאה. זה עשוי להועיל לשימוש בזיכרון ולביצועים.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) שומר על מאגר פנימי של רכיבי buffer_size , ובכך הפחתת buffer_size יכולה להקל על בעיית OOM.

  • הסדר שבו הנתונים מעובדים על ידי העובדים בעת שימוש ב- tf.distribute.experimental_distribute_dataset או tf.distribute.distribute_datasets_from_function אינו מובטח. זה נדרש בדרך כלל אם אתה משתמש ב- tf.distribute לחיזוי קנה מידה. עם זאת, ניתן להכניס אינדקס עבור כל רכיב באצווה ולהזמין פלטים בהתאם. הקטע הבא הוא דוגמה לאופן הזמנת פלטים.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

כיצד אוכל להפיץ את הנתונים שלי אם אני לא משתמש במופע קנוני של tf.data.Dataset?

לפעמים משתמשים לא יכולים להשתמש ב- tf.data.Dataset כדי לייצג את הקלט שלהם, ולאחר מכן בממשקי ה-API שהוזכרו לעיל כדי להפיץ את מערך הנתונים למספר מכשירים. במקרים כאלה ניתן להשתמש בטנזורים גולמיים או בתשומות מגנרטור.

השתמש ב-experimental_distribute_values_from_function עבור קלט טנסור שרירותי

strategy.run מקבל את tf.distribute.DistributedValues שהוא הפלט של next(iterator) . כדי להעביר את ערכי הטנסור, השתמש ב- experimental_distribute_values_from_function כדי לבנות tf.distribute.DistributedValues גולמיים.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

השתמש ב-tf.data.Dataset.from_generator אם הקלט שלך הוא ממחולל

אם יש לך פונקציית מחולל שברצונך להשתמש בה, תוכל ליצור מופע tf.data.Dataset באמצעות ה-API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.