تدريب متعدد العمال مع Keras

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يوضح هذا البرنامج التعليمي كيفية إجراء تدريب موزع متعدد العمال باستخدام نموذج Model.fit وواجهة برمجة تطبيقات Model.fit باستخدام tf.distribute.Strategy API - تحديدًا فئة tf.distribute.MultiWorkerMirroredStrategy . بمساعدة هذه الإستراتيجية ، يمكن لنموذج Keras الذي تم تصميمه للتشغيل على عامل واحد أن يعمل بسلاسة على العديد من العمال مع الحد الأدنى من التغييرات في التعليمات البرمجية.

للراغبين في الحصول على فهم أعمق لـ tf.distribute.Strategy APIs ، يتوفر التدريب الموزع في دليل TensorFlow للحصول على نظرة عامة على استراتيجيات التوزيع التي يدعمها TensorFlow.

لمعرفة كيفية استخدام MultiWorkerMirroredStrategy مع Keras وحلقة تدريب مخصصة ، راجع حلقة التدريب المخصصة مع Keras و MultiWorkerMirroredStrategy .

لاحظ أن الغرض من هذا البرنامج التعليمي هو إظهار مثال بسيط متعدد العمال مع عاملين.

يثبت

ابدأ ببعض الواردات الضرورية:

import json
import os
import sys

قبل استيراد TensorFlow ، قم بإجراء بعض التغييرات على البيئة:

  1. قم بتعطيل كافة وحدات معالجة الرسومات. هذا يمنع الأخطاء التي يسببها جميع العمال الذين يحاولون استخدام نفس GPU. في التطبيق الواقعي ، سيكون كل عامل على جهاز مختلف.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. أعد تعيين متغير البيئة TF_CONFIG (ستتعرف على المزيد حول هذا لاحقًا):
os.environ.pop('TF_CONFIG', None)
  1. تأكد من أن الدليل الحالي على مسار Python - وهذا يسمح للمفكرة باستيراد الملفات المكتوبة بواسطة %%writefile لاحقًا:
if '.' not in sys.path:
  sys.path.insert(0, '.')

الآن قم باستيراد TensorFlow:

import tensorflow as tf

تعريف نموذج ومجموعة البيانات

بعد ذلك ، قم بإنشاء ملف mnist_setup.py بنموذج بسيط وإعداد مجموعة بيانات. سيتم استخدام ملف Python هذا بواسطة العمليات العاملة في هذا البرنامج التعليمي:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

تدريب نموذجي على عامل واحد

جرب تدريب النموذج لعدد صغير من الحقب وراقب نتائج عامل واحد للتأكد من أن كل شيء يعمل بشكل صحيح. مع تقدم التدريب ، يجب أن تنخفض الخسارة ويجب أن تزيد الدقة.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

تكوين متعدد العمال

الآن دعنا ندخل عالم التدريب متعدد العمال.

كتلة بالوظائف والمهام

في TensorFlow ، يتضمن التدريب الموزع: 'cluster' بها وظائف متعددة ، وقد تحتوي كل وظيفة على 'task' واحدة أو أكثر.

ستحتاج إلى متغير بيئة التكوين TF_CONFIG للتدريب على أجهزة متعددة ، لكل منها دور مختلف. TF_CONFIG عبارة عن سلسلة JSON تستخدم لتحديد تكوين الكتلة لكل عامل يمثل جزءًا من الكتلة.

هناك مكونان لمتغير TF_CONFIG : 'cluster' و 'task' .

  • 'cluster' هي نفسها لجميع العمال وتوفر معلومات حول مجموعة التدريب ، وهي عبارة عن إملاء يتكون من أنواع مختلفة من الوظائف ، مثل 'worker' أو 'chief' .

    • في التدريب متعدد العاملين باستخدام tf.distribute.MultiWorkerMirroredStrategy ، يوجد عادةً 'worker' واحد يتحمل المسؤوليات ، مثل حفظ نقطة تفتيش وكتابة ملف ملخص لـ TensorBoard ، بالإضافة إلى ما يفعله 'worker' منتظم. يشار إلى هذا 'worker' على أنه كبير العمال (مع اسم وظيفي 'chief' ).
    • من المعتاد أن يكون لدى 'chief' 'index' 0 يتم تعيينه (في الواقع ، هذه هي الطريقة التي يتم بها تنفيذ tf.distribute.Strategy ).
  • توفر 'task' معلومات عن المهمة الحالية وتختلف لكل عامل. تحدد 'type' و 'index' لهذا العامل.

