עזרה להגן על שונית המחסום הגדולה עם TensorFlow על Kaggle הצטרפו אתגר

אימון מותאם אישית עם tf.distribute.Strategy

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

הדרכה זו מדגימה כיצד להשתמש tf.distribute.Strategy עם לולאות אימונים מותאמים אישית. אנו נאמן מודל פשוט של CNN על מערך הנתונים האופנה של MNIST. מערך הנתונים האופנה של MNIST מכיל 60,000 תמונות רכבת בגודל 28 x 28 ו-10,000 תמונות בדיקה בגודל 28 x 28.

אנו משתמשים בלולאות אימון מותאמות אישית כדי לאמן את המודל שלנו מכיוון שהם נותנים לנו גמישות ושליטה רבה יותר באימונים. יתר על כן, קל יותר לנפות באגים במודל ובלולאת האימון.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

הורד את מערך הנתונים של fashion MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

צור אסטרטגיה להפצת המשתנים והגרף

איך tf.distribute.MirroredStrategy עבודת אסטרטגיה?

  • כל המשתנים וגרף המודל משוכפלים על ההעתקים.
  • הקלט מתחלק באופן שווה על פני ההעתקים.
  • כל העתק מחשב את ההפסד והשיפועים עבור הקלט שקיבל.
  • ההדרגות מסונכרנות על פני כל ההעתקים על ידי סיכומם.
  • לאחר הסנכרון, מתבצע אותו עדכון להעתקי המשתנים בכל עותק.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

הגדרת צינור קלט

ייצא את הגרף והמשתנים לפורמט SavedModel האגנסטי לפלטפורמה. לאחר שמירת הדגם שלך, תוכל לטעון אותו עם או בלי ההיקף.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

צור את מערכי הנתונים והפצתם:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

צור את הדגם

צור מודל באמצעות tf.keras.Sequential . אתה יכול גם להשתמש ב-API של Model Subclassing כדי לעשות זאת.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

הגדר את פונקציית ההפסד

בדרך כלל, במכונה בודדת עם 1 GPU/CPU, ההפסד מחולק במספר הדוגמאות באצוות הקלט.

אז, איך צריך ההפסד יחושב בעת שימוש tf.distribute.Strategy ?

  • לדוגמא, נניח שיש לך 4 GPU וגודל אצווה של 64. אצווה אחת של קלט מופצת על פני ההעתקים (4 GPUs), כל העתק מקבל קלט בגודל 16.

  • המודל בכל העתק מבצע מעבר קדימה עם הקלט המתאים שלו ומחשב את ההפסד. כעת, במקום לחלק את ההפסד במספר הדוגמאות בקלט שלו (BATCH_SIZE_PER_REPLICA = 16), יש לחלק את ההפסד ב-GLOBAL_BATCH_SIZE (64).

למה לעשות את זה?

  • זה צריך להיעשות כי לאחר הדרגתיים מחושבים על כל העתק, הם מסונכרנים ברחבי העתקים מסיכום אותם.

איך עושים זאת ב- TensorFlow?

  • אם אתה כותב לולאה אימונים מותאמת אישית, כמו במדריך זה, אתה צריך לסכם את ההפסדים לכל למשל ולחלק את הסכום על ידי GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) או שאתה יכול להשתמש tf.nn.compute_average_loss אשר לוקח את ההפסד המדולל למשל, משקולות מדגם אופציונלי, ו GLOBAL_BATCH_SIZE כטיעונים וחוזר האובדן המדורג.

  • אם אתה משתמש בהפסדי רגולציה במודל שלך, עליך לשנות את קנה המידה של ערך ההפסד לפי מספר העתקים. ניתן לעשות זאת על ידי שימוש tf.nn.scale_regularization_loss הפונקציה.

  • שימוש tf.reduce_mean אינו מומלץ. פעולה זו מחלקת את ההפסד לפי גודל אצווה בפועל לכל העתק, שעשוי להשתנות שלב לשלב.

  • הפחתת ומדרוג הדבר נעשה באופן אוטומטי keras model.compile ו model.fit

  • אם באמצעות tf.keras.losses כיתות (כמו בדוגמה להלן), הפחתת אובדן צריך להיות מוגדר במפורש להיות אחד NONE או SUM . AUTO ו SUM_OVER_BATCH_SIZE הם אסורים בעת שימוש עם tf.distribute.Strategy . AUTO מוגדר כאסור משום שהמשתמש צריך לחשוב במפורש על מה הפחתה שהם רוצים לוודא שהיא נכונה במקרה המופץ. SUM_OVER_BATCH_SIZE הוא פסול בגלל כרגע זה רק לחלק ידי לכל גודל אצווה העתק, ולהשאיר את המפריד על ידי מספר העתקים למשתמש, אשר עשוי להיות קל להחמיץ. אז במקום זאת אנו מבקשים מהמשתמש לבצע את ההפחתה בעצמו באופן מפורש.

  • אם labels הוא רב-ממדי, אז מחשבים את הממוצע של per_example_loss בין מספר אלמנטים בכל מדגם. לדוגמה, אם בצורת predictions הוא (batch_size, H, W, n_classes) ואת labels הוא (batch_size, H, W) , תצטרך לעדכן per_example_loss כמו: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

