الاستماع إلى المرأة الأولى في الندوة ML هذا الثلاثاء 19 أكتوبر في 09:00 PST سجل الآن

تدريب خادم المعلمات باستخدام ParameterServerStrategy

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

التدريب الخادم المعلمة هو الأسلوب بيانات موازية المشترك لرفع مستوى التدريب نموذج على أجهزة متعددة.

تتكون كتلة التدريب الخادم المعلمة العمال والخوادم المعلمة. يتم إنشاء المتغيرات على خوادم المعلمات ويتم قراءتها وتحديثها من قبل العاملين في كل خطوة. بشكل افتراضي ، يقوم العمال بقراءة هذه المتغيرات وتحديثها بشكل مستقل دون المزامنة مع بعضهم البعض. وهذا هو السبب في بعض الأحيان المعلمة التدريب على غرار الخادم يسمى التدريب غير متزامن.

في TensorFlow 2، هو مدعوم من التدريب الخادم المعلمة بواسطة tf.distribute.experimental.ParameterServerStrategy الدرجة التي توزع الخطوات تدريبية لمجموعة تحجيم تصل إلى الآلاف من العمال (يرافقه خوادم المعلمة).

طرق التدريب المدعومة

هناك طريقتان رئيسيتان مدعومتان للتدريب:

كتلة بالوظائف والمهام

بغض النظر عن API الاختيار ( Model.fit أو حلقة تدريبية مخصصة)، والتدريب وزعت في TensorFlow 2 ويشمل: أ 'cluster' مع العديد من 'jobs' ، ويمكن أن يكون كل من عمل واحد أو أكثر من 'tasks' .

عند استخدام تدريب خادم المعلمات ، يوصى بالحصول على:

  • واحد منسق وظيفة (والذي يحتوي على اسم وظيفة chief )
  • وظيفة عامل متعددة (وظيفة اسم worker )؛ و
  • وظيفة الخادم المعلمة متعددة (وظيفة اسم ps )

في حين أن منسق يخلق الموارد، برقيات تدريب المهام، ويكتب نقاط التفتيش، ويتعامل مع فشل مهمة والعمال والخوادم المعلمة تشغيل tf.distribute.Server أن الاستماع لطلبات من منسق.

التدريب الخادم المعلمة مع Model.fit API

المعلمة التدريب الملقم مع Model.fit API يتطلب منسق لاستخدام tf.distribute.experimental.ParameterServerStrategy الكائن، و tf.keras.utils.experimental.DatasetCreator كإدخال. على غرار Model.fit الاستخدام مع عدم وجود استراتيجية، أو مع غيرها من الاستراتيجيات، وسير العمل ينطوي على خلق وتجميع النموذج، بإعداد الاسترجاعات، تليها Model.fit المكالمة.

تدريب خادم المعلمات مع حلقة تدريب مخصصة

مع حلقات التدريب المخصصة، و tf.distribute.experimental.coordinator.ClusterCoordinator الفئة هي مكون رئيسي يستخدم لمنسق.

معظم API مهما قدمت من قبل ClusterCoordinator الهدف من ذلك هو schedule :

  • و schedule API enqueues على tf.function وإرجاع المستقبل مثل RemoteValue على الفور.
  • وسيتم ارسال وظائف في قائمة الانتظار إلى العاملين في المناطق النائية في المواضيع الأساسية وعلى RemoteValue الصورة سيتم شغلها بشكل غير متزامن.
  • منذ schedule لا يتطلب احالة عامل، و tf.function مرت في يمكن تنفيذها على أي عامل المتاحة.
  • إذا أصبح العامل الذي تم تنفيذه عليه غير متاح قبل اكتماله ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح.
  • بسبب هذه الحقيقة وحقيقة أن تنفيذ الوظيفة ليس ذريًا ، يمكن تنفيذ الوظيفة أكثر من مرة.

بالإضافة إلى إيفاد وظائف عن بعد، و ClusterCoordinator يساعد أيضا على إنشاء قواعد البيانات على جميع العمال وإعادة بناء هذه المجموعات عندما يسترد العامل من الفشل.

إعداد البرنامج التعليمي

البرنامج التعليمي سوف تتفرع إلى Model.fit والعرف مسارات التدريب حلقة، ويمكنك اختيار واحد الذي يناسب احتياجاتك. تنطبق الأقسام بخلاف "التدريب مع X" على كلا المسارين.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

