تدريب خادم المعلمات باستخدام ParameterServerStrategy

تنظيم صفحاتك في مجموعات يمكنك حفظ المحتوى وتصنيفه حسب إعداداتك المفضّلة.

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يعد تدريب خادم المعلمات طريقة موازية للبيانات الشائعة لتوسيع نطاق تدريب النموذج على أجهزة متعددة.

تتكون مجموعة تدريب خادم المعلمات من العمال وخوادم المعلمات . يتم إنشاء المتغيرات على خوادم المعلمات ويتم قراءتها وتحديثها من قبل العاملين في كل خطوة. بشكل افتراضي ، يقوم العمال بقراءة هذه المتغيرات وتحديثها بشكل مستقل دون المزامنة مع بعضهم البعض. هذا هو السبب في أن التدريب على نمط خادم المعامل يسمى أحيانًا التدريب غير المتزامن .

في TensorFlow 2 ، يتم تشغيل تدريب خادم المعلمات بواسطة فئة tf.distribute.experimental.ParameterServerStrategy ، التي توزع خطوات التدريب على نظام مجموعة يصل إلى آلاف العمال (مصحوبًا بخوادم المعلمات).

طرق التدريب المدعومة

هناك طريقتان رئيسيتان للتدريب:

كتلة بالوظائف والمهام

بغض النظر عن واجهة برمجة التطبيقات التي تختارها ( Model.fit أو حلقة تدريب مخصصة) ، يتضمن التدريب الموزع في TensorFlow 2: 'cluster' بها 'jobs' متعددة ، وقد يكون لكل وظيفة 'tasks' واحدة أو أكثر.

عند استخدام تدريب خادم المعلمات ، يوصى بالحصول على:

  • وظيفة منسق واحدة (لها اسم وظيفي chief )
  • وظائف متعددة للعمال (اسم الوظيفة worker ) ؛ و
  • وظائف خادم متعدد المعلمات (اسم الوظيفة ps )

بينما ينشئ المنسق الموارد ، ويرسل مهام التدريب ، ويكتب نقاط التفتيش ، ويتعامل مع حالات فشل المهام ، يقوم العاملون وخوادم المعلمات بتشغيل tf.distribute.Server الذي يستمع إلى الطلبات من المنسق.

تدريب خادم المعلمات مع Model.fit API

يتطلب تدريب خادم المعلمات باستخدام Model.fit API أن يستخدم المنسق كائن tf.distribute.experimental.ParameterServerStrategy و tf.keras.utils.experimental.DatasetCreator كمدخل. على غرار استخدام Model.fit بدون إستراتيجية ، أو مع إستراتيجيات أخرى ، يتضمن سير العمل إنشاء النموذج وتجميعه ، وإعداد عمليات الاسترجاعات ، متبوعة باستدعاء Model.fit .

تدريب خادم المعلمات مع حلقة تدريب مخصصة

مع حلقات التدريب المخصصة ، فإن فئة tf.distribute.experimental.coordinator.ClusterCoordinator هي المكون الرئيسي المستخدم للمنسق.

أهم واجهة برمجة تطبيقات يوفرها كائن ClusterCoordinator هي schedule :

  • تقوم واجهة برمجة التطبيقات (API) schedule بإدراج دالة tf في قائمة الانتظار وإرجاع قيمة tf.function شبيهة RemoteValue على الفور.
  • سيتم إرسال وظائف قائمة الانتظار إلى العاملين عن بعد في مؤشرات الترابط في الخلفية وسيتم ملء RemoteValue الخاصة بهم بشكل غير متزامن.
  • نظرًا لأن schedule لا يتطلب تعيين العامل ، يمكن تنفيذ tf.function التي تم تمريرها على أي عامل متاح.
  • إذا أصبح العامل الذي تم تنفيذه عليه غير متاح قبل اكتماله ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح.
  • بسبب هذه الحقيقة وحقيقة أن تنفيذ الوظيفة ليس ذريًا ، يمكن تنفيذ الوظيفة أكثر من مرة.

بالإضافة إلى إرسال الوظائف البعيدة ، يساعد ClusterCoordinator أيضًا في إنشاء مجموعات بيانات عن جميع العمال وإعادة بناء مجموعات البيانات هذه عندما يتعافى العامل من الفشل.

إعداد البرنامج التعليمي