فيما يلي مثال على التكوين:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

إليك نفس TF_CONFIG المتسلسلة كسلسلة JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

لاحظ أن tf_config هو مجرد متغير محلي في Python. لتكون قادرًا على استخدامه لتكوين تدريب ، يجب إجراء تسلسل لهذا الأمر باعتباره JSON ووضعه في متغير بيئة TF_CONFIG .

في مثال التكوين أعلاه ، قمت بتعيين المهمة 'type' إلى 'worker' والمهمة 'index' على 0 . لذلك ، هذه الآلة هي العامل الأول . سيتم تعيينه كـ 'chief' العامل ويقوم بعمل أكثر من الآخرين.

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيف يمكنك إعداد متغير TF_CONFIG مع عاملين على localhost .

من الناحية العملية ، يمكنك إنشاء عدة عمال على عناوين / منافذ IP خارجية وتعيين متغير TF_CONFIG على كل عامل وفقًا لذلك.

في هذا البرنامج التعليمي ، ستستخدم عاملين:

  • يظهر TF_CONFIG العامل الأول ( 'chief' ) أعلاه.
  • بالنسبة للعامل الثاني ، ستقوم بتعيين tf_config['task']['index']=1

متغيرات البيئة والعمليات الفرعية في أجهزة الكمبيوتر المحمولة

ترث العمليات الفرعية متغيرات البيئة من الوالدين.

على سبيل المثال ، يمكنك تعيين متغير بيئة في عملية Jupyter Notebook هذه على النحو التالي:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

بعد ذلك ، يمكنك الوصول إلى متغير البيئة من عمليات فرعية:

echo ${GREETINGS}
Hello TensorFlow!

في القسم التالي ، ستستخدم طريقة مماثلة لتمرير TF_CONFIG إلى العمليات الفرعية للعمال. في سيناريو العالم الحقيقي ، لن تطلق وظائفك بهذه الطريقة ، لكنها كافية في هذا المثال.

اختر الاستراتيجية الصحيحة

في TensorFlow ، هناك نوعان رئيسيان من التدريب الموزع:

  • التدريب المتزامن ، حيث تتم مزامنة خطوات التدريب عبر العاملين والنسخ المتماثلة ، و
  • التدريب غير المتزامن ، حيث لا تتم مزامنة خطوات التدريب بشكل صارم (على سبيل المثال ، تدريب خادم المعلمات ).

يوضح هذا البرنامج التعليمي كيفية إجراء تدريب متزامن متعدد العمال باستخدام مثيل tf.distribute.MultiWorkerMirroredStrategy .

تُنشئ MultiWorkerMirroredStrategy نسخًا من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. يستخدم CollectiveOps ، عملية TensorFlow للاتصال الجماعي ، لتجميع التدرجات اللونية والحفاظ على تزامن المتغيرات. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الإستراتيجية.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

توفر MultiWorkerMirroredStrategy متعددة عبر المعلمة tf.distribute.experimental.CommunicationOptions : 1) تنفذ RING مجموعات قائمة على الحلقة باستخدام gRPC كطبقة اتصال عبر المضيف ؛ 2) يستخدم NCCL مكتبة الاتصالات الجماعية NVIDIA لتنفيذ المجموعات ؛ و 3) AUTO يؤجل الاختيار لوقت التشغيل. يعتمد أفضل خيار للتنفيذ الجماعي على عدد وحدات معالجة الرسومات ونوعها والتوصيل البيني للشبكة في المجموعة.

تدريب النموذج