إعداد الكتلة

كما ذكر أعلاه، مجموعة التدريب الخادم المعلمة تتطلب مهمة منسق التي تدير برنامج التدريب الخاص بك، واحد أو عدد من العمال والمهام الخادم المعلمة التي تعمل TensorFlow servers- tf.distribute.Server -وربما مهمة تقييم إضافية تقييم يعمل جنبا سيارة (انظر قسم تقييم السيارة الجانبية أدناه). متطلبات إعدادها هي:

  • تحتاج مهمة المنسق إلى معرفة عناوين ومنافذ جميع خوادم TensorFlow الأخرى باستثناء المقيم.
  • يحتاج العمال وخوادم المعلمات إلى معرفة المنفذ الذي يحتاجون إليه للاستماع إليه. من أجل البساطة ، يمكنك عادةً تمرير معلومات المجموعة الكاملة عند إنشاء خوادم TensorFlow في هذه المهام.
  • ليس من الضروري أن تعرف مهمة المقيم تكوين مجموعة التدريب. إذا حدث ذلك ، فلا يجب محاولة الاتصال بمجموعة التدريب.
  • يجب أن يكون العمال والخوادم المعلمة أنواع المهمة كما "worker" و "ps" ، على التوالي. يجب استخدام منسق "chief" كنوع مهمة لأسباب الإرث.

في هذا البرنامج التعليمي ، ستقوم بإنشاء مجموعة قيد التشغيل بحيث يمكن تشغيل تدريب خادم المعلمات بالكامل في Colab. سوف تتعلم كيفية إعداد مجموعات حقيقية في جزء لاحق.

الكتلة قيد التشغيل

ستبدأ بإنشاء عدة خوادم TensorFlow مسبقًا والاتصال بها لاحقًا. علما أن هذا هو فقط لغرض التظاهر هذا البرنامج التعليمي، وفي تدريب حقيقي سيتم بدء تشغيل الخوادم على "worker" و "ps" الآلات.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

وكثيرا ما يستخدم الإعداد العنقودية العملية في وحدة الاختبار، مثل هنا .

وثمة خيار آخر للاختبار المحلي لإطلاق العمليات على جهاز تسجيل الوصول المحلي من التدريب متعدد عامل مع Keras للحصول على مثال لهذا النهج.

إنشاء إستراتيجية ParameterServer

قبل الغوص في التعليمات البرمجية التدريب، دعونا مثيل ParameterServerStrategy الكائن. علما بأن هناك حاجة لذلك بغض النظر عما إذا كنت تسير مع Model.fit أو حلقة التدريب المخصصة. و variable_partitioner سيتم شرح حجة في القسم عملية التجزئة متغير .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

من أجل استخدام وحدات معالجة الرسومات للتدريب ، قم بتخصيص وحدات معالجة الرسومات المرئية لكل عامل. ParameterServerStrategy سوف تستخدم جميع وحدات معالجة الرسومات المتاحة على كل عامل، مع تقييد أن جميع العاملين يجب أن يكون نفس العدد من وحدات معالجة الرسومات المتاحة.

التجزئة المتغيرة

تشير عملية التجزئة المتغيرة لتقسيم متغير الى عدة متغيرات أصغر، والتي تسمى شظايا. قد تكون التجزئة المتغيرة مفيدة لتوزيع حمل الشبكة عند الوصول إلى هذه الأجزاء. من المفيد أيضًا توزيع الحساب والتخزين لمتغير عادي عبر خوادم متعددة المعلمات.

لتمكين عملية التجزئة متغير، يمكنك تمرير في variable_partitioner عند بناء ParameterServerStrategy الكائن. و variable_partitioner سيتم استدعاؤه كل مرة عندما يتم إنشاء متغير، ومن المتوقع أن يعود عدد الأجزاء على طول كل البعد المتغير. بعض الخروج من مربع variable_partitioner يتم تقديم الصورة مثل tf.distribute.experimental.partitioners.MinSizePartitioner . فمن المستحسن استخدام partitioners على أساس حجم مثل tf.distribute.experimental.partitioners.MinSizePartitioner لتجنب تقسيم المتغيرات صغيرة، والتي يمكن أن يكون لها تأثير سلبي على سرعة تدريبية نموذجية.