سيتفرع البرنامج التعليمي إلى Model.fit ومسارات حلقة تدريب مخصصة ، ويمكنك اختيار المسار الذي يناسب احتياجاتك. تنطبق الأقسام الأخرى بخلاف "التدريب مع X" على كلا المسارين.

pip install portpicker

إعداد الكتلة

كما هو مذكور أعلاه ، تتطلب مجموعة تدريب خادم المعلمات مهمة منسق تقوم بتشغيل برنامج التدريب الخاص بك ، واحد أو أكثر من العمال ومهام خادم المعلمات التي تقوم بتشغيل خوادم TensorFlow - tf.distribute.Server - وربما مهمة تقييم إضافية تقوم بإجراء تقييم جانبي للسيارة (انظر قسم تقييم السيارة الجانبية أدناه). متطلبات إعدادها هي:

  • تحتاج مهمة المنسق إلى معرفة عناوين ومنافذ جميع خوادم TensorFlow الأخرى باستثناء المقيم.
  • يحتاج العمال وخوادم المعلمات إلى معرفة المنفذ الذي يحتاجون إليه للاستماع إليه. من أجل البساطة ، يمكنك عادةً تمرير معلومات المجموعة الكاملة عند إنشاء خوادم TensorFlow في هذه المهام.
  • ليس من الضروري أن تعرف مهمة المقيم تكوين مجموعة التدريب. إذا حدث ذلك ، فلا يجب محاولة الاتصال بمجموعة التدريب.
  • يجب أن يكون للعمال وخوادم المعلمات أنواع مهام مثل "worker" و "ps" ، على التوالي. يجب أن يستخدم المنسق "chief" كنوع المهمة لأسباب قديمة.

في هذا البرنامج التعليمي ، ستقوم بإنشاء مجموعة قيد التشغيل بحيث يمكن تشغيل تدريب خادم المعلمات بالكامل في Colab. سوف تتعلم كيفية إنشاء مجموعات حقيقية في قسم لاحق.

الكتلة قيد التشغيل

ستبدأ بإنشاء عدة خوادم TensorFlow مسبقًا والاتصال بها لاحقًا. لاحظ أن هذا فقط لغرض العرض التوضيحي لهذا البرنامج التعليمي ، وفي التدريب الحقيقي ، سيتم تشغيل الخوادم على أجهزة "worker" و "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

كثيرًا ما يتم استخدام إعداد الكتلة قيد المعالجة في اختبار الوحدة ، كما هو الحال هنا .

هناك خيار آخر للاختبار المحلي وهو بدء العمليات على الجهاز المحلي — تحقق من تدريب العمال المتعددين باستخدام Keras للحصول على مثال على هذا النهج.

إنشاء إستراتيجية ParameterServer

قبل الغوص في رمز التدريب ، دعنا ننشئ مثيلًا لكائن ParameterServerStrategy . لاحظ أن هذا ضروري بغض النظر عما إذا كنت Model.fit أو حلقة تدريب مخصصة. سيتم شرح الوسيطة variable_partitioner في قسم التجزئة المتغير .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

من أجل استخدام وحدات معالجة الرسومات للتدريب ، قم بتخصيص وحدات معالجة الرسومات المرئية لكل عامل. ستستخدم ParameterServerStrategy جميع وحدات معالجة الرسومات المتاحة على كل عامل ، مع تقييد أن جميع العمال يجب أن يكون لديهم نفس عدد وحدات معالجة الرسومات المتاحة.

التجزئة المتغيرة

يشير مصطلح التجزئة المتغيرة إلى تقسيم المتغير إلى عدة متغيرات أصغر ، والتي تسمى الأجزاء . قد تكون التجزئة المتغيرة مفيدة لتوزيع حمل الشبكة عند الوصول إلى هذه الأجزاء. من المفيد أيضًا توزيع حساب وتخزين متغير عادي عبر خوادم متعددة المعلمات.

لتمكين التجزئة المتغيرة ، يمكنك المرور في variable_partitioner عند إنشاء كائن ParameterServerStrategy . سيتم استدعاء variable_partitioner في كل مرة يتم فيها إنشاء متغير ومن المتوقع أن يُرجع عدد الأجزاء على طول كل بُعد من أبعاد المتغير. يتم توفير بعض variable_partitioner الجاهزة مثل tf.distribute.experimental.partitioners.MinSizePartitioner . يوصى باستخدام مقسمات على أساس الحجم مثل tf.distribute.experimental.partitioners.MinSizePartitioner لتجنب تقسيم المتغيرات الصغيرة ، والتي يمكن أن يكون لها تأثير سلبي على سرعة تدريب النموذج.

