تدريب متعدد العمال مع Estimator

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يوضح هذا البرنامج التعليمي كيف يمكن استخدام tf.distribute.Strategy لتوزيع تدريب متعدد العمال باستخدام tf.estimator . إذا قمت بكتابة الكود الخاص بك باستخدام tf.estimator ، وكنت مهتمًا tf.estimator إلى ما هو أبعد من جهاز واحد بأداء عالٍ ، فهذا البرنامج التعليمي يناسبك.

قبل البدء ، يرجى قراءة دليل إستراتيجية التوزيع . يعد البرنامج التعليمي التدريبي متعدد GPU مناسبًا أيضًا ، لأن هذا البرنامج التعليمي يستخدم نفس النموذج.

يثبت

أولاً ، قم بإعداد TensorFlow والواردات الضرورية.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

وظيفة الإدخال

يستخدم هذا البرنامج التعليمي مجموعة بيانات MNIST من مجموعات بيانات TensorFlow . يشبه الكود هنا البرنامج التعليمي التدريبي على وحدات معالجة الرسومات المتعددة مع اختلاف رئيسي واحد: عند استخدام المقدر للتدريب متعدد العمال ، من الضروري تقسيم مجموعة البيانات حسب عدد العمال لضمان تقارب النموذج. يتم تجزئة بيانات الإدخال بواسطة فهرس العامل ، بحيث يعالج كل عامل 1/num_workers أجزاء مميزة من مجموعة البيانات.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

هناك طريقة أخرى معقولة لتحقيق التقارب وهي خلط مجموعة البيانات ببذور مميزة لكل عامل.

تكوين متعدد العمال

أحد الاختلافات الرئيسية في هذا البرنامج التعليمي (مقارنة بالبرنامج التعليمي التدريبي متعدد وحدات معالجة الرسومات ) هو الإعداد متعدد العمال. متغير البيئة TF_CONFIG هو الطريقة القياسية لتحديد تكوين الكتلة لكل عامل يمثل جزءًا من الكتلة.

هناك نوعان من مكونات TF_CONFIG : cluster task . يوفر cluster معلومات حول الكتلة بأكملها ، أي العمال وخوادم المعلمات في الكتلة. task توفر معلومات حول المهمة الحالية. cluster المكون الأول هي نفسها لجميع العاملين وخوادم المعلمات في المجموعة ، task المكون الثاني مختلفة على كل عامل وخادم معلمة وتحدد type index الخاصين به. في هذا المثال ، type المهمة worker index المهمة هو 0 .

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيفية تعيين TF_CONFIG مع عاملين على localhost TF_CONFIG . من الناحية العملية ، يمكنك إنشاء عدة عمال على عنوان IP خارجي ومنفذ ، وتعيين TF_CONFIG على كل عامل بشكل مناسب ، أي تعديل index المهام.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

حدد النموذج

اكتب الطبقات والمحسن ووظيفة الخسارة للتدريب. يحدد هذا البرنامج التعليمي النموذج باستخدام طبقات Keras ، على غرار البرنامج التعليمي للتدريب على وحدات معالجة الرسومات المتعددة .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

لتدريب النموذج ، استخدم مثيل tf.distribute.experimental.MultiWorkerMirroredStrategy . تُنشئ MultiWorkerMirroredStrategy نسخًا من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. وهي تستخدم CollectiveOps ، وهي عملية TensorFlow للاتصال الجماعي ، لتجميع التدرجات اللونية والحفاظ على تزامن المتغيرات. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الإستراتيجية.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

تدريب وتقييم النموذج

بعد ذلك ، حدد استراتيجية التوزيع في RunConfig ، وقم بالتدريب والتقييم عن طريق استدعاء tf.estimator.train_and_evaluate . يوزع هذا البرنامج التعليمي التدريب فقط عن طريق تحديد الإستراتيجية عبر train_distribute . من الممكن أيضًا توزيع التقييم عبر eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7eff5c6afe50>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.141045.
INFO:tensorflow:Loss for final step: 1.141045.
({'loss': 2.2780812, 'global_step': 938}, [])

تحسين أداء التدريب

لديك الآن نموذج tf.distribute.Strategy قادر على تعدد العمال مدعوم من tf.distribute.Strategy . يمكنك تجربة الأساليب التالية لتحسين أداء التدريب متعدد العمال:

  • زيادة حجم الدُفعة: حجم الدُفعة المحدد هنا هو لكل وحدة معالجة رسومات. بشكل عام ، يُنصح باستخدام أكبر حجم للدفعة يناسب ذاكرة وحدة معالجة الرسومات.
  • متغيرات Cast: صب المتغيرات على tf.float إن أمكن. يتضمن نموذج ResNet الرسمي مثالاً على كيفية القيام بذلك.
  • استخدام الاتصال الجماعي: توفر MultiWorkerMirroredStrategy اتصال جماعي متعددة.

    • RING بتنفيذ مجموعات قائمة على الحلقات باستخدام gRPC كطبقة اتصال عبر المضيف.
    • يستخدم NCCL Nvidia لتنفيذ المجموعات
    • AUTO يؤجل الاختيار لوقت التشغيل.

    يعتمد أفضل خيار للتنفيذ الجماعي على عدد ونوع وحدات معالجة الرسومات ، والتوصيل البيني للشبكة في المجموعة. لتجاوز الاختيار التلقائي ، حدد قيمة صالحة لمعامل communication MultiWorkerMirroredStrategy ، على سبيل المثال ، communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

قم بزيارة قسم الأداء في الدليل لمعرفة المزيد حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.

أمثلة التعليمات البرمجية الأخرى

  1. مثال نهائي لتدريب العديد من العمال في نظام Tensorflow / النظام البيئي باستخدام قوالب Kubernetes. يبدأ هذا المثال بنموذج Keras ويحوله إلى مقدر باستخدام tf.keras.estimator.model_to_estimator API.
  2. النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل استراتيجيات توزيع متعددة.