عندما variable_partitioner يتم تمريرها في وإذا قمت بإنشاء متغير مباشرة تحت strategy.scope() ، وسوف تصبح نوع الحاوية مع variables الممتلكات التي توفر الوصول إلى قائمة شظايا. في معظم الحالات ، سيتم تحويل هذه الحاوية تلقائيًا إلى Tensor من خلال تسلسل جميع القطع. نتيجة لذلك ، يمكن استخدامه كمتغير عادي. من ناحية أخرى، فإن بعض أساليب TensorFlow مثل tf.nn.embedding_lookup تقدم التنفيذ الفعال لهذا النوع الحاويات وفي هذه الأساليب سيتم تجنب سلسلة التلقائي.

يرجى الاطلاع على مستندات API من tf.distribute.experimental.ParameterServerStrategy لمزيد من التفاصيل.

تدريب مع Model.fit

يوفر Keras على API التدريب سهلة الاستخدام عبر Model.fit أن مقابض حلقة تدريبية تحت غطاء محرك السيارة، مع مرونة للتجاوز train_step ، والاستدعاء، والتي توفر وظائف مثل توفير نقطة تفتيش أو ملخص لإنقاذ TensorBoard. مع Model.fit ، يمكن استخدام رمز تدريب نفسه لاستراتيجيات أخرى مع تبادل بسيط من وجوه الاستراتيجية.

ادخال البيانات

Model.fit مع التدريب الخادم المعلمة يتطلب أن يتم توفير البيانات المدخلة في للاستدعاء أن يأخذ حجة واحدة من نوع tf.distribute.InputContext ، وإرجاع tf.data.Dataset . ثم، إنشاء tf.keras.utils.experimental.DatasetCreator الكائن الذي يأخذ من هذا القبيل callable ، واختياري tf.distribute.InputOptions يعترض عبر input_options حجة.

علما بأن فمن المستحسن أن خلط ورق اللعب وتكرار البيانات مع التدريب الخادم المعلمة، وتحديد steps_per_epoch في fit الدعوة حتى يعرف مكتبة حدود الحقبة.

يرجى الاطلاع على مدخلات الموزعة البرنامج التعليمي لمزيد من المعلومات حول InputContext حجة.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

التعليمات البرمجية في dataset_fn سيتم استدعاؤه على جهاز الإدخال، التي عادة ما تكون وحدة المعالجة المركزية، على كل من آلات عامل.

بناء النموذج وتجميعه

الآن، سوف تقوم بإنشاء tf.keras.Model -a تافهة tf.keras.models.Sequential نموذج لأغراض تليها مظاهرة من قبل Model.compile دعوة لدمج المكونات، مثل محسن، والقياسات، أو معلمات مثل steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

عمليات الاسترجاعات والتدريب

قبل الاتصال model.fit لتدريب الفعلي، دعونا إعداد عمليات الاسترجاعات اللازمة لالمهام المشتركة، مثل:

  • ModelCheckpoint : لحفظ الأوزان نموذج.
  • BackupAndRestore : للتأكد من يتم دعم التقدم التدريب تلقائيا، وتعافى إذا كان عدم توفر الخبرات الكتلة (مثل إحباط أو الاستباق)؛ أو
  • TensorBoard : لإنقاذ تقارير مرحلية إلى ملفات ملخص، التي تحصل على تصور في أداة TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

استخدام المباشر مع ClusterCoordinator (اختياري)

حتى إذا اخترت Model.fit مسار التدريب، يمكنك مثيل اختياريا tf.distribute.experimental.coordinator.ClusterCoordinator كائن لجدولة المهام الأخرى التي ترغب ليتم تنفيذها على العمال. اطلع على التدريب مع التدريب حلقة مخصصة الباب لمزيد من التفاصيل والأمثلة على ذلك.

التدريب بحلقة تدريب مخصصة

عن طريق حلقات تدريبية مخصصة مع tf.distribute.Strategy يوفر قدرا كبيرا من المرونة في تحديد حلقات التدريب. مع ParameterServerStrategy المحددة أعلاه (كما strategy )، سوف تستخدم tf.distribute.experimental.coordinator.ClusterCoordinator إيفاد تنفيذ خطوات التدريب للعاملين عن بعد.

