ترجمت واجهة Cloud Translation API‏ هذه الصفحة.
Switch to English

تدريب متعدد العمال مع Estimator

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

نظرة عامة

يوضح هذا البرنامج التعليمي كيف يمكن استخدام tf.distribute.Strategy لتوزيع تدريب متعدد العمال باستخدام tf.estimator . إذا قمت بكتابة الكود الخاص بك باستخدام tf.estimator ، وكنت مهتمًا tf.estimator إلى ما وراء جهاز واحد بأداء عالٍ ، فهذا البرنامج التعليمي يناسبك.

قبل البدء ، يرجى قراءة دليل إستراتيجية التوزيع . يعد البرنامج التعليمي التدريبي متعدد GPU مناسبًا أيضًا ، لأن هذا البرنامج التعليمي يستخدم نفس النموذج.

اقامة

أولاً ، قم بإعداد TensorFlow والواردات الضرورية.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json

وظيفة الإدخال

يستخدم هذا البرنامج التعليمي مجموعة بيانات MNIST من مجموعات بيانات TensorFlow . يشبه الكود هنا البرنامج التعليمي التدريبي متعدد وحدات معالجة الرسومات مع اختلاف رئيسي واحد: عند استخدام Estimator للتدريب متعدد العمال ، من الضروري تقسيم مجموعة البيانات حسب عدد العمال لضمان تقارب النموذج. يتم تقسيم بيانات الإدخال بواسطة فهرس العامل ، بحيث يقوم كل عامل بمعالجة 1/num_workers أجزاء مميزة من مجموعة البيانات.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

هناك طريقة أخرى معقولة لتحقيق التقارب وهي خلط مجموعة البيانات ببذور مميزة لكل عامل.

تكوين متعدد العمال

أحد الاختلافات الرئيسية في هذا البرنامج التعليمي (مقارنةً بالبرنامج التعليمي التدريبي متعدد وحدات معالجة الرسومات ) هو الإعداد متعدد العمال. متغير البيئة TF_CONFIG هو الطريقة القياسية لتحديد تكوين الكتلة لكل عامل يمثل جزءًا من الكتلة.

هناك نوعان من مكونات TF_CONFIG : cluster task . يوفر cluster معلومات حول الكتلة بأكملها ، أي العمال وخوادم المعلمات في الكتلة. task توفر معلومات حول المهمة الحالية. cluster المكونات الأولى هي نفسها لجميع العاملين وخوادم المعلمات في المجموعة ، task المكون الثاني مختلفة على كل عامل وخادم معلمة وتحدد type index الخاصين به. في هذا المثال ، type المهمة worker index المهمة هو 0 .

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيفية تعيين TF_CONFIG مع عاملين على localhost TF_CONFIG . من الناحية العملية ، يمكنك إنشاء عدة عمال على عنوان IP خارجي ومنفذ ، وتعيين TF_CONFIG على كل عامل بشكل مناسب ، أي تعديل index المهام.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

حدد النموذج

اكتب الطبقات والمحسن ووظيفة الخسارة للتدريب. يحدد هذا البرنامج التعليمي النموذج مع طبقات Keras ، على غرار البرنامج التعليمي للتدريب على وحدات معالجة الرسومات المتعددة .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

لتدريب النموذج ، استخدم مثيل tf.distribute.experimental.MultiWorkerMirroredStrategy . تُنشئ MultiWorkerMirroredStrategy نسخًا من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. وهي تستخدم CollectiveOps ، وهي عملية TensorFlow للاتصال الجماعي ، لتجميع التدرجات اللونية والحفاظ على تزامن المتغيرات. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الاستراتيجية.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

تدريب وتقييم النموذج

بعد ذلك ، حدد استراتيجية التوزيع في RunConfig ، وقم بالتدريب والتقييم من خلال استدعاء tf.estimator.train_and_evaluate . يوزع هذا البرنامج التعليمي التدريب فقط عن طريق تحديد الإستراتيجية عبر train_distribute . من الممكن أيضًا توزيع التقييم عبر eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4c6c18af98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1389045.

INFO:tensorflow:Loss for final step: 1.1389045.

({'loss': 2.276255, 'global_step': 938}, [])

تحسين أداء التدريب

لديك الآن نموذج tf.distribute.Strategy قادر على تعدد العمال مدعوم من tf.distribute.Strategy . يمكنك تجربة الأساليب التالية لتحسين أداء التدريب متعدد العمال:

  • زيادة حجم الدُفعة: حجم الدُفعة المحدد هنا هو لكل وحدة معالجة رسومات. بشكل عام ، يُنصح باستخدام أكبر حجم للدفعة يناسب ذاكرة وحدة معالجة الرسومات.
  • متغيرات Cast: صب المتغيرات على tf.float إن أمكن. يتضمن نموذج ResNet الرسمي مثالاً على كيفية القيام بذلك.
  • استخدام الاتصال الجماعي: توفر MultiWorkerMirroredStrategy العديد من تطبيقات الاتصال الجماعي .

    • تنفذ RING مجموعات قائمة على الحلقات باستخدام gRPC كطبقة اتصال عبر المضيف.
    • يستخدم NCCL Nvidia لتنفيذ المجموعات.
    • AUTO يؤجل الاختيار لوقت التشغيل.

    يعتمد أفضل خيار للتنفيذ الجماعي على عدد ونوع وحدات معالجة الرسومات ، والتوصيل البيني للشبكة في المجموعة. لتجاوز الاختيار التلقائي ، حدد قيمة صالحة لمعامل communication الخاص MultiWorkerMirroredStrategy ، على سبيل المثال ، communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

قم بزيارة قسم الأداء في الدليل لمعرفة المزيد حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.

أمثلة التعليمات البرمجية الأخرى

  1. مثال نهائي لتدريب العديد من العمال في نظام Tensorflow / النظام البيئي باستخدام قوالب Kubernetes. يبدأ هذا المثال بنموذج Keras ويحوله إلى مقدر باستخدام tf.keras.estimator.model_to_estimator API.
  2. النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل استراتيجيات توزيع متعددة.