من خلال تكامل tf.distribute.Strategy API في tf.keras ، فإن التغيير الوحيد الذي ستقوم به لتوزيع التدريب على العديد من العاملين هو إرفاق مبنى النموذج model.compile() داخل strategy.scope() . يحدد نطاق استراتيجية التوزيع كيف وأين يتم إنشاء المتغيرات ، وفي حالة MultiWorkerMirroredStrategy ، فإن المتغيرات التي تم إنشاؤها هي MirroredVariable s ، ويتم تكرارها على كل عامل.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

للتشغيل فعليًا باستخدام MultiWorkerMirroredStrategy ستحتاج إلى تشغيل عمليات العمال وتمرير TF_CONFIG إليهم.

مثل ملف mnist_setup.py المكتوب سابقًا ، إليك main.py التي سيديرها كل عامل:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

في مقتطف الشفرة أعلاه ، لاحظ أن global_batch_size ، الذي يتم تمريره إلى Dataset.batch ، مضبوط على per_worker_batch_size * num_workers . هذا يضمن أن كل عامل يعالج دفعات من أمثلة per_worker_batch_size بغض النظر عن عدد العمال.

يحتوي الدليل الحالي الآن على كلا ملفي Python:

ls *.py
main.py
mnist_setup.py

لذا ، قم بتسلسل TF_CONFIG إلى متغيرات البيئة:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن ، يمكنك تشغيل عملية عاملة تقوم بتشغيل main.py واستخدام TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

هناك بعض الأشياء التي يجب ملاحظتها حول الأمر أعلاه:

  1. يستخدم %%bash وهو عبارة عن دفتر ملاحظات "سحري" لتشغيل بعض أوامر bash.
  2. يستخدم العلامة --bg لتشغيل عملية bash في الخلفية ، لأن هذا العامل لن ينتهي. ينتظر جميع العمال قبل أن يبدأ.

لن تقوم عملية العامل ذات الخلفية بطباعة الإخراج إلى دفتر الملاحظات هذا ، لذا فإن &> تعيد توجيه إخراجها إلى ملف حتى تتمكن من فحص ما حدث في ملف السجل لاحقًا.

لذلك ، انتظر بضع ثوان حتى تبدأ العملية:

import time
time.sleep(10)

الآن ، افحص ما تم إخراجه إلى ملف سجل العامل حتى الآن:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

يجب أن يقول السطر الأخير من ملف السجل: تم Started server with target: grpc://localhost:12345 . أصبح العامل الأول جاهزًا الآن ، وينتظر جميع العمال الآخرين ليكونوا مستعدين للمضي قدمًا.

لذا قم بتحديث tf_config لعملية العامل الثاني لالتقاط:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

إطلاق العامل الثاني. سيبدأ هذا التدريب نظرًا لأن جميع العمال نشطين (لذلك ليست هناك حاجة لخلفية هذه العملية):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

إذا أعدت التحقق من السجلات التي كتبها العامل الأول ، فستعرف أنه شارك في تدريب هذا النموذج:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

مما لا يثير الدهشة ، أن هذا كان أبطأ من التشغيل التجريبي في بداية هذا البرنامج التعليمي.

يؤدي تشغيل عدة عمال على جهاز واحد إلى إضافة النفقات العامة فقط.

لم يكن الهدف هنا هو تحسين وقت التدريب ، ولكن فقط لإعطاء مثال على تدريب متعدد العمال.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

تدريب متعدد العمال في العمق

لقد تعلمت حتى الآن كيفية إجراء إعداد أساسي متعدد العمال.

خلال الفترة المتبقية من البرنامج التعليمي ، ستتعرف بالتفصيل على العوامل الأخرى التي قد تكون مفيدة أو مهمة لحالات الاستخدام الحقيقي.

تقسيم مجموعة البيانات

في التدريب متعدد العمال ، يلزم تقسيم مجموعة البيانات لضمان التقارب والأداء.