ثم، سوف تقوم بإنشاء نموذج، وتحديد مجموعة بيانات وظيفة الخطوة، كما فعلت في حلقة التدريب مع الآخر tf.distribute.Strategy الصورة. يمكنك العثور على مزيد من التفاصيل في تدريب مخصص مع tf.distribute.Strategy البرنامج التعليمي.

لضمان كفاءة الجلب المسبق مجموعة البيانات، استخدم أوصى بتوزيع بيانات واجهات برمجة التطبيقات إنشاء المذكورة في خطوات التدريب مرسل إلى العاملين في المناطق النائية أدناه. أيضا، تأكد من أن يدعو Strategy.run داخل worker_fn للاستفادة الكاملة من وحدات معالجة الرسومات المخصصة للعمال. باقي الخطوات هي نفسها بالنسبة للتدريب مع أو بدون وحدات معالجة الرسومات.

لنقم بإنشاء هذه المكونات في الخطوات التالية:

قم بإعداد البيانات

أولا، كتابة دالة التي تخلق مجموعة البيانات التي تشمل تجهيزها المنطق تنفذها Keras طبقات تجهيزها .

ستقوم بإنشاء هذه الطبقات خارج dataset_fn لكن تطبيق التحول داخل dataset_fn ، وبما انك سوف التفاف dataset_fn إلى tf.function ، والتي لا تسمح المتغيرات التي سيتم إنشاؤها في داخله.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

أنشئ أمثلة على لعبة في مجموعة بيانات:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

ثم، إنشاء مجموعة بيانات التدريب ملفوفة في dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

بناء النموذج

بعد ذلك ، قم بإنشاء النموذج والكائنات الأخرى. تأكد من خلق كل المتغيرات تحت strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

تأكيد دعونا أن استخدام FixedShardsPartitioner تقسيم كل المتغيرات إلى قسمين شظايا وتم تعيين كل جزء إلى خوادم المعلمة مختلفة:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

حدد خطوة التدريب

ثالثا، إنشاء الخطوة التدريب ملفوفة في tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

في وظيفة التدريب الخطوة أعلاه، داعيا Strategy.run و Strategy.reduce في step_fn يمكن أن تدعم وحدات معالجة الرسومات المتعددة للعامل الواحد. إذا كان العمال قد خصصت وحدات معالجة الرسومات، Strategy.run ستقوم بتوزيع مجموعات البيانات على عدة نسخ متماثلة.

إرسال خطوات التدريب إلى العمال عن بعد

بعد تعريف كافة العمليات الحسابية التي ParameterServerStrategy ، سوف تستخدم tf.distribute.experimental.coordinator.ClusterCoordinator فئة لخلق موارد وتوزيع خطوات التدريب للعاملين عن بعد.

دعونا أولا إنشاء ClusterCoordinator الكائن وتمر في كائن استراتيجية:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

ثم أنشئ مجموعة بيانات لكل عامل ومكرر. في per_worker_dataset_fn أدناه، التفاف dataset_fn إلى strategy.distribute_datasets_from_function ينصح للسماح الجلب المسبق فعالة لوحدات معالجة الرسومات بسهولة.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

والخطوة الأخيرة هي لتوزيع حساب للعاملين عن بعد باستخدام ClusterCoordinator.schedule :

  • و schedule طريقة enqueues على tf.function وإرجاع المستقبل مثل RemoteValue على الفور. وسيتم ارسال وظائف في قائمة الانتظار إلى العاملين في المناطق النائية في المواضيع الأساسية و RemoteValue سيتم شغلها بشكل غير متزامن.
  • على join طريقة ( ClusterCoordinator.join ) يمكن استخدامها لالانتظار حتى يتم تنفيذ جميع المهام المجدولة.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

هنا هو كيف يمكن أن تجلب نتيجة ل RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

بدلاً من ذلك ، يمكنك تشغيل جميع الخطوات والقيام بشيء ما أثناء انتظار الانتهاء:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

لاستكمال التدريب وخدمة سير العمل لهذا المثال بالذات، يرجى مراجعة هذا الاختبار .

المزيد حول إنشاء مجموعة البيانات