عندما يتم تمرير قسم variable_partitioner إلى الداخل وإذا قمت بإنشاء متغير مباشرة ضمن strategy.scope() ، فسيصبح نوع حاوية بخاصية variables التي توفر الوصول إلى قائمة الأجزاء. في معظم الحالات ، سيتم تحويل هذه الحاوية تلقائيًا إلى Tensor من خلال تسلسل جميع القطع. نتيجة لذلك ، يمكن استخدامه كمتغير عادي. من ناحية أخرى ، توفر بعض طرق TensorFlow مثل tf.nn.embedding_lookup تنفيذًا فعالاً لنوع الحاوية هذا وفي هذه الطرق سيتم تجنب التسلسل التلقائي.

يرجى الاطلاع على مستندات API الخاصة بـ tf.distribute.experimental.ParameterServerStrategy لمزيد من التفاصيل.

التدريب مع Model.fit

يوفر Keras واجهة برمجة تطبيقات تدريب سهلة الاستخدام عبر Model.fit التي تتعامل مع حلقة التدريب تحت الغطاء ، مع مرونة خطوات train_step القابلة للتجاوز وعمليات رد الاتصال ، والتي توفر وظائف مثل حفظ نقاط التفتيش أو الحفظ الموجز للوحة TensorBoard. باستخدام Model.fit ، يمكن استخدام نفس رمز التدريب لاستراتيجيات أخرى بتبديل بسيط لكائن الإستراتيجية.

ادخال البيانات

Model.fit مع تدريب خادم المعلمات أن يتم توفير بيانات الإدخال في ملف قابل للاستدعاء يأخذ وسيطة واحدة من النوع tf.distribute.InputContext ، ويعيد tf.data.Dataset . بعد ذلك ، أنشئ كائنًا tf.keras.utils.experimental.DatasetCreator يأخذ مثل هذا tf.distribute.InputOptions callable عبر وسيطة input_options .

لاحظ أنه يوصى بتبديل البيانات وتكرارها باستخدام تدريب خادم المعلمات ، وتحديد steps_per_epoch في fit call حتى تعرف المكتبة حدود الفترة.

الرجاء مراجعة البرنامج التعليمي للإدخال الموزع لمزيد من المعلومات حول وسيطة InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

سيتم استدعاء الكود الموجود في dataset_fn على جهاز الإدخال ، والذي يكون عادةً وحدة المعالجة المركزية ، على كل جهاز من الأجهزة العاملة.

بناء النموذج وتجميعه

الآن ، ستقوم بإنشاء tf.keras.Model - نموذج تافه tf.keras.models.Sequential لأغراض العرض التوضيحي - متبوعًا باستدعاء Model.compile لدمج المكونات ، مثل مُحسِّن أو مقاييس أو معلمات مثل steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

عمليات الاسترجاعات والتدريب

قبل استدعاء model.fit للتدريب الفعلي ، دعنا نجهز عمليات الاسترجاعات المطلوبة للمهام الشائعة ، مثل:

  • ModelCheckpoint : لحفظ أوزان النموذج.
  • BackupAndRestore : للتأكد من أن تقدم التدريب يتم نسخه احتياطيًا تلقائيًا ، واستعادته إذا واجهت المجموعة عدم توفر (مثل الإحباط أو الاستباق) ؛ أو
  • TensorBoard : لحفظ تقارير التقدم في ملفات موجزة ، والتي يتم تصورها في أداة TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

الاستخدام المباشر مع ClusterCoordinator (اختياري)

حتى إذا اخترت مسار التدريب Model.fit ، يمكنك اختياريًا إنشاء مثيل لكائن tf.distribute.experimental.coordinator.ClusterCoordinator لجدولة الوظائف الأخرى التي ترغب في تنفيذها على العمال. راجع قسم التدريب مع حلقة تدريب مخصصة لمزيد من التفاصيل والأمثلة.

التدريب بحلقة تدريب مخصصة

يوفر استخدام حلقات تدريب مخصصة مع tf.distribute.Strategy مرونة كبيرة في تحديد حلقات التدريب. باستخدام ParameterServerStrategy المحددة أعلاه ( strategy ) ، ستستخدم tf.distribute.experimental.coordinator.ClusterCoordinator لإرسال تنفيذ خطوات التدريب إلى العمال عن بعد.

