ช่วยปกป้อง Great Barrier Reef กับ TensorFlow บน Kaggle เข้าร่วมท้าทาย

การฝึกอบรมพนักงานหลายคนด้วยเครื่องมือประมาณการ

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ภาพรวม

กวดวิชานี้จะแสดงให้เห็นถึงวิธีการ tf.distribute.Strategy สามารถนำมาใช้สำหรับการฝึกอบรมหลายงานกระจายกับ tf.estimator ถ้าคุณเขียนรหัสของคุณโดยใช้ tf.estimator และคุณกำลังสนใจในการปรับเกินเครื่องเดียวที่มีประสิทธิภาพสูง, กวดวิชานี้เหมาะสำหรับคุณ

ก่อนที่จะเริ่มต้นโปรดอ่าน กลยุทธ์การกระจาย คู่มือ multi-GPU ฝึกอบรมกวดวิชา นี้ยังเกี่ยวข้องเพราะการกวดวิชานี้ใช้รูปแบบเดียวกัน

ติดตั้ง

ขั้นแรก ตั้งค่า TensorFlow และการนำเข้าที่จำเป็น

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

ฟังก์ชันอินพุต

กวดวิชานี้ใช้ชุดข้อมูล MNIST จาก TensorFlow ชุดข้อมูล รหัสที่นี่จะคล้ายกับ การกวดวิชาของการฝึกอบรม multi-GPU มีความแตกต่างที่สำคัญอย่างหนึ่ง: เมื่อใช้ประมาณการสำหรับการฝึกอบรมหลายงานก็เป็นสิ่งจำเป็นที่จะ Shard ชุดโดยจำนวนของแรงงานเพื่อให้แน่ใจว่าการบรรจบรุ่น ป้อนข้อมูลเป็น sharded โดยดัชนีผู้ปฏิบัติงานเพื่อให้พนักงานแต่ละคนจะประมวลผล 1/num_workers ส่วนที่แตกต่างของชุดข้อมูล

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

แนวทางที่สมเหตุสมผลอีกวิธีหนึ่งในการบรรลุการบรรจบกันคือการสับเปลี่ยนชุดข้อมูลด้วยเมล็ดพันธุ์ที่แตกต่างกันในแต่ละคนงาน

การกำหนดค่าผู้ปฏิบัติงานหลายคน

หนึ่งในความแตกต่างที่สำคัญในการกวดวิชานี้ (เมื่อเทียบกับ การฝึกอบรม multi-GPU กวดวิชา ) คือการติดตั้งหลายงาน TF_CONFIG ตัวแปรสภาพแวดล้อมเป็นวิธีมาตรฐานในการระบุการกำหนดค่าคลัสเตอร์เพื่อพนักงานแต่ละคนที่เป็นส่วนหนึ่งของกลุ่ม

มีสององค์ประกอบของการมี TF_CONFIG : cluster และ task cluster ให้ข้อมูลเกี่ยวกับทั้งคลัสเตอร์คือคนงานและเซิร์ฟเวอร์พารามิเตอร์ในคลัสเตอร์ task ให้ข้อมูลเกี่ยวกับงานปัจจุบัน องค์ประกอบแรก cluster จะเหมือนกันสำหรับคนงานทุกคนและเซิร์ฟเวอร์พารามิเตอร์ในคลัสเตอร์และส่วนที่สอง task ที่แตกต่างกันในแต่ละคนงานและพารามิเตอร์ของเซิร์ฟเวอร์และระบุของตัวเอง type และ index ในตัวอย่างนี้งาน type คือ worker และงาน index เป็น 0

เพื่อวัตถุประสงค์ในภาพประกอบนี้แสดงให้เห็นว่าการสอนวิธีการตั้งค่า TF_CONFIG มี 2 คนงานใน localhost ในทางปฏิบัติคุณจะสร้างคนงานหลายคนในที่อยู่ภายนอก IP และพอร์ตและชุด TF_CONFIG ในแต่ละคนงานอย่างเหมาะสมเช่นการปรับเปลี่ยนงาน index

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

กำหนดรูปแบบ

เขียนเลเยอร์ เครื่องมือเพิ่มประสิทธิภาพ และฟังก์ชันการสูญเสียสำหรับการฝึก กวดวิชานี้จะกำหนดรูปแบบด้วยชั้น Keras คล้ายกับ การฝึกอบรม multi-GPU กวดวิชา

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