يتم إنشاء مجموعة البيانات في رمز أعلاه باستخدام ClusterCoordinator.create_per_worker_dataset API). يقوم بإنشاء مجموعة بيانات واحدة لكل عامل وإرجاع كائن حاوية. يمكنك استدعاء iter على طريقة لإنشاء مكرر لكل عامل. والعامل في مكرر يحتوي مكرر واحد لكل عامل وسيتم استبداله شريحة المقابلة للعامل في حجة مدخلات وظيفة تمريرها إلى ClusterCoordinator.schedule طريقة قبل أن يتم تنفيذ الدالة على عامل معين.

وفي الوقت الراهن، ClusterCoordinator.schedule يفترض طريقة العمال يعادل وبالتالي يفترض قواعد البيانات على عمال مختلف هي نفسها إلا أنها قد تكون تعديلا بشكل مختلف إذا كانت تحتوي على Dataset.shuffle العملية. وبسبب هذا، فمن المستحسن أيضا أن مجموعات البيانات إلى أن تتكرر إلى ما لا نهاية وتقوم بجدولة عدد محدود من الخطوات بدلا من الاعتماد على OutOfRangeError من مجموعة بيانات.

ملاحظة هامة أخرى هي أن tf.data مجموعات البيانات لا تدعم التسلسل الضمني وإلغاء التسلسل عبر الحدود المهمة. لذلك من المهم لإنشاء بيانات كاملة داخل وظيفة التي تم تمريرها إلى ClusterCoordinator.create_per_worker_dataset .

تقييم

هناك أكثر من طريقة لتحديد وتشغيل حلقة التقييم في التدريب الموزع. لكل منها مزاياها وعيوبها كما هو موضح أدناه. يوصى باستخدام طريقة التقييم المضمنة إذا لم يكن لديك تفضيل.

التقييم المضمن

في هذه الطريقة، والمناوبين منسق بين التدريب والتقييم وبالتالي يطلق عليه ذلك التقييم المضمنة.

هناك العديد من الفوائد للتقييم المضمن. على سبيل المثال:

  • يمكن أن يدعم نماذج التقييم الكبيرة ومجموعات بيانات التقييم التي لا يمكن لمهمة واحدة الاحتفاظ بها.
  • يمكن استخدام نتائج التقييم لاتخاذ قرارات لتدريب الحقبة التالية.

هناك طريقتان لتنفيذ التقييم المباشر: التقييم المباشر والتقييم الموزع.

  • تقييم مباشر: لنماذج صغيرة ومجموعات البيانات التقييم، المنسق يمكن تشغيل تقييم مباشرة على نموذج الموزعة مع بيانات التقييم على منسق:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • تقييم توزيع: للحصول على نماذج كبيرة أو مجموعات البيانات التي هي غير ممكنة لتشغيلها مباشرة على منسق، مهمة منسق يمكن توزيع المهام تقييم للعمال عن طريق ClusterCoordinator.schedule / ClusterCoordinator.join طرق:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

تقييم السيارة الجانبية

وتسمى طريقة أخرى تقييم الجانب السيارات حيث يمكنك إنشاء مهمة مقيم مخصصة الذي يقرأ مرارا وتكرارا نقاط التفتيش وتدير التقييم على أحدث نقطة تفتيش. يسمح لبرنامج التدريب الخاص بك بالانتهاء مبكرًا إذا لم تكن بحاجة إلى تغيير حلقة التدريب الخاصة بك بناءً على نتائج التقييم. ومع ذلك ، فإنه يتطلب مهمة مقيِّم إضافية ونقاط تفتيش دورية لتحريك التقييم. فيما يلي حلقة تقييم محتملة للسيارة الجانبية:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

مجموعات في العالم الحقيقي

في بيئة إنتاج حقيقية ، ستقوم بتشغيل جميع المهام في عمليات مختلفة على أجهزة مختلفة. إن أبسط طريقة لمعلومات كتلة تكوين على كل مهمة لتعيين "TF_CONFIG" متغيرات البيئة واستخدام tf.distribute.cluster_resolver.TFConfigClusterResolver تحليل "TF_CONFIG" .

للحصول على وصف العام حول "TF_CONFIG" متغيرات البيئة، تشير إلى التدريب الموزعة دليل.