הגדר את המדדים למעקב אחר אובדן ודיוק

מדדים אלה עוקבים אחר אובדן הבדיקה וההדרכה ודיוק הבדיקה. אתה יכול להשתמש .result() כדי לקבל את הסטטיסטיקה המצטברת בכל עת.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

לולאת אימון

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

דברים שכדאי לשים לב אליהם בדוגמה למעלה:

  • אנחנו iterating רחבי train_dist_dataset ו test_dist_dataset באמצעות for x in ... מבנה.
  • ההפסד המדורג הוא הערך המוחזר של distributed_train_step . ערך זה נאסף על-פני העתקים באמצעות tf.distribute.Strategy.reduce השיחה ולאחר מכן ברחבי אצוות ידי סיכום ערך ההחזרה של tf.distribute.Strategy.reduce שיחות.
  • tf.keras.Metrics יש לעדכן בתוך train_step ו test_step שמקבל להורג על ידי tf.distribute.Strategy.run . * tf.distribute.Strategy.run מחזיר את תוצאות מכל העתק מקומי באסטרטגיה, ויש מספר דרכים לצרוך תוצאה זו. אתה יכול לעשות tf.distribute.Strategy.reduce לקבל ערך מצטבר. אתה גם יכול לעשות tf.distribute.Strategy.experimental_local_results כדי לקבל את רשימת הערכים הכלולים התוצאה, אחד לכל העתק המקומי.

שחזר את המחסום העדכני ביותר ובדוק

מודל checkpointed עם tf.distribute.Strategy ניתן לשחזר עם או בלי אסטרטגיה.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

דרכים חלופיות של איטרציה על מערך נתונים

שימוש באיטרטורים

אם אתה רוצה לחזר על פני מספר נתון של צעדים ולא דרך במערך כולו אתה יכול ליצור באיטרטור באמצעות iter תקרא מפורש next על iterator. אתה יכול לבחור לחזור על מערך הנתונים הן בתוך ומחוץ ל-tf.function. הנה קטע קטן המדגים איטרציה של מערך הנתונים מחוץ ל-tf.function באמצעות איטרטור.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

איטרציה בתוך tf.function

ניתן גם לחזר על הקלט כולו train_dist_dataset בתוך tf.function באמצעות for x in ... מבנה או על ידי יצירת iterators כמו שעשינו לעיל. הדוגמה הבאה ממחישה גלישת עידן אחד של אימונים tf.function ולביקורות מעל train_dist_dataset בתוך פונקציה.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

מעקב אחר אובדן אימונים על פני העתקים

אנחנו לא ממליצים להשתמש tf.metrics.Mean לעקוב אובדן אימונים ברחבי העתקים שונים, בגלל החישוב הדרוג האובדן כי מתבצע.

לדוגמה, אם אתה מנהל עבודת הכשרה עם המאפיינים הבאים:

  • שני העתקים
  • שתי דגימות מעובדות על כל העתק
  • ערכי ההפסד המתקבלים: [2, 3] ו-[4, 5] בכל העתק
  • גודל אצווה גלובלי = 4

עם קנה מידה אובדן, אתה מחשב את ערך ההפסד לכל מדגם על כל עותק על ידי הוספת ערכי ההפסד, ולאחר מכן חלוקה בגודל האצווה העולמי. במקרה זה: (2 + 3) / 4 = 1.25 ו- (4 + 5) / 4 = 2.25 .

אם אתה משתמש tf.metrics.Mean לעקוב אובדן פני שני ההעתקים, התוצאה היא שונה. בדוגמה זו, אתה בסופו של דבר עם total של 3.50 ו count של 2, שתוצאתו total / count = 1.75 כאשר result() נקראת על מטרי. הפסד מחושב עם tf.keras.Metrics שמשתנה בהתאם גורם נוסף הוא שווה למספר של העתקים מסונכרנים.

מדריך ודוגמאות

הנה כמה דוגמאות לשימוש באסטרטגיית הפצה עם לולאות אימון מותאמות אישית:

  1. מדריך הדרכה מבוזר
  2. DenseNet למשל באמצעות MirroredStrategy .
  3. ברט למשל מאומנים באמצעות MirroredStrategy ו TPUStrategy . דוגמה זו מועילה במיוחד להבנת כיצד לטעון ממחסום וליצור מחסומים תקופתיים במהלך אימונים מבוזרים וכו'.
  4. NCF למשל מאומנים באמצעות MirroredStrategy כי ניתן להפעיל באמצעות keras_use_ctl הדגל.
  5. NMT למשל מאומנים באמצעות MirroredStrategy .

דוגמאות נוספות המופיעות מדריך אסטרטגית הפצה .

הצעדים הבאים

  • נסה את חדשות tf.distribute.Strategy API על המודלים שלך.
  • בקר בסעיף הביצועי במדריך כדי ללמוד עוד על אסטרטגיות אחרות כלים שאתה יכול להשתמש בו כדי למטב את הביצועים של דגמי TensorFlow שלך.