آموزش چند کارگر با Estimator

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش نشان می دهد که چگونه tf.distribute.Strategy می تواند برای آموزش توزیع شده چند کارگر با tf.estimator . اگر کد خود را با استفاده از tf.estimator و علاقه دارید مقیاس آن فراتر از یک ماشین واحد با کارایی بالا باشد ، این آموزش برای شما مناسب است.

قبل از شروع ، لطفا راهنمای استراتژی توزیع را بخوانید. آموزش آموزش چند GPU نیز مرتبط است ، زیرا این آموزش از همان مدل استفاده می کند.

برپایی

ابتدا TensorFlow و واردات لازم را نصب کنید.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

عملکرد ورودی

این آموزش از مجموعه داده های MNIST از مجموعه داده های TensorFlow استفاده می کند . این کد در اینجا مشابه آموزش آموزش چند GPU است با یک تفاوت اساسی: هنگام استفاده از برآوردگر برای آموزش چند کارگر ، برای اطمینان از همگرایی مدل لازم است که مجموعه داده را بر اساس تعداد کارگران خرد کنید. داده های ورودی توسط شاخص کارگر تقسیم می شود ، به طوری که هر کارگر 1/num_workers از بخشهای مختلف مجموعه داده پردازش می کند.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

رویکرد منطقی دیگر برای دستیابی به همگرایی ، مخلوط کردن مجموعه داده با دانه های مشخص در هر کارگر است.

پیکربندی چند کارگر

یکی از تفاوتهای اساسی در این آموزش (در مقایسه با آموزش چند GPU ) ، تنظیمات چند کاره است. متغیر محیطی TF_CONFIG روش استاندارد برای تعیین پیکربندی خوشه برای هر کارگر است که بخشی از خوشه است.

دو جز components TF_CONFIG : cluster و task . cluster اطلاعات مربوط به کل خوشه ، یعنی کارگران و سرورهای پارامتر را در cluster فراهم می کند. task اطلاعات مربوط به وظیفه فعلی را فراهم می کند. cluster componentلفه اول برای همه کارگران و سرورهای پارامتر موجود در کلاستر یکسان است و task componentلفه دوم در هر سرور کارگر و پارامتر متفاوت است و type و index خود را مشخص می کند. در این مثال ، type کار worker و index وظیفه 0 .

برای اهداف تصویرسازی ، این آموزش نحوه تنظیم TF_CONFIG با 2 کارگر در localhost . در عمل ، شما می توانید چندین کارگر در یک آدرس IP و پورت خارجی ایجاد کنید ، و TF_CONFIG روی هر کارگر به طور مناسب تنظیم کنید ، یعنی index کار را تغییر دهید.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

مدل را تعریف کنید

برای آموزش لایه ها ، بهینه ساز و عملکرد از دست دادن را بنویسید. این آموزش ، مدل را با لایه های Keras ، شبیه آموزش چند GPU ، تعریف می کند.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

برای آموزش مدل ، از نمونه ای از tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy کپی از تمام متغیرها را در لایه های مدل در هر دستگاه در همه کارگران ایجاد می کند. برای جمع کردن شیب ها و همگام سازی متغیرها از CollectiveOps ، یک اپلیکیشن TensorFlow برای ارتباط جمعی استفاده می کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

مدل را آموزش دهید و ارزیابی کنید

بعد ، استراتژی توزیع را در RunConfig برای برآوردگر مشخص کنید و با فراخوانی tf.estimator.train_and_evaluate آموزش و ارزیابی کنید. این آموزش با تعیین استراتژی از طریق train_distribute فقط آموزش را توزیع می کند. توزیع ارزیابی از طریق eval_distribute نیز امکان پذیر است.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7eff5c6afe50>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.141045.
INFO:tensorflow:Loss for final step: 1.141045.
({'loss': 2.2780812, 'global_step': 938}, [])

عملکرد آموزش را بهینه کنید

شما اکنون یک مدل و یک برآوردگر چند کارگر با قدرت tf.distribute.Strategy . برای بهینه سازی عملکرد آموزش چند کارگر می توانید تکنیک های زیر را امتحان کنید:

  • افزایش اندازه دسته: اندازه دسته مشخص شده در اینجا برای هر GPU است. به طور کلی ، بزرگترین اندازه دسته ای که متناسب با حافظه GPU است توصیه می شود.
  • Cast variables: در صورت امکان متغیرها را به tf.float کنید. مدل رسمی ResNet مثالی از چگونگی انجام این کار را شامل می شود.
  • از ارتباطات جمعی استفاده کنید: MultiWorkerMirroredStrategy چندین پیاده سازی ارتباط جمعی را فراهم می کند.

    • RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی بین میزبان پیاده سازی می کند.
    • NCCL از NCCL انویدیا برای اجرای مجموعه ها استفاده می کند.
    • AUTO انتخاب را به زمان اجرا موکول می کند.

    بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPU ها و اتصال شبکه در خوشه بستگی دارد. برای لغو انتخاب خودکار ، مقدار معتبری برای پارامتر communication سازنده MultiWorkerMirroredStrategy مشخص کنید ، به عنوان مثال communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

برای کسب اطلاعات بیشتر در مورد سایر استراتژی ها و ابزارهایی که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود استفاده کنید ، به بخش عملکرد در راهنما مراجعه کنید.

مثالهای کد دیگر

  1. مثال آخر تا آخر برای آموزش چند کارگر در جریان تنش / اکوسیستم با استفاده از الگوهای Kubernetes. این مثال با یک مدل Keras شروع می شود و با استفاده از API tf.keras.estimator.model_to_estimator آن را به Estimator تبدیل می کند.
  2. مدل های رسمی ، که بسیاری از آنها را می توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.