إذا كنت تبدأ مهام التدريب باستخدام Kubernetes أو القوالب التكوين الأخرى، فمن المحتمل جدا أن هذه القوالب وقد وضعت بالفعل “TF_CONFIG" بالنسبة لك.

تعيين "TF_CONFIG" متغير البيئة

افترض أن لديك 3 عمال و 2 خوادم المعلمة، "TF_CONFIG" من عامل 1 يمكن أن يكون:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

و "TF_CONFIG" للمقيم يمكن أن يكون:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

و "cluster" جزء في أعلاه "TF_CONFIG" سلسلة لمقيم اختياري.

إذا كنت تستخدم نفس النظام الثنائي لجميع المهام

إذا كنت تفضل تشغيل كل هذه المهام باستخدام ثنائي واحد ، فستحتاج إلى السماح لفرع البرنامج الخاص بك بأدوار مختلفة في البداية:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

يبدأ الكود التالي تشغيل خادم TensorFlow وينتظر:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

معالجة فشل المهمة

فشل العامل

tf.distribute.experimental.coordinator.ClusterCoordinator أو Model.fit توفر المدمج في التسامح مع الخطأ لعدم عامل. بعد شفائهم عامل، وظيفة التي سبق تقديمها مجموعة البيانات (إما ل ClusterCoordinator.create_per_worker_dataset لحلقة تدريبية مخصصة، أو tf.keras.utils.experimental.DatasetCreator ل Model.fit سيتم استدعاء) على العمال لإعادة إنشاء قواعد البيانات.

فشل خادم المعلمة أو المنسق

ومع ذلك، عندما يرى منسق خطأ في الخادم المعلمة، وسوف تثير UnavailableError أو AbortedError على الفور. يمكنك إعادة تشغيل المنسق في هذه الحالة. يمكن أن يصبح المنسق نفسه غير متاح أيضًا. لذلك ، يوصى باستخدام أدوات معينة حتى لا تفقد تقدم التدريب:

  • ل Model.fit ، يجب عليك استخدام BackupAndRestore الاستدعاء، التي تتولى توفير التقدم واستعادة تلقائيا. انظر الاسترجاعات والتدريب المقطع أعلاه للحصول على مثال.

  • للحصول على حلقة تدريب مخصصة ، يجب عليك التحقق من متغيرات النموذج بشكل دوري وتحميل متغيرات النموذج من نقطة تحقق ، إن وجدت ، قبل بدء التدريب. ويمكن الاستدلال على ذلك التقدم تدريب ما يقرب من optimizer.iterations إذا تم checkpointed محسن:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

جلب ل RemoteValue

جلب ل RemoteValue مضمونة للنجاح إذا تم تنفيذ وظيفة بنجاح. هذا لأنه يتم حاليًا نسخ قيمة الإرجاع على الفور إلى المنسق بعد تنفيذ الوظيفة. إذا كان هناك أي فشل عامل أثناء النسخ ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح. لذلك ، إذا كنت ترغب في تحسين الأداء ، يمكنك جدولة الوظائف بدون قيمة مرتجعة.

الإبلاغ عن الأخطاء

وبمجرد أن يرى منسق خطأ مثل UnavailableError من خوادم المعلمة أو أخطاء التطبيقات الأخرى مثل InvalidArgument من tf.debugging.check_numerics ، فإنه سيتم إلغاء جميع وظائف معلقة وقائمة الانتظار قبل رفع الخطأ. جلب لهم المقابلة RemoteValue سوف ق رفع CancelledError .

بعد ظهور خطأ ، لن يقوم المنسق بإثارة نفس الخطأ أو أي خطأ من الوظائف الملغاة.

تحسين الأداء

هناك عدة أسباب محتملة إذا كنت ترى مشكلات في الأداء عند تدريب مع ParameterServerStrategy و ClusterResolver .

