หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

การฝึกอบรมผู้ปฏิบัติงานหลายคนด้วยเครื่องมือประมาณการ

ดูใน TensorFlow.org เรียกใช้ใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

ภาพรวม

บทช่วยสอนนี้สาธิตวิธีใช้ tf.distribute.Strategy สำหรับการฝึกอบรมผู้ปฏิบัติงานหลายคนแบบกระจายกับ tf.estimator หากคุณเขียนโค้ดของคุณโดยใช้ tf.estimator และคุณสนใจที่จะปรับขนาดมากกว่าเครื่องเดียวที่มีประสิทธิภาพสูงบทช่วยสอนนี้เหมาะสำหรับคุณ

ก่อนเริ่มต้นโปรดอ่านคู่มือ กลยุทธ์การจัดจำหน่าย บทช่วยสอนการฝึกอบรมหลาย GPU ก็เกี่ยวข้องเช่นกันเนื่องจากบทช่วยสอนนี้ใช้รูปแบบเดียวกัน

ติดตั้ง

ขั้นแรกตั้งค่า TensorFlow และการนำเข้าที่จำเป็น

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json

ฟังก์ชั่นการป้อนข้อมูล

บทช่วยสอนนี้ใช้ชุดข้อมูล MNIST จาก TensorFlow Datasets รหัสที่นี่คล้ายกับ บทช่วยสอนการฝึกอบรม GPU แบบหลาย GPU ที่ มีข้อแตกต่างที่สำคัญประการหนึ่งคือเมื่อใช้เครื่องมือประเมินสำหรับการฝึกอบรมผู้ปฏิบัติงานหลายคนจำเป็นต้องแบ่งชุดข้อมูลตามจำนวนคนงานเพื่อให้แน่ใจว่าโมเดลจะบรรจบกัน ข้อมูลอินพุตจะถูกแบ่งตามดัชนีของผู้ปฏิบัติงานเพื่อให้ผู้ปฏิบัติงานแต่ละคนประมวลผลส่วนที่แตกต่างกันของชุดข้อมูล 1/num_workers

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

อีกวิธีหนึ่งที่สมเหตุสมผลเพื่อให้เกิดการบรรจบกันคือการสับเปลี่ยนชุดข้อมูลด้วยเมล็ดพันธุ์ที่แตกต่างกันในแต่ละคน

การกำหนดค่าหลายคน

ความแตกต่างที่สำคัญอย่างหนึ่งในบทช่วยสอนนี้ (เมื่อเทียบกับ บทช่วยสอนการฝึกอบรมหลาย GPU ) คือการตั้งค่าผู้ปฏิบัติงานหลายคน ตัวแปรสภาพแวดล้อม TF_CONFIG เป็นวิธีมาตรฐานในการระบุคอนฟิกูเรชันคลัสเตอร์ให้กับผู้ปฏิบัติงานแต่ละคนที่เป็นส่วนหนึ่งของคลัสเตอร์

TF_CONFIG มีสององค์ประกอบ: cluster และ task cluster ให้ข้อมูลเกี่ยวกับคลัสเตอร์ทั้งหมด ได้แก่ คนงานและเซิร์ฟเวอร์พารามิเตอร์ในคลัสเตอร์ task ให้ข้อมูลเกี่ยวกับงานปัจจุบัน cluster คอมโพเนนต์แรกจะเหมือนกันสำหรับผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์ทั้งหมดในคลัสเตอร์และ task คอมโพเนนต์ที่สองจะแตกต่างกันในแต่ละเซิร์ฟเวอร์ของผู้ปฏิบัติงานและพารามิเตอร์และระบุ type และ index ตนเอง ในตัวอย่างนี้ type งานคือ worker และ index งานคือ 0

เพื่อเป็นภาพประกอบบทช่วยสอนนี้จะแสดงวิธีการตั้งค่า TF_CONFIG กับคนงาน 2 คนใน localhost ในทางปฏิบัติคุณจะสร้างผู้ปฏิบัติงานหลายคนบนที่อยู่ IP ภายนอกและพอร์ตและตั้งค่า TF_CONFIG กับผู้ปฏิบัติงานแต่ละคนอย่างเหมาะสมเช่นแก้ไข index งาน

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

กำหนดรูปแบบ

