יש שאלה? התחבר לקהילה בפורום הביקור של TensorFlow

אימון מותאם אישית עם tf.distribute.Strategy

צפה ב- TensorFlow.org הפעל בגוגל קולאב צפה במקור ב- GitHub הורד מחברת

מדריך זה מדגים כיצד להשתמש ב- tf.distribute.Strategy עם לולאות אימון מותאמות אישית. נכשיר מודל CNN פשוט במערך הנתונים האופנתי MNIST. מערך MNIST האופנתי מכיל 60000 תמונות רכבת בגודל 28 x 28 ו 10000 תמונות בדיקה בגודל 28 x 28.

אנו משתמשים בלולאות אימון מותאמות אישית כדי לאמן את המודל שלנו מכיוון שהם נותנים לנו גמישות ושליטה רבה יותר באימונים. יתר על כן, קל יותר לנקות באגים על המודל ולולאת האימון.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

הורד את מערך הנתונים MNIST האופנתי

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

צור אסטרטגיה להפצת המשתנים והגרף

כיצד tf.distribute.MirroredStrategy אסטרטגיית tf.distribute.MirroredStrategy ?

  • כל המשתנים וגרף המודל משוכפלים על העתקים.
  • הקלט מופץ באופן שווה על פני העתקים.
  • כל עותק משוכפל מחשב את ההפסד והשיפועים עבור הקלט שקיבל.
  • השיפועים מסונכרנים בין כל העתקים על ידי סיכומם.
  • לאחר הסנכרון, אותו עדכון נעשה בעותקי המשתנים בכל העתק.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

הגדרת צינור קלט

ייצא את הגרף והמשתנים לפורמט SavedModel לפלטפורמה-אגנוסטית. לאחר שמירת המודל שלך, תוכל לטעון אותו עם או בלי ההיקף.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

צור את מערכי הנתונים והפיץ אותם:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

צור את המודל

צור מודל באמצעות tf.keras.Sequential . אתה יכול גם להשתמש ב- API של Subclassing Model לצורך ביצוע פעולה זו.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

הגדר את פונקציית האובדן

בדרך כלל, במכונה אחת עם GPU / CPU אחד, ההפסד מחולק במספר הדוגמאות בקבוצת הקלט.

אז איך צריך לחשב את ההפסד בשימוש ב- tf.distribute.Strategy ?

  • לדוגמא, נניח שיש לך 4 גרפי GPU וגודל אצווה של 64. קבוצה אחת של קלט מופצת על פני העתקים (4 GPU), כאשר כל העתק מקבל קלט בגודל 16.

  • המודל בכל העתק מבצע מעבר קדימה עם הקלט המתאים לו ומחשב את ההפסד. כעת, במקום לחלק את ההפסד במספר הדוגמאות בקלט המתאים לו (BATCH_SIZE_PER_REPLICA = 16), יש לחלק את ההפסד ב- GLOBAL_BATCH_SIZE (64).

מדוע לעשות זאת?

  • זה צריך להיעשות מכיוון שלאחר חישוב הדרגתיות על כל העתק, הם מסונכרנים על פני העתקים על ידי סיכומם .

איך לעשות זאת ב- TensorFlow?

  • אם אתה כותב לולאת אימונים מותאמת אישית, כמו במדריך זה, עליך לסכם את ההפסדים לכל דוגמה ולחלק את הסכום ב- GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) או שאתה יכול להשתמש ב- tf.nn.compute_average_loss אשר לוקח את ההפסד לכל דוגמה, משקולות הדוגמה האופציונליות ו- GLOBAL_BATCH_SIZE כארגומנטים ומחזיר את ההפסד המוקטן.

  • אם אתה משתמש בהפסדי רגולציה במודל שלך, עליך לשנות את ערך ההפסד לפי מספר העתקים. אתה יכול לעשות זאת באמצעות הפונקציה tf.nn.scale_regularization_loss .

  • השימוש ב- tf.reduce_mean אינו מומלץ. פעולה זו מחלקת את ההפסד לפי גודל אצווה משוכפל בפועל שעשוי להשתנות שלב אחר שלב.

  • צמצום model.compile המידה הזה מתבצעים באופן אוטומטי ב- model.compile model.fit ו- model.fit

  • אם משתמשים ב-tf.keras.losses שיעורים (כמו בדוגמה שלהלן), יש לציין במפורש את צמצום ההפסד כדי להיות אחד מ- NONE או SUM . AUTO ו- SUM_OVER_BATCH_SIZE אינם מורשים בשימוש עם tf.distribute.Strategy . אסור להשתמש ב- AUTO מכיוון שהמשתמש צריך לחשוב במפורש איזו הפחתה הוא רוצה לוודא שהיא נכונה במקרה המבוזר. SUM_OVER_BATCH_SIZE אסור מכיוון שכרגע הוא יחלק רק לפי גודל אצווה משוכפל, וישאיר את החלוקה לפי מספר העתקים למשתמש, שעלול להיות קל לפספס. אז במקום זאת אנו מבקשים מהמשתמש לעשות את הצמצום בעצמו במפורש.

  • אם labels הן רב מימדיות, אז ממוצע per_example_loss פני מספר האלמנטים בכל מדגם. לדוגמא, אם צורת predictions היא (batch_size, H, W, n_classes) labels הן (batch_size, H, W) , יהיה עליך לעדכן per_example_loss כמו: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