بعد ذلك ، ستقوم بإنشاء نموذج ، وتحديد مجموعة بيانات ووظيفة خطوة ، كما فعلت في حلقة التدريب مع tf.distribute.Strategy s. يمكنك العثور على مزيد من التفاصيل في البرنامج التعليمي المخصص باستخدام tf.distribute.Strategy .

لضمان الجلب المسبق الفعال لمجموعة البيانات ، استخدم واجهات برمجة تطبيقات إنشاء مجموعة البيانات الموزعة الموصى بها والمذكورة في خطوات تدريب الإرسال إلى قسم العاملين عن بُعد أدناه. تأكد أيضًا من استدعاء Strategy.run داخل worker_fn للاستفادة الكاملة من وحدات معالجة الرسومات المخصصة للعاملين. باقي الخطوات هي نفسها بالنسبة للتدريب مع أو بدون وحدات معالجة الرسومات.

لنقم بإنشاء هذه المكونات في الخطوات التالية:

قم بإعداد البيانات

أولاً ، اكتب دالة تُنشئ مجموعة بيانات تتضمن منطق المعالجة المسبقة الذي تنفذه طبقات Keras المسبقة .

ستنشئ هذه الطبقات خارج dataset_fn ولكن ستطبق التحويل داخل dataset_fn ، حيث dataset_fn مجموعة البيانات_fn إلى tf.function ، والتي لا تسمح بإنشاء متغيرات بداخلها.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

أنشئ أمثلة على لعبة في مجموعة بيانات:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

بعد ذلك ، أنشئ مجموعة بيانات التدريب ملفوفة في dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

بناء النموذج

بعد ذلك ، قم بإنشاء النموذج والكائنات الأخرى. تأكد من إنشاء جميع المتغيرات ضمن strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

دعنا نؤكد أن استخدام FixedShardsPartitioner قسّم جميع المتغيرات إلى جزأين وتم تعيين كل جزء لخوادم معلمات مختلفة:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

حدد خطوة التدريب

ثالثًا ، قم بإنشاء خطوة التدريب ملفوفة في tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

في وظيفة خطوة التدريب أعلاه ، يمكن أن يدعم استدعاء Strategy.run و Strategy.reduce في step_fn عدة وحدات معالجة رسومات لكل عامل. إذا تم تخصيص وحدات معالجة الرسومات للعمال ، فسوف تقوم Strategy.run بتوزيع مجموعات البيانات على عدة نسخ متماثلة.

إرسال خطوات التدريب إلى العمال عن بعد

بعد تحديد جميع العمليات الحسابية بواسطة ParameterServerStrategy ، ستستخدم فئة tf.distribute.experimental.coordinator.ClusterCoordinator لإنشاء الموارد وتوزيع خطوات التدريب على العاملين عن بُعد.

لنقم أولاً بإنشاء كائن ClusterCoordinator في كائن الإستراتيجية:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

ثم أنشئ مجموعة بيانات لكل عامل ومكرر. في per_worker_dataset_fn أدناه ، يوصى بلف مجموعة dataset_fn في strategy.distribute_datasets_from_function . يوصى بتوزيع مجموعات البيانات_وظيفة_الوظيفة للسماح بالجلب المسبق الفعال لوحدات معالجة الرسومات بسلاسة.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

الخطوة الأخيرة هي توزيع الحساب على العاملين عن بعد باستخدام ClusterCoordinator.schedule :

  • يقوم أسلوب schedule بإدراج دالة tf في قائمة الانتظار وإرجاع قيمة tf.function شبيهة RemoteValue على الفور. سيتم إرسال الوظائف الموجودة في قائمة الانتظار إلى العمال البعيدين في مؤشرات الترابط في الخلفية وسيتم ملء RemoteValue بشكل غير متزامن.
  • يمكن استخدام طريقة join ( ClusterCoordinator.join ) للانتظار حتى يتم تنفيذ جميع الوظائف المجدولة.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

إليك كيفية جلب نتيجة RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

بدلاً من ذلك ، يمكنك تشغيل جميع الخطوات والقيام بشيء ما أثناء انتظار الانتهاء:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

للحصول على التدريب الكامل وسير عمل الخدمة لهذا المثال المحدد ، يرجى مراجعة هذا الاختبار .

المزيد حول إنشاء مجموعة البيانات

