مساعدة في حماية الحاجز المرجاني العظيم مع TensorFlow على Kaggle تاريخ التحدي

تدريب مخصص مع tf.distribute.Strategy

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

يوضح هذا البرنامج التعليمي كيفية استخدام tf.distribute.Strategy مع حلقات التدريب المخصصة. سنقوم بتدريب نموذج بسيط لشبكة CNN على مجموعة بيانات الموضة MNIST. تحتوي مجموعة بيانات Fashion MNIST على 60000 صورة قطار بحجم 28 × 28 و 10000 صورة اختبار بحجم 28 × 28.

نحن نستخدم حلقات تدريب مخصصة لتدريب نموذجنا لأنها تمنحنا مرونة وتحكمًا أكبر في التدريب. علاوة على ذلك ، من الأسهل تصحيح أخطاء النموذج وحلقة التدريب.

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

قم بتنزيل مجموعة بيانات Fashion MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

إنشاء استراتيجية لتوزيع المتغيرات والرسم البياني

كيف tf.distribute.MirroredStrategy العمل الاستراتيجية؟

  • يتم تكرار جميع المتغيرات والرسم البياني للنموذج على النسخ المتماثلة.
  • يتم توزيع الإدخال بالتساوي عبر النسخ المتماثلة.
  • تحسب كل نسخة متماثلة الخسارة والتدرجات للإدخال الذي تلقته.
  • تتم مزامنة التدرجات عبر جميع النسخ المتماثلة عن طريق جمعها.
  • بعد المزامنة ، يتم إجراء نفس التحديث على نسخ المتغيرات في كل نسخة متماثلة.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

إعداد خط أنابيب الإدخال

قم بتصدير الرسم البياني والمتغيرات إلى تنسيق SavedModel المحايد للنظام الأساسي. بعد حفظ النموذج الخاص بك ، يمكنك تحميله بالنطاق أو بدونه.

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

قم بإنشاء مجموعات البيانات وتوزيعها:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

قم بإنشاء النموذج

إنشاء نموذج باستخدام tf.keras.Sequential . يمكنك أيضًا استخدام Model Subclassing API للقيام بذلك.

def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

تحديد وظيفة الخسارة

عادةً ، في جهاز واحد مع 1 GPU / CPU ، يتم تقسيم الخسارة على عدد الأمثلة في دفعة الإدخال.

لذلك، وكيف ينبغي أن تحسب الخسارة عند استخدام tf.distribute.Strategy ؟

  • على سبيل المثال ، لنفترض أن لديك 4 وحدات معالجة رسومات وحجم دُفعة 64. تم توزيع دفعة واحدة من الإدخال عبر النسخ المتماثلة (4 وحدات معالجة رسومات) ، تحصل كل نسخة متماثلة على إدخال بحجم 16.

  • يقوم النموذج الموجود في كل نسخة متماثلة بتمرير أمامي مع المدخلات الخاصة به ويحسب الخسارة. الآن ، بدلاً من قسمة الخسارة على عدد الأمثلة في المدخلات الخاصة بها (BATCH_SIZE_PER_REPLICA = 16) ، يجب تقسيم الخسارة على GLOBAL_BATCH_SIZE (64).

لماذا فعل هذا؟

  • هذا يجب القيام به لأنه بعد وتحسب التدرجات على كل طبق الأصل، تتم مزامنتها عبر النسخ المتماثلة عن طريق جمع لهم.

كيف يتم القيام بذلك في TensorFlow؟

  • إذا كنت تكتب حلقة تدريبية مخصصة، كما هو الحال في هذا البرنامج التعليمي، يجب أن ألخص نصيب سبيل المثال الخسائر وتقسيم المبلغ على GLOBAL_BATCH_SIZE: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) أو يمكنك استخدام tf.nn.compute_average_loss والتي تأخذ في المثال الخسارة، أوزان عينة اختيارية، وGLOBAL_BATCH_SIZE كوسائط ويعود فقدان تحجيمها.

  • إذا كنت تستخدم خسائر تسوية في نموذجك ، فأنت بحاجة إلى قياس قيمة الخسارة حسب عدد النسخ المتماثلة. يمكنك القيام بذلك عن طريق استخدام tf.nn.scale_regularization_loss وظيفة.

  • باستخدام tf.reduce_mean غير مستحسن. يؤدي القيام بذلك إلى تقسيم الخسارة على الحجم الفعلي لكل نسخة متماثلة والذي قد يختلف من خطوة إلى أخرى.

  • ويتم هذا التخفيض والقياس تلقائيا في keras model.compile و model.fit

  • إذا كنت تستخدم tf.keras.losses الطبقات (كما في المثال أدناه)، يحتاج الحد من الخسائر تحدد بوضوح ليكون واحدا من NONE أو SUM . AUTO و SUM_OVER_BATCH_SIZE غير مسموح عند استخدامها مع tf.distribute.Strategy . AUTO غير مسموح لأن المستخدم يجب أن نفكر بوضوح حول ما خفض يريدون للتأكد من أنها صحيحة في حالة توزيعها. SUM_OVER_BATCH_SIZE غير مسموح لأن حاليا من شأنه أن تقسيم فقط من قبل في نسخة حجم الدفعة، وترك الفاصل من قبل عدد من النسخ المتماثلة للمستخدم، والتي قد يكون من السهل أن تفوت. لذا بدلاً من ذلك ، نطلب من المستخدم إجراء التخفيض بنفسه صراحةً.

  • إذا labels هي متعددة الأبعاد، ثم المتوسط و per_example_loss عبر عدد من العناصر في كل عينة. على سبيل المثال، إذا كان شكل predictions هو (batch_size, H, W, n_classes) و labels هو (batch_size, H, W) ، وسوف تحتاج إلى تحديث per_example_loss مثل: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