הגדר את המדדים למעקב אחר אובדן ודיוק

מדדים אלה עוקבים אחר אובדן הבדיקה ואימון ודיוק המבחנים. תוכל להשתמש ב .result() כדי לקבל את הנתונים הסטטיסטיים שנצברו בכל עת.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

לולאת אימונים

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

דברים שיש לציין בדוגמה לעיל:

שחזר את נקודת הביקורת האחרונה ובדיקה

ניתן לשחזר מודל tf.distribute.Strategy עם tf.distribute.Strategy עם או בלי אסטרטגיה.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

דרכים חלופיות לחזור על מערך נתונים

שימוש באיטרטורים

אם ברצונך לחזור על מספר צעדים נתון ולא דרך מערך הנתונים כולו, תוכל ליצור איטרטור באמצעות שיחת iter והשיחה המפורשת next באיטרטור. אתה יכול לבחור לחזור על מערך הנתונים בתוך tf.function ומחוצה לו. הנה קטע קטן המדגים איטרציה של מערך הנתונים מחוץ ל- tf.function באמצעות iterator.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

מתבשל בתוך פונקציית tf

אתה יכול גם לחזור על כל train_dist_dataset הקלט train_dist_dataset בתוך train_dist_dataset tf באמצעות הפונקציה for x in ... או על ידי יצירת איטרטורים כמו שעשינו לעיל. הדוגמה שלהלן מדגימה עוטפת תקופת אימונים אחת בתפקוד tf. train_dist_dataset על train_dist_dataset בתוך הפונקציה.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

מעקב אחר אובדן אימונים בין עותקים משוכפלים

אנו לא ממליצים להשתמש ב- tf.metrics.Mean כדי לעקוב אחר אובדן האימונים על פני העתקים שונים, בגלל חישוב קנה המידה של האובדן שמתבצע.

לדוגמה, אם אתה מנהל עבודת הכשרה עם המאפיינים הבאים:

  • שתי העתקות
  • שתי דוגמאות מעובדות על כל העתק
  • ערכי אובדן כתוצאה: [2, 3] ו- [4, 5] בכל העתק
  • גודל אצווה גלובלי = 4

עם קנה המידה של הפסד, אתה מחשב את ערך ההפסד לכל דגימה על כל עותק משוכפל על ידי הוספת ערכי ההפסד, ואז מחלק לפי גודל האצווה הגלובלי. במקרה זה: (2 + 3) / 4 = 1.25 ו- (4 + 5) / 4 = 2.25 .

אם אתה משתמש ב- tf.metrics.Mean כדי לעקוב אחר אובדן בשני העתקים, התוצאה שונה. בדוגמה זו, בסופו של דבר אתה מגיע total של 3.50 count של 2, מה שמביא total / count = 1.75 כאשר result() נקראת במדד. אובדן המחושב באמצעות tf.keras.Metrics ידי גורם נוסף השווה למספר העתקים המסונכרנים.

מדריך ודוגמאות

להלן מספר דוגמאות לשימוש באסטרטגיית הפצה עם לולאות אימון מותאמות אישית:

  1. מדריך הדרכה מבוזר
  2. דוגמה של DenseNet באמצעות MirroredStrategy .
  3. דוגמה ל- BERT שהוכשרה באמצעות MirroredStrategy ו- TPUStrategy . דוגמה זו מועילה במיוחד להבנת האופן שבו ניתן לטעון ממחסום וליצור מחסומים תקופתיים במהלך אימונים מבוזרים וכו '.
  4. דוגמה ל- NCF שהוכשרה באמצעות MirroredStrategy שניתן להפעיל באמצעות הדגל keras_use_ctl .
  5. דוגמה ל- NMT שהוכשרה באמצעות MirroredStrategy .

דוגמאות נוספות המופיעות במדריך האסטרטגיה להפצה .

הצעדים הבאים