ในการฝึกอบรมรูปแบบการใช้ตัวอย่างของ tf.distribute.experimental.MultiWorkerMirroredStrategy MultiWorkerMirroredStrategy สร้างสำเนาของตัวแปรทั้งหมดในชั้นของแบบจำลองบนอุปกรณ์ทั่วทุกคนแต่ละคน มันใช้ CollectiveOps , สหกรณ์ TensorFlow สำหรับการสื่อสารรวมเพื่อการไล่ระดับสีรวมและให้ตัวแปรในการซิงค์ tf.distribute.Strategy คู่มือ มีรายละเอียดเพิ่มเติมเกี่ยวกับกลยุทธ์นี้

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_1884/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

ฝึกและประเมินแบบจำลอง

ถัดไประบุกลยุทธ์การกระจายใน RunConfig สำหรับประมาณการและการฝึกอบรมและประเมินผลโดยการเรียก tf.estimator.train_and_evaluate กวดวิชานี้จะจัดจำหน่ายเฉพาะการฝึกอบรมโดยการระบุกลยุทธ์ที่ผ่าน train_distribute นอกจากนี้ยังเป็นไปได้ที่จะกระจายการประเมินผลผ่าน eval_distribute

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7fa86c4c8950>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:374: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2021-09-09 01:25:08.941607: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2021-09-09 01:25:08.942715: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.135354.
INFO:tensorflow:Loss for final step: 1.135354.
({'loss': 2.2692595, 'global_step': 938}, [])

เพิ่มประสิทธิภาพการฝึก

ตอนนี้คุณมีรูปแบบและหลายงานประมาณการความสามารถขับเคลื่อนโดย tf.distribute.Strategy คุณสามารถลองใช้เทคนิคต่อไปนี้เพื่อเพิ่มประสิทธิภาพการฝึกอบรมผู้ปฏิบัติงานหลายคนได้:

  • เพิ่มขนาดชุด: ชุดขนาดที่ระบุไว้ที่นี่เป็นต่อ GPU โดยทั่วไป แนะนำให้ใช้ขนาดแบทช์ที่ใหญ่ที่สุดที่เหมาะกับหน่วยความจำ GPU
  • ตัวแปรที่นักแสดง: โพลล์ตัวแปรเพื่อ tf.float ถ้าเป็นไปได้ รุ่น RESNET อย่างเป็นทางการรวมถึง ตัวอย่าง ของวิธีการนี้สามารถทำได้
  • ใช้การสื่อสารรวม: MultiWorkerMirroredStrategy ให้หลาย การใช้งานการสื่อสารโดยรวม

    • RING ดำเนินการสหกรณ์แหวนโดยใช้ gRPC เป็นชั้นการสื่อสารข้ามโฮสต์
    • NCCL ใช้ ของ Nvidia NCCL ในการดำเนินการสหกรณ์
    • AUTO คล้อยตามทางเลือกที่จะรันไทม์

    ทางเลือกที่ดีที่สุดของการใช้งานแบบรวมขึ้นอยู่กับจำนวนและประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์ เมื่อต้องการแทนที่ตัวเลือกอัตโนมัติระบุค่าที่ถูกต้องกับ communication พารามิเตอร์ของ MultiWorkerMirroredStrategy คอนสตรัค 's เช่น communication=tf.distribute.experimental.CollectiveCommunication.NCCL

เยี่ยมชม ส่วนผลการดำเนินงาน ในคู่มือที่จะเรียนรู้เพิ่มเติมเกี่ยวกับกลยุทธ์อื่น ๆ และ เครื่องมือที่ คุณสามารถใช้เพื่อเพิ่มประสิทธิภาพการทำงานของแบบจำลอง TensorFlow ของคุณ

ตัวอย่างโค้ดอื่นๆ

  1. จบท้ายตัวอย่าง สำหรับการฝึกอบรมคนงานหลายใน tensorflow / ระบบนิเวศโดยใช้แม่แบบ Kubernetes ตัวอย่างนี้เริ่มต้นด้วยรูปแบบ Keras และแปลงไปสู่การประมาณการใช้ tf.keras.estimator.model_to_estimator API
  2. รุ่นอย่างเป็นทางการ หลายแห่งซึ่งสามารถกำหนดค่าให้เรียกใช้กลยุทธ์การจัดจำหน่ายหลาย