يعتمد المثال في القسم السابق على المشاركة التلقائية الافتراضية التي توفرها tf.distribute.Strategy API. يمكنك التحكم في التجزئة عن طريق تعيين tf.data.experimental.AutoShardPolicy من tf.data.experimental.DistributeOptions .

لمعرفة المزيد حول التجزئة التلقائية ، راجع دليل الإدخال الموزع .

فيما يلي مثال سريع على كيفية إيقاف تشغيل التجزئة التلقائية ، بحيث تعالج كل نسخة متماثلة كل مثال ( غير مستحسن ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

تقييم

إذا قمت بتمرير validation_data إلى Model.fit ، فسيتم التبديل بين التدريب والتقييم لكل فترة. يتم توزيع التقييم الذي يأخذ validation_data من الصحة عبر نفس مجموعة العمال ويتم تجميع نتائج التقييم وإتاحتها لجميع العمال.

على غرار التدريب ، تتم مشاركة مجموعة بيانات التحقق تلقائيًا على مستوى الملف. تحتاج إلى تعيين حجم دفعة عام في مجموعة بيانات validation_steps وتعيين Validation_steps.

يوصى أيضًا بمجموعة بيانات متكررة للتقييم.

بدلاً من ذلك ، يمكنك أيضًا إنشاء مهمة أخرى تقرأ نقاط التحقق بشكل دوري وتقوم بتشغيل التقييم. هذا ما يفعله المقدر. لكن هذه ليست طريقة موصى بها لإجراء التقييم ، وبالتالي يتم حذف تفاصيلها.

أداء

لديك الآن نموذج Keras الذي تم إعداده بالكامل للتشغيل في عدة عمال باستخدام MultiWorkerMirroredStrategy .

لتعديل أداء التدريب متعدد العاملين ، يمكنك تجربة ما يلي:

  • توفر tf.distribute.MultiWorkerMirroredStrategy العديد من تطبيقات الاتصال الجماعي :

    • تقوم RING بتنفيذ مجموعات قائمة على الحلقات باستخدام gRPC كطبقة اتصال عبر المضيف.
    • يستخدم NCCL مكتبة الاتصالات الجماعية NVIDIA لتنفيذ المجموعات.
    • AUTO يؤجل الاختيار لوقت التشغيل.

    يعتمد أفضل خيار للتنفيذ الجماعي على عدد وحدات معالجة الرسومات ونوع وحدات معالجة الرسومات والتوصيل البيني للشبكة في المجموعة. لتجاوز الاختيار التلقائي ، حدد المعامل Communication_options الخاص MultiWorkerMirroredStrategy communication_options فمثلا:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • صب المتغيرات على tf.float إن أمكن:

    • يتضمن نموذج ResNet الرسمي مثالاً على كيفية القيام بذلك.

التسامح مع الخطأ

في التدريب المتزامن ، ستفشل الكتلة إذا فشل أحد العمال ولم توجد آلية للتعافي من الفشل.

يأتي استخدام Keras مع tf.distribute.Strategy مع ميزة التسامح مع الخطأ في الحالات التي يموت فيها العمال أو يكونون غير مستقرين. يمكنك القيام بذلك عن طريق الحفاظ على حالة التدريب في نظام الملفات الموزعة الذي تختاره ، بحيث يتم استرداد حالة التدريب عند إعادة تشغيل المثيل الذي فشل أو تم إيقافه مسبقًا.

عندما يصبح العامل غير متاح ، سيفشل العمال الآخرون (ربما بعد انقضاء المهلة). في مثل هذه الحالات ، يحتاج العامل غير المتاح إلى إعادة التشغيل ، وكذلك العمال الآخرين الذين فشلوا.

رد اتصال ModelCheckpoint

لم يعد رد الاتصال ModelCheckpoint يوفر وظيفة التسامح مع الخطأ ، يرجى استخدام رد الاتصال BackupAndRestore بدلاً من ذلك.

لا يزال من الممكن استخدام رد الاتصال ModelCheckpoint لحفظ نقاط التفتيش. ولكن مع هذا ، إذا توقف التدريب أو انتهى بنجاح ، من أجل مواصلة التدريب من نقطة التفتيش ، يكون المستخدم مسؤولاً عن تحميل النموذج يدويًا.

اختياريا ، يمكن للمستخدم اختيار حفظ واستعادة النموذج / الأوزان خارج رد الاتصال ModelCheckpoint .

نموذج الادخار والتحميل

لحفظ النموذج الخاص بك باستخدام model.save أو tf.saved_model.save ، يجب أن تكون وجهة الحفظ مختلفة لكل عامل.

  • بالنسبة للعمال غير الرئيسيين ، ستحتاج إلى حفظ النموذج في دليل مؤقت.
  • بالنسبة إلى الرئيس ، ستحتاج إلى الحفظ في دليل النموذج المقدم.

يجب أن تكون الدلائل المؤقتة الخاصة بالعامل فريدة من نوعها لمنع الأخطاء الناتجة عن العديد من العمال الذين يحاولون الكتابة إلى نفس الموقع.

النموذج المحفوظ في جميع الدلائل متطابق ، وعادة ما يجب فقط الرجوع إلى النموذج المحفوظ من قبل الرئيس للاستعادة أو التقديم.

يجب أن يكون لديك بعض منطق التنظيف الذي يحذف الدلائل المؤقتة التي أنشأها العمال بمجرد اكتمال تدريبك.

سبب الادخار على الرئيس والعاملين في نفس الوقت هو أنك قد تقوم بتجميع المتغيرات أثناء نقاط التفتيش الأمر الذي يتطلب مشاركة كل من الرئيس والعاملين في بروتوكول الاتصال allreduce. من ناحية أخرى ، فإن السماح للرئيس والعاملين بالحفظ في دليل النموذج نفسه سيؤدي إلى حدوث أخطاء بسبب الخلاف.

باستخدام MultiWorkerMirroredStrategy ، يتم تشغيل البرنامج على كل عامل ، ومن أجل معرفة ما إذا كان العامل الحالي رئيسًا ، فإنه يستفيد من كائن محلل الكتلة الذي يحتوي على سمات task_type و task_id :

  • يخبرك task_type ما هي الوظيفة الحالية (على سبيل المثال 'worker' ).
  • يخبرك task_id بمعرف العامل.
  • العامل مع task_id == 0 تم تعيينه كعامل رئيسي.

في مقتطف الشفرة أدناه ، توفر وظيفة write_filepath مسار الملف للكتابة ، والذي يعتمد على معرف المهمة الخاص task_id :

  • بالنسبة للعامل الرئيسي (مع task_id == 0 ) ، فإنه يكتب إلى مسار الملف الأصلي.
  • بالنسبة للعمال الآخرين ، يقوم بإنشاء دليل مؤقت - temp_dir - مع task_id في مسار الدليل للكتابة:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

بذلك ، أنت الآن جاهز للحفظ:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

كما هو موضح أعلاه ، يجب تحميل النموذج لاحقًا فقط من رئيس المسار المحفوظ فيه ، لذلك دعنا نزيل النماذج المؤقتة التي حفظها العمال غير الرئيسيين:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

الآن ، عندما يحين وقت التحميل ، دعنا نستخدم واجهة برمجة تطبيقات tf.keras.models.load_model الملائمة ، ونستمر في العمل الإضافي.

هنا ، افترض فقط استخدام عامل واحد لتحميل التدريب ومتابعته ، وفي هذه الحالة لا تستدعي tf.keras.models.load_model ضمن strategy.scope() (لاحظ أن strategy = tf.distribute.MultiWorkerMirroredStrategy() ، كما هو محدد سابقًا ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

حفظ واستعادة نقطة التفتيش

من ناحية أخرى ، تتيح لك نقاط التفتيش حفظ أوزان النموذج واستعادتها دون الحاجة إلى حفظ النموذج بالكامل.

هنا ، ستقوم بإنشاء tf.train.Checkpoint واحد الذي يتتبع النموذج ، والذي تتم إدارته بواسطة tf.train.CheckpointManager ، بحيث يتم الاحتفاظ بأحدث نقطة فحص فقط:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

بمجرد إعداد CheckpointManager ، فأنت الآن جاهز لحفظ وإزالة نقاط التفتيش التي حفظها العمال غير الرئيسيين:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

الآن ، عندما تحتاج إلى استعادة النموذج ، يمكنك العثور على أحدث نقطة تفتيش محفوظة باستخدام وظيفة tf.train.latest_checkpoint . بعد استعادة نقطة التفتيش ، يمكنك متابعة التدريب.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

رد الاتصال BackupAndRestore

يوفر رد الاتصال tf.keras.callbacks.BackupAndRestore وظيفة التسامح مع الخطأ عن طريق النسخ الاحتياطي للنموذج ورقم الحقبة الحالية في ملف نقطة تفتيش مؤقت تحت وسيطة backup_dir إلى BackupAndRestore . يتم ذلك في نهاية كل حقبة.

بمجرد مقاطعة الوظائف وإعادة تشغيلها ، يستعيد رد الاتصال آخر نقطة تفتيش ، ويستمر التدريب من بداية حقبة الانقطاع. سيتم التخلص من أي تدريب جزئي تم إجراؤه بالفعل في الحقبة غير المكتملة قبل الانقطاع ، بحيث لا يؤثر على حالة النموذج النهائية.

لاستخدامه ، قم بتوفير مثيل لـ tf.keras.callbacks.BackupAndRestore في استدعاء Model.fit .

باستخدام MultiWorkerMirroredStrategy ، في حالة مقاطعة عامل ، تتوقف المجموعة بأكملها مؤقتًا حتى تتم إعادة تشغيل العامل الذي تمت مقاطعته. سيتم إعادة تشغيل العمال الآخرين أيضًا ، وينضم العامل الذي تمت مقاطعته إلى المجموعة. بعد ذلك ، يقرأ كل عامل ملف نقاط التفتيش الذي تم حفظه مسبقًا ويلتقط حالته السابقة ، مما يسمح للمجموعة بالعودة إلى المزامنة. ثم يستمر التدريب.

يستخدم رد الاتصال BackupAndRestore CheckpointManager لحفظ واستعادة حالة التدريب ، والتي تنشئ ملفًا يسمى نقطة التفتيش التي تتعقب نقاط التفتيش الموجودة مع أحدثها. لهذا السبب ، لا ينبغي إعادة استخدام backup_dir لتخزين نقاط التفتيش الأخرى لتجنب تضارب الأسماء.

حاليًا ، يدعم رد الاتصال BackupAndRestore تدريب العامل الفردي بدون إستراتيجية - MirroredStrategy - والتدريب متعدد العاملين باستخدام MultiWorkerMirroredStrategy .

فيما يلي مثالان لكل من تدريب العمال المتعددين وتدريب العمال الفرديين:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

إذا قمت بفحص دليل backup_dir الذي حددته في BackupAndRestore ، فقد تلاحظ بعض ملفات نقاط التفتيش التي تم إنشاؤها مؤقتًا. هذه الملفات ضرورية لاستعادة المثيلات المفقودة سابقًا ، وستتم إزالتها بواسطة المكتبة في نهاية Model.fit عند الخروج بنجاح من تدريبك.

مصادر إضافية

  1. يوفر التدريب الموزع في دليل TensorFlow نظرة عامة على استراتيجيات التوزيع المتاحة.
  2. تُظهر حلقة التدريب المخصصة مع Keras و MultiWorkerMirroredStrategy كيفية استخدام MultiWorkerMirroredStrategy مع Keras وحلقة تدريب مخصصة.
  3. تحقق من النماذج الرسمية ، والتي يمكن تكوين العديد منها لتشغيل استراتيجيات توزيع متعددة.
  4. يوفر دليل الأداء الأفضل مع وظيفة tf معلومات حول الاستراتيجيات والأدوات الأخرى ، مثل TensorFlow Profiler الذي يمكنك استخدامه لتحسين أداء نماذج TensorFlow.