تحديد المقاييس لتتبع الخسارة والدقة

تتعقب هذه المقاييس فقدان الاختبار والتدريب ودقة الاختبار. يمكنك استخدام .result() للحصول على إحصاءات المتراكمة في أي وقت.

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

حلقة التدريب

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5044084787368774, Accuracy: 81.87333679199219, Test Loss: 0.3816865086555481, Test Accuracy: 86.5999984741211
Epoch 2, Loss: 0.3375805616378784, Accuracy: 87.8566665649414, Test Loss: 0.3369813859462738, Test Accuracy: 87.76000213623047
Epoch 3, Loss: 0.2896445095539093, Accuracy: 89.50499725341797, Test Loss: 0.299490362405777, Test Accuracy: 89.22000122070312
Epoch 4, Loss: 0.259074866771698, Accuracy: 90.58833312988281, Test Loss: 0.2881558835506439, Test Accuracy: 89.33000183105469
Epoch 5, Loss: 0.2341146171092987, Accuracy: 91.38999938964844, Test Loss: 0.2916182577610016, Test Accuracy: 89.61000061035156
Epoch 6, Loss: 0.21513047814369202, Accuracy: 92.02333068847656, Test Loss: 0.2755740284919739, Test Accuracy: 89.85000610351562
Epoch 7, Loss: 0.1952667236328125, Accuracy: 92.88333129882812, Test Loss: 0.27464523911476135, Test Accuracy: 90.36000061035156
Epoch 8, Loss: 0.17831537127494812, Accuracy: 93.3566665649414, Test Loss: 0.26432710886001587, Test Accuracy: 90.19000244140625
Epoch 9, Loss: 0.16429665684700012, Accuracy: 93.85333251953125, Test Loss: 0.2659859359264374, Test Accuracy: 91.0999984741211
Epoch 10, Loss: 0.1503313183784485, Accuracy: 94.42166900634766, Test Loss: 0.2602477967739105, Test Accuracy: 91.06999969482422

أشياء يجب ملاحظتها في المثال أعلاه:

  • نحن بالتكرار عبر train_dist_dataset و test_dist_dataset يستخدم for x in ... بناء.
  • فقدان تحجيم هي القيمة عودة distributed_train_step . يتم تجميع هذه القيمة عبر النسخ المتماثلة باستخدام tf.distribute.Strategy.reduce الدعوة ومن ثم عبر دفعات عن طريق جمع قيمة الإرجاع من tf.distribute.Strategy.reduce المكالمات.
  • tf.keras.Metrics ينبغي تحديث داخل train_step و test_step أن يعدم قبل tf.distribute.Strategy.run . * tf.distribute.Strategy.run يعود النتائج من كل متماثلة محلية في الاستراتيجية، وهناك طرق متعددة لتستهلك هذه النتيجة. يمكنك القيام tf.distribute.Strategy.reduce للحصول على قيمة مجمعة. يمكنك أيضا القيام tf.distribute.Strategy.experimental_local_results للحصول على قائمة من القيم الواردة في النتيجة، واحد لكل متماثلة محلية.

استعادة أحدث نقطة تفتيش والاختبار

نموذج checkpointed مع tf.distribute.Strategy يمكن استعادة مع أو بدون استراتيجية.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0999984741211

طرق بديلة للتكرار على مجموعة بيانات

باستخدام التكرارات