يتم إنشاء مجموعة البيانات في الكود أعلاه باستخدام ClusterCoordinator.create_per_worker_dataset API). يقوم بإنشاء مجموعة بيانات واحدة لكل عامل وإرجاع كائن حاوية. يمكنك استدعاء التابع iter لإنشاء مكرر لكل عامل. يحتوي المكرر لكل عامل على مكرر واحد لكل عامل وسيتم استبدال الشريحة المقابلة للعامل في وسيطة الإدخال للدالة التي تم تمريرها إلى طريقة ClusterCoordinator.schedule قبل تنفيذ الوظيفة على عامل معين.

في الوقت الحالي ، تفترض طريقة ClusterCoordinator.schedule أن العمال متساوون ، وبالتالي تفترض أن مجموعات البيانات الخاصة بالعاملين المختلفين هي نفسها فيما عدا أنها قد يتم خلطها بشكل مختلف إذا كانت تحتوي على عملية Dataset.shuffle . لهذا السبب ، يوصى أيضًا بتكرار مجموعات البيانات إلى أجل غير مسمى وأن تقوم بجدولة عدد محدود من الخطوات بدلاً من الاعتماد على OutOfRangeError من مجموعة بيانات.

ملاحظة مهمة أخرى هي أن مجموعات بيانات tf.data لا تدعم التسلسل الضمني وإلغاء التسلسل عبر حدود المهام. لذلك من المهم إنشاء مجموعة البيانات الكاملة داخل الوظيفة التي تم تمريرها إلى ClusterCoordinator.create_per_worker_dataset .

تقييم

هناك أكثر من طريقة لتحديد وتشغيل حلقة التقييم في التدريب الموزع. لكل منها مزاياها وعيوبها كما هو موضح أدناه. يوصى باستخدام طريقة التقييم المضمنة إذا لم يكن لديك تفضيل.

التقييم المضمن

في هذه الطريقة ، يقوم المنسق بالتناوب بين التدريب والتقييم وبالتالي يطلق عليه التقييم الداخلي .

هناك العديد من الفوائد للتقييم المضمن. فمثلا:

  • يمكن أن يدعم نماذج التقييم الكبيرة ومجموعات بيانات التقييم التي لا يمكن لمهمة واحدة الاحتفاظ بها.
  • يمكن استخدام نتائج التقييم لاتخاذ قرارات لتدريب الحقبة التالية.

هناك طريقتان لتنفيذ التقييم المباشر: التقييم المباشر والتقييم الموزع.

  • التقييم المباشر : بالنسبة للنماذج الصغيرة ومجموعات بيانات التقييم ، يمكن للمنسق إجراء التقييم مباشرة على النموذج الموزع مع مجموعة بيانات التقييم على المنسق:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • التقييم الموزع : بالنسبة للنماذج الكبيرة أو مجموعات البيانات التي لا يمكن تشغيلها مباشرة على المنسق ، يمكن لمهمة المنسق توزيع مهام التقييم على العمال عبر أساليب ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

تقييم جانب السيارة

هناك طريقة أخرى تسمى تقييم السيارة الجانبية حيث تقوم بإنشاء مهمة مقيِّم مخصصة تقرأ نقاط التفتيش بشكل متكرر وتدير التقييم عند نقطة تفتيش أحدث. يسمح لبرنامجك التدريبي بالانتهاء مبكرًا إذا لم تكن بحاجة إلى تغيير حلقة التدريب بناءً على نتائج التقييم. ومع ذلك ، فإنه يتطلب مهمة مقيِّم إضافية ونقاط تفتيش دورية لتحريك التقييم. فيما يلي حلقة تقييم محتملة للسيارة الجانبية:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

مجموعات في العالم الحقيقي

في بيئة إنتاج حقيقية ، ستقوم بتشغيل جميع المهام في عمليات مختلفة على أجهزة مختلفة. إن أبسط طريقة لتكوين معلومات الكتلة في كل مهمة هي تعيين متغيرات البيئة "TF_CONFIG" واستخدام tf.distribute.cluster_resolver.TFConfigClusterResolver لتحليل "TF_CONFIG" .

للحصول على وصف عام حول متغيرات البيئة "TF_CONFIG" ، راجع دليل التدريب الموزع .

إذا بدأت مهام التدريب الخاصة بك باستخدام Kubernetes أو قوالب تكوين أخرى ، فمن المحتمل جدًا أن هذه القوالب قد عينت بالفعل “TF_CONFIG" لك.