เขียนเลเยอร์เครื่องมือเพิ่มประสิทธิภาพและฟังก์ชันการสูญเสียสำหรับการฝึกอบรม บทช่วยสอนนี้กำหนดโมเดลด้วยเลเยอร์ Keras คล้ายกับ บทช่วยสอนการฝึกอบรม Multi-GPU

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirrored ยุทธศาสตร์

ในการฝึกโมเดลให้ใช้อินสแตนซ์ของ tf.distribute.experimental.MultiWorkerMirroredStrategy MultiWorkerMirroredStrategy สร้างสำเนาของตัวแปรทั้งหมดในเลเยอร์ของโมเดลบนอุปกรณ์แต่ละเครื่องในพนักงานทั้งหมด มันใช้ CollectiveOps ซึ่งเป็น TensorFlow op สำหรับการสื่อสารแบบรวมเพื่อรวมการไล่ระดับสีและทำให้ตัวแปรซิงค์กัน คู่มือ tf.distribute.Strategy มีรายละเอียดเพิ่มเติมเกี่ยวกับกลยุทธ์นี้

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

ฝึกอบรมและประเมินโมเดล

จากนั้นระบุกลยุทธ์การกระจายใน RunConfig สำหรับตัวประมาณค่าและฝึกอบรมและประเมินโดยเรียกใช้ tf.estimator.train_and_evaluate บทแนะนำนี้กระจายเฉพาะการฝึกอบรมโดยระบุกลยุทธ์ผ่าน train_distribute นอกจากนี้ยังสามารถแจกจ่ายการประเมินผ่าน eval_distribute

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4c6c18af98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

Warning: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1389045.

INFO:tensorflow:Loss for final step: 1.1389045.

({'loss': 2.276255, 'global_step': 938}, [])

เพิ่มประสิทธิภาพการฝึกอบรม

ตอนนี้คุณมีแบบจำลองและเครื่องมือประมาณการที่มีความสามารถหลายคนซึ่งขับเคลื่อนโดย tf.distribute.Strategy คุณสามารถลองใช้เทคนิคต่อไปนี้เพื่อเพิ่มประสิทธิภาพการฝึกอบรมผู้ปฏิบัติงานหลายคน:

  • เพิ่มขนาดแบทช์: ขนาด แบทช์ที่ระบุที่นี่คือต่อ GPU โดยทั่วไปขอแนะนำให้ใช้ขนาดแบทช์ที่ใหญ่ที่สุดที่พอดีกับหน่วยความจำ GPU
  • แคสต์ตัวแปร: แคสต์ตัวแปรเป็น tf.float ถ้าเป็นไปได้ โมเดล ResNet อย่างเป็นทางการมี ตัวอย่าง วิธีการนี้
  • ใช้การสื่อสารแบบรวม: MultiWorkerMirroredStrategy ให้ การใช้งานการสื่อสารแบบรวม หลาย ๆ

    • RING สร้างกลุ่มที่ใช้วงแหวนโดยใช้ gRPC เป็นเลเยอร์การสื่อสารข้ามโฮสต์
    • NCCL ใช้ NCCL ของ Nvidia เพื่อใช้งาน Collectives
    • AUTO เลื่อนตัวเลือกไปที่รันไทม์

    ทางเลือกที่ดีที่สุดของการใช้งานร่วมกันขึ้นอยู่กับจำนวนและประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์ ในการแทนที่ตัวเลือกอัตโนมัติให้ระบุค่าที่ถูกต้องให้กับพารามิเตอร์ communication ของตัว MultiWorkerMirroredStrategy เช่น communication=tf.distribute.experimental.CollectiveCommunication.NCCL

ไปที่ ส่วนประสิทธิภาพ ในคำแนะนำเพื่อเรียนรู้เพิ่มเติมเกี่ยวกับกลยุทธ์และ เครื่องมือ อื่น ๆ ที่คุณสามารถใช้เพื่อเพิ่มประสิทธิภาพของโมเดล TensorFlow ของคุณ

ตัวอย่างโค้ดอื่น ๆ

  1. ตัวอย่าง end to end สำหรับการฝึกอบรมพนักงานหลายคนในเทนเซอร์โฟลว์ / ระบบนิเวศโดยใช้เทมเพลต Kubernetes ตัวอย่างนี้เริ่มต้นด้วยโมเดล Keras และแปลงเป็น Estimator โดยใช้ tf.keras.estimator.model_to_estimator API
  2. แบบจำลองอย่างเป็นทางการ ซึ่งหลายแบบสามารถกำหนดค่าให้เรียกใช้กลยุทธ์การกระจายได้หลายแบบ