إذا كنت ترغب في تكرار أكثر من عدد معين من الخطوات وليس من خلال ورقة العمل بالكامل يمكنك إنشاء مكرر باستخدام iter دعوة وصراحة دعوة next على مكرر. يمكنك اختيار التكرار على مجموعة البيانات داخل وخارج وظيفة tf. فيما يلي مقتطف صغير يوضح تكرار مجموعة البيانات خارج دالة tf باستخدام مكرر.

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14126229286193848, Accuracy: 95.0
Epoch 10, Loss: 0.1343936026096344, Accuracy: 95.0
Epoch 10, Loss: 0.12443388998508453, Accuracy: 94.84375
Epoch 10, Loss: 0.1607474684715271, Accuracy: 94.21875
Epoch 10, Loss: 0.10524413734674454, Accuracy: 96.71875
Epoch 10, Loss: 0.11492376029491425, Accuracy: 96.71875
Epoch 10, Loss: 0.16041627526283264, Accuracy: 94.21875
Epoch 10, Loss: 0.13022005558013916, Accuracy: 94.6875
Epoch 10, Loss: 0.17113295197486877, Accuracy: 93.28125
Epoch 10, Loss: 0.12315043061971664, Accuracy: 95.625

التكرار داخل دالة tf

يمكنك أيضا أعاد خلال إدخال كامل train_dist_dataset داخل tf.function باستخدام for x in ... بناء أو خلق المكررات كما فعلنا أعلاه. يوضح المثال التالي التفاف عصر واحد من التدريب في tf.function وبالتكرار عبر train_dist_dataset داخل الدالة.

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13766956329345703, Accuracy: 94.89666748046875
Epoch 2, Loss: 0.12510614097118378, Accuracy: 95.35166931152344
Epoch 3, Loss: 0.11464647948741913, Accuracy: 95.70333099365234
Epoch 4, Loss: 0.10295023769140244, Accuracy: 96.12000274658203
Epoch 5, Loss: 0.09352775663137436, Accuracy: 96.49666595458984
Epoch 6, Loss: 0.08494547754526138, Accuracy: 96.87166595458984
Epoch 7, Loss: 0.07917638123035431, Accuracy: 97.09166717529297
Epoch 8, Loss: 0.07128290832042694, Accuracy: 97.37833404541016
Epoch 9, Loss: 0.06662175804376602, Accuracy: 97.47999572753906
Epoch 10, Loss: 0.06016768515110016, Accuracy: 97.82833099365234

تتبع فقدان التدريب عبر النسخ المتماثلة

نحن لا نوصي باستخدام tf.metrics.Mean لتتبع فقدان التدريب عبر النسخ المتماثلة مختلفة، بسبب احتساب التحجيم الخسارة التي نفذت.

على سبيل المثال ، إذا كنت تدير وظيفة تدريبية بالخصائص التالية:

  • نسختان متماثلتان
  • تتم معالجة عينتين على كل نسخة طبق الأصل
  • قيم الخسارة الناتجة: [2 ، 3] و [4 ، 5] على كل نسخة متماثلة
  • حجم الدفعة العالمية = 4

باستخدام مقياس الخسارة ، يمكنك حساب قيمة الخسارة لكل عينة في كل نسخة متماثلة عن طريق إضافة قيم الخسارة ، ثم القسمة على حجم الدُفعة العام. في هذه الحالة: (2 + 3) / 4 = 1.25 و (4 + 5) / 4 = 2.25 .

إذا كنت تستخدم tf.metrics.Mean لتتبع فقدان عبر النسخ المتماثلة، وهما النتيجة مختلفة. في هذا المثال، ينتهي بك الأمر مع total من 3.50 و count من 2، الذي النتائج في total / count = 1.75 عند result() ودعا متري. خسارة تحسب مع tf.keras.Metrics يتم تحجيم بواسطة عامل إضافي يساوي عدد النسخ المتماثلة متزامنة.

دليل وأمثلة

فيما يلي بعض الأمثلة لاستخدام إستراتيجية التوزيع مع حلقات تدريب مخصصة:

  1. دليل التدريب الموزع
  2. DenseNet سبيل المثال باستخدام MirroredStrategy .
  3. بيرت سبيل المثال تدريب باستخدام MirroredStrategy و TPUStrategy . هذا المثال مفيد بشكل خاص لفهم كيفية التحميل من نقطة تفتيش وإنشاء نقاط تفتيش دورية أثناء التدريب الموزع وما إلى ذلك.
  4. NCF سبيل المثال تدريب باستخدام MirroredStrategy التي يمكن تمكين باستخدام keras_use_ctl العلم.
  5. NMT سبيل المثال تدريب باستخدام MirroredStrategy .

مزيد من الأمثلة الواردة في دليل استراتيجية التوزيع .

الخطوات التالية

  • تجربة جديدة tf.distribute.Strategy API على النماذج الخاصة بك.
  • زيارة قسم الأداء في دليل لمعرفة المزيد حول الاستراتيجيات وغيرها من الأدوات التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow الخاص بك.