اضبط متغير البيئة "TF_CONFIG"

لنفترض أن لديك 3 عمال وخادمين معاملين ، يمكن أن يكون "TF_CONFIG" للعامل 1:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

يمكن أن يكون "TF_CONFIG" للمقيم:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

جزء "cluster" في سلسلة "TF_CONFIG" أعلاه للمقيم اختياري.

إذا كنت تستخدم نفس النظام الثنائي لجميع المهام

إذا كنت تفضل تشغيل كل هذه المهام باستخدام ثنائي واحد ، فستحتاج إلى السماح لفرع البرنامج الخاص بك بأدوار مختلفة في البداية:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

يبدأ الكود التالي تشغيل خادم TensorFlow وينتظر:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

معالجة فشل المهمة

فشل العامل

tf.distribute.experimental.coordinator.ClusterCoordinator أو Model.fit مع الخطأ لفشل العامل. عند استرداد العامل ، سيتم استدعاء وظيفة مجموعة البيانات المقدمة مسبقًا (إما إلى ClusterCoordinator.create_per_worker_dataset لحلقة تدريب مخصصة ، أو tf.keras.utils.experimental.DatasetCreator لـ Model.fit ) على العمال لإعادة إنشاء مجموعات البيانات.

فشل خادم المعلمة أو المنسق

ومع ذلك ، عندما يرى المنسق خطأ خادم معلمة ، فإنه سيرفع UnavailableError أو AbortedError على الفور. يمكنك إعادة تشغيل المنسق في هذه الحالة. يمكن أن يصبح المنسق نفسه غير متاح أيضًا. لذلك ، يوصى باستخدام أدوات معينة حتى لا تفقد تقدم التدريب:

  • بالنسبة لـ Model.fit ، يجب عليك استخدام رد BackupAndRestore ، والذي يعالج حفظ التقدم والاستعادة تلقائيًا. انظر قسم عمليات الاسترجاعات والتدريب أعلاه للحصول على مثال.

  • للحصول على حلقة تدريب مخصصة ، يجب عليك التحقق من متغيرات النموذج بشكل دوري وتحميل متغيرات النموذج من نقطة تحقق ، إن وجدت ، قبل بدء التدريب. يمكن الاستدلال على تقدم التدريب تقريبًا من optimizer.iterations إذا تم تحديد نقطة تحقق للمحسن:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

إحضار RemoteValue

إحضار RemoteValue مضمون إذا تم تنفيذ الوظيفة بنجاح. هذا لأنه يتم حاليًا نسخ قيمة الإرجاع على الفور إلى المنسق بعد تنفيذ الوظيفة. إذا كان هناك أي فشل عامل أثناء النسخ ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح. لذلك ، إذا كنت ترغب في تحسين الأداء ، يمكنك جدولة الوظائف بدون قيمة مرتجعة.

الإبلاغ عن الأخطاء

بمجرد أن يرى المنسق خطأ مثل UnavailableError من خوادم المعلمات أو أخطاء التطبيق الأخرى مثل InvalidArgument من tf.debugging.check_numerics ، فإنه سيلغي جميع الوظائف المعلقة والقائمة في قائمة الانتظار قبل رفع الخطأ. سيؤدي إحضار RemoteValue المقابلة لها إلى ظهور خطأ CancelledError .

بعد ظهور خطأ ، لن يقوم المنسق بإثارة نفس الخطأ أو أي خطأ من الوظائف الملغاة.

تحسين الأداء

هناك عدة أسباب محتملة إذا رأيت مشكلات في الأداء عند التدريب باستخدام ParameterServerStrategy و ClusterResolver .

أحد الأسباب الشائعة هو أن خوادم المعلمات بها حمل غير متوازن وأن بعض خوادم المعلمات المحملة بكثافة قد وصلت إلى السعة. يمكن أن يكون هناك أيضًا العديد من الأسباب الجذرية. بعض الطرق البسيطة للتخفيف من هذه المشكلة هي:

  1. قم بتقسيم متغيرات النموذج الكبيرة الخاصة بك عن طريق تحديد variable_partitioner عند إنشاء ParameterServerStrategy .
  2. تجنب إنشاء متغير نقطة فعالة مطلوبًا من قبل جميع خوادم المعلمات في خطوة واحدة إن أمكن. على سبيل المثال ، استخدم معدل تعلم ثابتًا أو فئة فرعية tf.keras.optimizers.schedules.LearningRateSchedule في محسنات الأداء نظرًا لأن السلوك الافتراضي هو أن معدل التعلم سيصبح متغيرًا يتم وضعه على خادم معلمات معين ويطلبه جميع خوادم المعلمات الأخرى في كل خطوة .
  3. قم بتبديل مفرداتك الكبيرة قبل تمريرها إلى طبقات معالجة Keras المسبقة.