أحد الأسباب الشائعة هو أن خوادم المعلمات بها حمل غير متوازن وقد وصلت بعض خوادم المعلمات المحملة بكثافة إلى السعة. يمكن أن يكون هناك أيضًا أسباب جذرية متعددة. بعض الطرق البسيطة للتخفيف من هذه المشكلة هي:

  1. شارد متغيرات نموذج كبيرة عن طريق تحديد variable_partitioner عند بناء ParameterServerStrategy .
  2. تجنب إنشاء متغير نقطة فعالة مطلوبًا من قبل جميع خوادم المعلمات في خطوة واحدة إن أمكن. على سبيل المثال، استخدام معدل التعلم ثابت أو فئة فرعية tf.keras.optimizers.schedules.LearningRateSchedule في أبتيميزر منذ السلوك الافتراضي هو أن معدل التعلم سوف تصبح متغير وضعت في الخادم المعلمة معين وطلب كافة ملقمات معلمة أخرى في كل خطوة .
  3. قم بتبديل مفرداتك الكبيرة قبل تمريرها إلى طبقات معالجة Keras المسبقة.

سبب آخر محتمل لقضايا الأداء هو المنسق. تنفيذ أول من الخاص بك schedule / join يستند بيثون، وبالتالي قد تكون خيوط في سماء المنطقة. كما يمكن أن يكون زمن الانتقال بين المنسق والعاملين كبيرًا. اذا كانت هذه القضيه،

  • ل Model.fit ، يمكنك تعيين steps_per_execution الحجة المقدمة في Model.compile إلى قيمة أكبر من 1.

  • لحلقة تدريبية مخصصة، يمكنك حزمة خطوات متعددة في واحد tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

مع تحسين المكتبة بشكل أكبر ، نأمل ألا يضطر معظم المستخدمين إلى حزم الخطوات يدويًا في المستقبل.

بالإضافة إلى ذلك ، هناك حيلة صغيرة لتحسين الأداء وهي جدولة الوظائف بدون قيمة مرتجعة كما هو موضح في قسم فشل مهمة المعالجة أعلاه.

القيود المعروفة

تمت تغطية معظم القيود المعروفة بالفعل في الأقسام أعلاه. يقدم هذا القسم ملخصا.

ParameterServerStrategy عام

  • os.environment["grpc_fail_fast"]="use_caller" هو مطلوب على كل مهمة بما في ذلك منسق، لجعل خطأ العمل التسامح بشكل صحيح.
  • تدريب خادم المعامل المتزامن غير مدعوم.
  • عادة ما يكون من الضروري تجميع خطوات متعددة في وظيفة واحدة لتحقيق الأداء الأمثل.
  • غير معتمد لتحميل saved_model عبر tf.saved_model.load تحتوي على متغيرات sharded. لاحظ أن تحميل مثل هذا النموذج المحفوظ باستخدام خدمة TensorFlow من المتوقع أن يعمل.
  • لا يتم دعم تحميل نقطة اختبار تحتوي على متغيرات فتحة مُحسِّن مُقسمة إلى عدد مختلف من الأجزاء.
  • لا يتم دعم الاسترداد من فشل خادم المعلمات بدون إعادة تشغيل مهمة المنسق.
  • استخدام tf.lookup.StaticHashTable (الذي يعمل عادة من قبل بعض tf.keras.layers.experimental.preprocessing طبقات، مثل IntegerLookup ، StringLookup ، و TextVectorization ) النتائج في الموارد وضعت على منسق في هذا الوقت مع التدريب الخادم المعلمة. هذا له آثار على الأداء للبحث عن RPCs من العاملين إلى المنسق. هذه هي الأولوية القصوى الحالية لمعالجتها.

Model.fit تفاصيل

  • steps_per_epoch مطلوب حجة في Model.fit . يمكنك تحديد قيمة توفر فترات زمنية مناسبة في حقبة ما.
  • ParameterServerStrategy لايوجد دعم للالاسترجاعات المخصصة التي لديها المكالمات على مستوى دفعة لأسباب تتعلق بالأداء. يجب تحويل تلك المكالمات إلى المكالمات على مستوى العصر مع اختار مناسبة steps_per_epoch ، بحيث تسمى كل steps_per_epoch عدد من الخطوات. لا تتأثر عمليات الاسترجاعات المضمنة: تم تعديل مكالماتها على مستوى المجموعة لتكون فعالة. دعم مكالمات مستوى دفعة ل ParameterServerStrategy يجري التخطيط.
  • للسبب نفسه ، بخلاف الاستراتيجيات الأخرى ، يتم تسجيل شريط التقدم والمقاييس فقط في حدود الفترة الزمنية.
  • run_eagerly غير معتمد.

تفاصيل حلقة التدريب المخصصة