سبب آخر محتمل لقضايا الأداء هو المنسق. يعتمد تنفيذك الأول schedule / join على لغة Python ، وبالتالي قد يكون هناك عبء على مؤشر الترابط. كما يمكن أن يكون زمن الانتقال بين المنسق والعاملين كبيرًا. اذا كانت هذه القضيه،

  • بالنسبة لـ Model.fit ، يمكنك تعيين وسيطة steps_per_execution المقدمة في Model.compile إلى قيمة أكبر من 1.

  • للحصول على حلقة تدريب مخصصة ، يمكنك تجميع خطوات متعددة في وظيفة tf.function واحدة:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

مع تحسين المكتبة بشكل أكبر ، نأمل ألا يضطر معظم المستخدمين إلى حزم الخطوات يدويًا في المستقبل.

بالإضافة إلى ذلك ، هناك حيلة صغيرة لتحسين الأداء تتمثل في جدولة الوظائف بدون قيمة مرتجعة كما هو موضح في قسم فشل مهمة المعالجة أعلاه.

القيود المعروفة

تمت تغطية معظم القيود المعروفة بالفعل في الأقسام أعلاه. يقدم هذا القسم ملخصا.

ParameterServerStrategy العامة

  • os.environment["grpc_fail_fast"]="use_caller" مطلوب في كل مهمة بما في ذلك المنسق ، لجعل التسامح مع الخطأ يعمل بشكل صحيح.
  • تدريب خادم المعامل المتزامن غير مدعوم.
  • عادة ما يكون من الضروري تجميع خطوات متعددة في وظيفة واحدة لتحقيق الأداء الأمثل.
  • لا يتم دعم تحميل نموذج_محفوظ عبر tf.saved_model.load يحتوي على متغيرات مُقسمة. لاحظ أن تحميل مثل هذا النموذج المحفوظ باستخدام خدمة TensorFlow من المتوقع أن يعمل.
  • لا يتم دعم تحميل نقطة اختبار تحتوي على متغيرات فتحة المُحسِّن المُقسمة إلى عدد مختلف من الأجزاء.
  • لا يتم دعم الاسترداد من فشل خادم المعلمات بدون إعادة تشغيل مهمة المنسق.
  • ينتج عن استخدام tf.lookup.StaticHashTable (الذي يتم استخدامه بشكل شائع في بعض طبقات معالجة Keras ، مثل tf.keras.layers.IntegerLookup و tf.keras.layers.StringLookup و tf.keras.layers.TextVectorization ) موارد موضوعة على المنسق في هذا الوقت مع تدريب خادم المعلمات. هذا له آثار على الأداء للبحث عن RPCs من العاملين إلى المنسق. هذه هي الأولوية القصوى الحالية لمعالجتها.

تفاصيل Model.fit

  • مطلوب وسيطة steps_per_epoch في Model.fit . يمكنك تحديد قيمة توفر فترات زمنية مناسبة في حقبة ما.
  • لا يحتوي ParameterServerStrategy على دعم عمليات الاسترجاعات المخصصة التي تحتوي على مكالمات على مستوى الدُفعات لأسباب تتعلق بالأداء. يجب عليك تحويل هذه المكالمات إلى مكالمات على مستوى الفترة الزمنية باستخدام steps_per_epoch بشكل مناسب ، بحيث يتم استدعاؤها في كل عدد steps_per_epoch من الخطوات. لا تتأثر عمليات الاسترجاعات المضمنة: تم تعديل مكالماتها على مستوى المجموعة لتكون فعالة. يتم الآن التخطيط لدعم استدعاءات مستوى المجموعة لـ ParameterServerStrategy .
  • للسبب نفسه ، بخلاف الاستراتيجيات الأخرى ، يتم تسجيل شريط التقدم والمقاييس فقط في حدود الفترة الزمنية.
  • run_eagerly غير معتمد.

تفاصيل حلقة التدريب المخصصة