การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย ParameterServerStrategy

จัดทุกอย่างให้เป็นระเบียบอยู่เสมอด้วยคอลเล็กชัน บันทึกและจัดหมวดหมู่เนื้อหาตามค่ากำหนดของคุณ

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ภาพรวม

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ เป็นวิธีคู่ขนานข้อมูลทั่วไปในการขยายขนาดการฝึกโมเดลบนเครื่องหลายเครื่อง

คลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ประกอบด้วย ผู้ปฏิบัติงาน และ เซิร์ฟเวอร์พารามิเตอร์ ตัวแปรถูกสร้างขึ้นบนเซิร์ฟเวอร์พารามิเตอร์ และจะถูกอ่านและอัปเดตโดยผู้ปฏิบัติงานในแต่ละขั้นตอน โดยค่าเริ่มต้น ผู้ปฏิบัติงานจะอ่านและอัปเดตตัวแปรเหล่านี้อย่างอิสระโดยไม่ทำข้อมูลให้ตรงกัน นี่คือเหตุผลที่บางครั้งการฝึกแบบพารามิเตอร์เซิร์ฟเวอร์เรียกว่า การฝึกแบบอะซิงโครนัส

ใน TensorFlow 2 การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ขับเคลื่อนโดยคลาส tf.distribute.experimental.ParameterServerStrategy ซึ่งกระจายขั้นตอนการฝึกอบรมไปยังคลัสเตอร์ที่ขยายได้ถึงหลายพันคน (พร้อมกับเซิร์ฟเวอร์พารามิเตอร์)

วิธีการฝึกอบรมที่รองรับ

มีสองวิธีการฝึกอบรมที่ได้รับการสนับสนุนหลัก:

คลัสเตอร์ที่มีงานและงาน

โดยไม่คำนึงถึง API ที่เลือก ( Model.fit หรือลูปการฝึกอบรมแบบกำหนดเอง) การฝึกอบรมแบบกระจายใน TensorFlow 2 เกี่ยวข้องกับ: 'cluster' ที่มี 'jobs' หลายรายการ และแต่ละงานอาจมี 'tasks' อย่างน้อยหนึ่งรายการ

เมื่อใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ ขอแนะนำให้มี:

  • หนึ่ง งานผู้ประสาน งาน (ซึ่งมีชื่องาน chief )
  • งานของผู้ปฏิบัติงานหลาย คน (ชื่อ worker ); และ
  • งาน เซิร์ฟเวอร์พารามิเตอร์ หลายรายการ (ชื่องาน ps )

ในขณะที่ผู้ ประสานงาน สร้างทรัพยากร ส่งงานฝึกอบรม เขียนจุดตรวจสอบ และจัดการกับความล้มเหลวของ งาน ผู้ปฏิบัติงาน และ เซิร์ฟเวอร์พารามิเตอร์ จะเรียกใช้ tf.distribute.Server ที่รับฟังคำขอจากผู้ประสานงาน

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย Model.fit API

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย Model.fit API ต้องการให้ผู้ประสานงานใช้อ็อบเจ็กต์ tf.distribute.experimental.ParameterServerStrategy และ tf.keras.utils.experimental.DatasetCreator เป็นอินพุต คล้ายกับการใช้งาน Model.fit โดยไม่มีกลยุทธ์หรือกับกลยุทธ์อื่นๆ เวิร์กโฟลว์เกี่ยวข้องกับการสร้างและรวบรวมโมเดล การเตรียมการเรียกกลับ ตามด้วยการเรียก Model.fit

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์พร้อมลูปการฝึกแบบกำหนดเอง

ด้วยลูปการฝึกแบบกำหนดเอง คลาส tf.distribute.experimental.coordinator.ClusterCoordinator เป็นองค์ประกอบหลักที่ใช้สำหรับผู้ประสานงาน

  • คลาส ClusterCoordinator ต้องทำงานร่วมกับอ็อบเจ็กต์ tf.distribute.Strategy
  • ออบเจ็กต์ tf.distribute.Strategy นี้จำเป็นสำหรับการจัดเตรียมข้อมูลของคลัสเตอร์ และใช้เพื่อกำหนดขั้นตอนการฝึก ดังที่แสดงไว้ใน การฝึกแบบกำหนดเองด้วย tf.distribute.Strategy
  • วัตถุ ClusterCoordinator จะส่งการดำเนินการของขั้นตอนการฝึกอบรมเหล่านี้ไปยังผู้ปฏิบัติงานระยะไกล
  • สำหรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ ClusterCoordinator จำเป็นต้องทำงานกับ tf.distribute.experimental.ParameterServerStrategy

API ที่สำคัญที่สุดที่จัดเตรียมโดยอ็อบเจ็กต์ ClusterCoordinator คือ schedule :

  • API schedule คิว tf.function และส่งคืน RemoteValue ที่เหมือนอนาคตทันที
  • ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลัง และ RemoteValue ของพวกเขาจะถูกเติมแบบอะซิงโครนัส
  • เนื่องจาก schedule ไม่ต้องการการมอบหมายผู้ปฏิบัติงาน tf.function ที่ส่งผ่านจึงสามารถดำเนินการกับผู้ปฏิบัติงานที่มีอยู่ได้
  • หากผู้ปฏิบัติงานที่ดำเนินการอยู่ไม่พร้อมใช้งานก่อนที่จะเสร็จสมบูรณ์ ฟังก์ชันจะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่พร้อมใช้งาน
  • เนื่องจากข้อเท็จจริงนี้และการเรียกใช้ฟังก์ชันไม่ใช่อะตอมมิก ฟังก์ชันอาจถูกเรียกใช้งานมากกว่าหนึ่งครั้ง

นอกจากการส่งฟังก์ชันระยะไกลแล้ว ClusterCoordinator ยังช่วยสร้างชุดข้อมูลบนผู้ปฏิบัติงานทั้งหมด และสร้างชุดข้อมูลเหล่านี้ใหม่เมื่อผู้ปฏิบัติงานกู้คืนจากความล้มเหลว

ตั้งค่าการสอน

บทช่วยสอนจะแยกสาขาออกเป็น Model.fit และเส้นทางวนรอบการฝึกแบบกำหนดเอง และคุณสามารถเลือกเส้นทางที่เหมาะกับความต้องการของคุณได้ ส่วนอื่นที่ไม่ใช่ "การฝึกกับ X" ใช้ได้กับทั้งสองเส้นทาง

pip install portpicker

การตั้งค่าคลัสเตอร์

ดังที่กล่าวไว้ข้างต้น คลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ต้องการงานผู้ประสานงานที่รันโปรแกรมการฝึกอบรมของคุณ ผู้ปฏิบัติงานหนึ่งรายหรือหลายราย และงานเซิร์ฟเวอร์พารามิเตอร์ที่เรียกใช้เซิร์ฟเวอร์ tf.distribute.Server — และอาจเป็นงานการประเมินเพิ่มเติมที่เรียกใช้การประเมินแบบ side-car (ดูส่วนการประเมินรถด้านข้างด้านล่าง) ข้อกำหนดในการตั้งค่าคือ:

  • งานผู้ประสานงานจำเป็นต้องทราบที่อยู่และพอร์ตของเซิร์ฟเวอร์ TensorFlow อื่นๆ ทั้งหมด ยกเว้นผู้ประเมิน
  • ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์จำเป็นต้องรู้ว่าพอร์ตใดที่พวกเขาต้องรับฟัง เพื่อความง่าย คุณสามารถส่งข้อมูลคลัสเตอร์ที่สมบูรณ์เมื่อสร้างเซิร์ฟเวอร์ TensorFlow ในงานเหล่านี้
  • งานผู้ประเมินไม่จำเป็นต้องรู้การตั้งค่าของคลัสเตอร์การฝึกอบรม หากเป็นเช่นนั้น ก็ไม่ควรพยายามเชื่อมต่อกับคลัสเตอร์การฝึก
  • ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์ควรมีประเภทงานเป็น "worker" และ "ps" ตามลำดับ ผู้ประสานงานควรใช้ "chief" เป็นประเภทงานด้วยเหตุผลเดิม

ในบทช่วยสอนนี้ คุณจะสร้างคลัสเตอร์ระหว่างดำเนินการเพื่อให้สามารถเรียกใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ทั้งหมดใน Colab คุณจะได้เรียนรู้วิธีตั้งค่า คลัสเตอร์จริง ในส่วนภายหลัง

คลัสเตอร์ในกระบวนการ

คุณจะเริ่มต้นด้วยการสร้างเซิร์ฟเวอร์ TensorFlow หลายตัวล่วงหน้าและเชื่อมต่อในภายหลัง โปรดทราบว่านี่เป็นเพียงเพื่อการสาธิตของบทช่วยสอนนี้เท่านั้น และในการฝึกอบรมจริง เซิร์ฟเวอร์จะเริ่มต้นบนเครื่อง "worker" และ "ps"

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

การตั้งค่าคลัสเตอร์ในกระบวนการมักใช้ในการทดสอบหน่วย เช่น ที่นี่

อีกทางเลือกหนึ่งสำหรับการทดสอบในเครื่องคือการเริ่มกระบวนการบนเครื่องในเครื่อง ดูตัวอย่างของแนวทางนี้ใน การฝึกอบรม Multi-worker กับ Keras

สร้างอินสแตนซ์ ParameterServerStrategy

ก่อนที่คุณจะดำดิ่งลงไปในโค้ดการฝึก เรามาสร้างอินสแตนซ์ของอ็อบเจกต์ ParameterServerStrategy กันเสียก่อน โปรดทราบว่าสิ่งนี้จำเป็น ไม่ว่าคุณจะใช้ Model.fit หรือ Training Loop แบบกำหนดเองก็ตาม อาร์กิวเมนต์ variable_partitioner จะอธิบายไว้ในส่วนการแบ่งกลุ่มย่อยของ ตัวแปร

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

หากต้องการใช้ GPU ในการฝึกอบรม ให้จัดสรร GPU ที่พนักงานแต่ละคนมองเห็นได้ ParameterServerStrategy จะใช้ GPU ที่มีอยู่ทั้งหมดสำหรับผู้ปฏิบัติงานแต่ละคน โดยมีข้อจำกัดว่าพนักงานทุกคนควรมี GPU จำนวนเท่ากัน

การแบ่งส่วนตัวแปร

การแบ่งส่วนตัวแปรหมายถึงการแบ่งตัวแปรออกเป็นตัวแปรย่อยๆ หลายๆ ตัว ซึ่งเรียกว่า ชา ร์ด การแบ่งกลุ่มย่อยแบบแปรผันอาจมีประโยชน์ในการกระจายโหลดของเครือข่ายเมื่อเข้าถึงชาร์ดเหล่านี้ นอกจากนี้ยังเป็นประโยชน์ในการกระจายการคำนวณและการจัดเก็บตัวแปรปกติในเซิร์ฟเวอร์พารามิเตอร์หลายตัว

ในการเปิดใช้งานการแบ่งกลุ่มย่อยตัวแปร คุณสามารถส่งผ่าน variable_partitioner เมื่อสร้างอ็อบเจ็กต์ ParameterServerStrategy variable_partitioner จะถูกเรียกใช้ทุกครั้งที่สร้างตัวแปร และคาดว่าจะส่งคืนจำนวนส่วนแบ่งข้อมูลตามแต่ละส่วนของตัวแปร variable_partitioner บางตัวที่พร้อมใช้งาน เช่น tf.distribute.experimental.partitioners.MinSizePartitioner ขอแนะนำให้ใช้ตัวแบ่งพาร์ติชันตามขนาด เช่น tf.distribute.experimental.partitioners.MinSizePartitioner เพื่อหลีกเลี่ยงการแบ่งพาร์ติชันตัวแปรขนาดเล็ก ซึ่งอาจส่งผลเสียต่อความเร็วในการฝึกโมเดล

เมื่อ variable_partitioner ถูกส่งผ่านไป และหากคุณสร้างตัวแปรโดยตรงภายใต้ strategy.scope() ตัวแปรนั้นจะกลายเป็นประเภทคอนเทนเนอร์ที่มีคุณสมบัติ variables ซึ่งให้การเข้าถึงรายการชาร์ด ในกรณีส่วนใหญ่ คอนเทนเนอร์นี้จะถูกแปลงเป็นเทนเซอร์โดยอัตโนมัติโดยการต่อชาร์ดทั้งหมดเข้าด้วยกัน เป็นผลให้สามารถใช้เป็นตัวแปรปกติได้ ในทางกลับกัน บางวิธีของ TensorFlow เช่น tf.nn.embedding_lookup ให้การใช้งานที่มีประสิทธิภาพสำหรับคอนเทนเนอร์ประเภทนี้ และจะหลีกเลี่ยงการต่อกันอัตโนมัติในวิธีการเหล่านี้

โปรดดูเอกสาร API ของ tf.distribute.experimental.ParameterServerStrategy สำหรับรายละเอียดเพิ่มเติม

เทรนนิ่งกับ Model.fit

Keras จัดเตรียม API การฝึกอบรมที่ใช้งานง่ายผ่าน Model.fit ที่จัดการลูปการฝึกภายใต้ประทุน ด้วยความยืดหยุ่นของ train_step ที่เขียนทับได้ และการเรียกกลับ ซึ่งมีฟังก์ชันต่างๆ เช่น การบันทึกจุดตรวจหรือการบันทึกสรุปสำหรับ TensorBoard ด้วย Model.fit รหัสการฝึกอบรมเดียวกันสามารถใช้สำหรับกลยุทธ์อื่นด้วยการสลับวัตถุกลยุทธ์อย่างง่าย

ป้อนข้อมูล

Model.fit ที่มีการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์กำหนดให้ป้อนข้อมูลใน callable ที่รับอาร์กิวเมนต์ประเภท tf.distribute.InputContext และส่งคืน tf.data.Dataset จากนั้น สร้างอ็อบเจ็กต์ tf.keras.utils.experimental.DatasetCreator ที่รับ callable ดังกล่าว และอ็อบเจ็กต์ tf.distribute.InputOptions ทางเลือก ผ่านอาร์กิวเมนต์ input_options

โปรดทราบว่าขอแนะนำให้สับเปลี่ยนและทำซ้ำข้อมูลด้วยการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ และระบุ steps_per_epoch ในการเรียก fit เพื่อให้ไลบรารีทราบขอบเขตของยุค

โปรดดูบทแนะนำการ ป้อนข้อมูลแบบกระจาย สำหรับข้อมูลเพิ่มเติมเกี่ยวกับอาร์กิวเมนต์ InputContext

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

รหัสใน dataset_fn จะถูกเรียกใช้บนอุปกรณ์อินพุต ซึ่งมักจะเป็น CPU ในเครื่องของผู้ปฏิบัติงานแต่ละเครื่อง

การสร้างแบบจำลองและการรวบรวม

ตอนนี้ คุณจะสร้าง tf.keras.Modeltf.keras.models.Sequential model สำหรับจุดประสงค์ในการสาธิต—ตามด้วยการเรียก Model.compile เพื่อรวมส่วนประกอบต่างๆ เช่น ตัวเพิ่มประสิทธิภาพ ตัวชี้วัด หรือพารามิเตอร์ เช่น steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

โทรกลับและการฝึกอบรม

ก่อนที่คุณจะเรียกใช้ model.fit สำหรับการฝึกอบรมจริง มาเตรียมการเรียกกลับที่จำเป็นสำหรับงานทั่วไปก่อน เช่น:

  • ModelCheckpoint : เพื่อบันทึกน้ำหนักของแบบจำลอง
  • BackupAndRestore : เพื่อให้แน่ใจว่ามีการสำรองข้อมูลความคืบหน้าของการฝึกอบรมโดยอัตโนมัติ และกู้คืนได้หากคลัสเตอร์ประสบกับความไม่พร้อมใช้งาน (เช่น การยกเลิกหรือการสั่งจอง) หรือ
  • TensorBoard : เพื่อบันทึกรายงานความคืบหน้าลงในไฟล์สรุป ซึ่งแสดงเป็นภาพในเครื่องมือ TensorBoard
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

การใช้งานโดยตรงกับ ClusterCoordinator (ไม่บังคับ)

แม้ว่าคุณจะเลือกเส้นทางการฝึก Model.fit คุณยังสามารถสร้างอินสแตนซ์อ็อบเจ็กต์ tf.distribute.experimental.coordinator.ClusterCoordinator เพื่อจัดกำหนดการฟังก์ชันอื่นๆ ที่คุณต้องการให้ดำเนินการกับผู้ปฏิบัติงานได้ ดูส่วน การฝึกอบรมด้วยลูปการฝึกอบรมแบบกำหนดเอง สำหรับรายละเอียดเพิ่มเติมและตัวอย่าง

การฝึกด้วยลูปการฝึกแบบกำหนดเอง

การใช้ลูปการฝึกแบบกำหนดเองกับ tf.distribute.Strategy ให้ความยืดหยุ่นอย่างมากในการกำหนดลูปการฝึก ด้วย ParameterServerStrategy ที่กำหนดไว้ข้างต้น (เป็น strategy ) คุณจะใช้ tf.distribute.experimental.coordinator.ClusterCoordinator เพื่อส่งการดำเนินการตามขั้นตอนการฝึกอบรมไปยังผู้ปฏิบัติงานระยะไกล

จากนั้น คุณจะสร้างแบบจำลอง กำหนดชุดข้อมูลและฟังก์ชันขั้นตอน ดังที่คุณได้ทำในลูปการฝึกกับ tf.distribute.Strategy อื่น ๆ คุณสามารถดูรายละเอียดเพิ่มเติมได้ในการ ฝึกอบรมแบบกำหนดเองด้วยบทช่วยสอน tf.distribute.Strategy

เพื่อให้แน่ใจว่าการดึงข้อมูลชุดข้อมูลล่วงหน้ามีประสิทธิภาพ ให้ใช้ API การสร้างชุดข้อมูลแบบกระจายที่แนะนำที่กล่าวถึงในส่วน ขั้นตอนการฝึกอบรม Dispatch ไปยังผู้ปฏิบัติงานระยะไกล ด้านล่าง นอกจากนี้ อย่าลืมโทรหา Strategy.run ภายใน worker_fn เพื่อใช้ประโยชน์จาก GPU ที่จัดสรรให้กับพนักงานอย่างเต็มที่ ขั้นตอนที่เหลือจะเหมือนกันสำหรับการฝึกแบบมีหรือไม่มี GPU

มาสร้างส่วนประกอบเหล่านี้ในขั้นตอนต่อไปนี้:

ตั้งค่าข้อมูล

ขั้นแรก ให้เขียนฟังก์ชันที่สร้างชุดข้อมูลที่มีตรรกะการประมวลผลล่วงหน้าซึ่งใช้งานโดยเลเยอร์การประมวลผลล่วงหน้าของ Keras

คุณจะสร้างเลเยอร์เหล่านี้นอก dataset_fn แต่ใช้การแปลงภายใน dataset_fn เนื่องจากคุณจะรวม dataset_fn ไว้ใน tf.function ซึ่งไม่อนุญาตให้สร้างตัวแปรภายใน

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

สร้างตัวอย่างของเล่นในชุดข้อมูล:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

จากนั้น สร้างชุดข้อมูลการฝึกอบรมที่ห่อหุ้ม dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

สร้างแบบจำลอง

ถัดไป สร้างแบบจำลองและวัตถุอื่นๆ อย่าลืมสร้างตัวแปรทั้งหมดภายใต้ strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

มายืนยันว่าการใช้ FixedShardsPartitioner แบ่งตัวแปรทั้งหมดออกเป็นสองส่วนย่อย และแต่ละส่วนถูกกำหนดให้กับเซิร์ฟเวอร์พารามิเตอร์ที่แตกต่างกัน:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

กำหนดขั้นตอนการฝึก

ประการที่สาม สร้างขั้นตอนการฝึกอบรมที่รวมไว้ใน tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

ในฟังก์ชันขั้นตอนการฝึกอบรมด้านบน การเรียก Strategy.run และ Strategy.reduce ใน step_fn สามารถรองรับ GPU ได้หลายตัวต่อพนักงานหนึ่งคน หากผู้ปฏิบัติงานมีการจัดสรร GPU Strategy.run จะแจกจ่ายชุดข้อมูลบนแบบจำลองหลายรายการ

ส่งขั้นตอนการฝึกอบรมให้กับพนักงานทางไกล

หลังจากที่การคำนวณทั้งหมดถูกกำหนดโดย ParameterServerStrategy คุณจะใช้คลาส tf.distribute.experimental.coordinator.ClusterCoordinator เพื่อสร้างทรัพยากรและแจกจ่ายขั้นตอนการฝึกอบรมให้กับผู้ปฏิบัติงานระยะไกล

ขั้นแรกให้สร้างวัตถุ ClusterCoordinator และส่งผ่านในวัตถุกลยุทธ์:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

จากนั้น สร้างชุดข้อมูลต่อผู้ปฏิบัติงานและตัววนซ้ำ ใน per_worker_dataset_fn ด้านล่าง แนะนำให้ห่อ dataset_fn ลงใน strategy.distribute_datasets_from_function เพื่อให้การดึงข้อมูลล่วงหน้าไปยัง GPU ล่วงหน้าอย่างมีประสิทธิภาพได้อย่างราบรื่น

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

ขั้นตอนสุดท้ายคือการกระจายการคำนวณไปยังผู้ปฏิบัติงานระยะไกลโดยใช้ ClusterCoordinator.schedule :

  • วิธีการ schedule เข้าคิว tf.function และส่งคืน RemoteValue ที่เหมือนอนาคตทันที ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลัง และ RemoteValue จะถูกเติมแบบอะซิงโครนัส
  • วิธีการ join ( ClusterCoordinator.join ) สามารถใช้เพื่อรอจนกว่าฟังก์ชันที่กำหนดเวลาไว้ทั้งหมดจะถูกดำเนินการ
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

นี่คือวิธีที่คุณสามารถดึงผลลัพธ์ของ RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
ตัวยึดตำแหน่ง23

อีกวิธีหนึ่ง คุณสามารถเริ่มขั้นตอนทั้งหมดและทำบางสิ่งในขณะที่รอให้เสร็จสิ้นได้:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

สำหรับเวิร์กโฟลว์การฝึกอบรมและการให้บริการที่สมบูรณ์สำหรับตัวอย่างนี้ โปรดดู การทดสอบ นี้

ข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูล

ชุดข้อมูลในโค้ดด้านบนถูกสร้างขึ้นโดยใช้ ClusterCoordinator.create_per_worker_dataset API) มันสร้างชุดข้อมูลหนึ่งชุดต่อผู้ปฏิบัติงานและส่งคืนวัตถุคอนเทนเนอร์ คุณสามารถเรียกใช้เมธอด iter เพื่อสร้างตัววนซ้ำต่อคนได้ ตัววนซ้ำต่อผู้ปฏิบัติงานประกอบด้วยตัววนซ้ำหนึ่งตัวต่อผู้ปฏิบัติงาน และส่วนที่เกี่ยวข้องของผู้ปฏิบัติงานจะถูกแทนที่ในอาร์กิวเมนต์อินพุตของฟังก์ชันที่ส่งผ่านไปยังเมธอด ClusterCoordinator.schedule ก่อนที่ฟังก์ชันจะดำเนินการกับผู้ปฏิบัติงานรายใดรายหนึ่ง

ในปัจจุบัน เมธอด ClusterCoordinator.schedule ถือว่าผู้ปฏิบัติงานเทียบเท่า ดังนั้นจึงถือว่าชุดข้อมูลของผู้ปฏิบัติงานต่างกันเหมือนกัน ยกเว้นอาจถูกสับเปลี่ยนต่างกันหากมีการดำเนินการ Dataset.shuffle ด้วยเหตุนี้ จึงขอแนะนำให้ใช้ชุดข้อมูลซ้ำโดยไม่มีกำหนด และคุณจัดกำหนดการขั้นตอนจำนวนจำกัด แทนที่จะอาศัย OutOfRangeError จากชุดข้อมูล

หมายเหตุสำคัญอีกประการหนึ่งคือ ชุดข้อมูล tf.data ไม่สนับสนุนการทำให้เป็นอนุกรมและดีซีเรียลไลซ์เซชั่นโดยปริยายข้ามขอบเขตงาน ดังนั้นจึงเป็นสิ่งสำคัญที่จะสร้างชุดข้อมูลทั้งหมดภายในฟังก์ชันที่ส่งผ่านไปยัง ClusterCoordinator.create_per_worker_dataset

การประเมิน

มีมากกว่าหนึ่งวิธีในการกำหนดและเรียกใช้วงจรการประเมินในการฝึกอบรมแบบกระจาย แต่ละคนมีข้อดีและข้อเสียของตัวเองตามที่อธิบายไว้ด้านล่าง แนะนำให้ใช้วิธีการประเมินแบบอินไลน์หากคุณไม่ต้องการ

การประเมินแบบอินไลน์

ในวิธีนี้ ผู้ประสานงานจะสลับกันระหว่างการฝึกอบรมและการประเมิน ดังนั้นจึงเรียกว่า การประเมินแบบอินไลน์

การประเมินแบบอินไลน์มีประโยชน์หลายประการ ตัวอย่างเช่น:

  • สามารถรองรับแบบจำลองการประเมินขนาดใหญ่และชุดข้อมูลการประเมินที่งานเดียวไม่สามารถถือครองได้
  • ผลการประเมินสามารถใช้ในการตัดสินใจสำหรับการฝึกอบรมในยุคต่อไปได้

มีสองวิธีในการดำเนินการประเมินแบบอินไลน์: การประเมินโดยตรงและการประเมินแบบกระจาย

  • การประเมินโดยตรง : สำหรับแบบจำลองขนาดเล็กและชุดข้อมูลการประเมิน ผู้ประสานงานสามารถเรียกใช้การประเมินโดยตรงบนแบบจำลองแบบกระจายด้วยชุดข้อมูลการประเมินบนผู้ประสานงาน:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • การประเมินแบบกระจาย : สำหรับโมเดลขนาดใหญ่หรือชุดข้อมูลที่ไม่สามารถรันได้โดยตรงบนผู้ประสานงาน งานผู้ประสานงานสามารถแจกจ่ายงานการประเมินให้กับผู้ปฏิบัติงานผ่าน ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

การประเมินด้านข้างรถ

อีกวิธีหนึ่งเรียกว่า การประเมินรถด้านข้าง ซึ่งคุณสร้างงานผู้ประเมินเฉพาะที่อ่านจุดตรวจซ้ำๆ และดำเนินการประเมินบนจุดตรวจล่าสุด จะช่วยให้โปรแกรมการฝึกอบรมของคุณเสร็จสิ้นก่อนเวลาอันควร หากคุณไม่ต้องการเปลี่ยนรอบการฝึกตามผลการประเมิน อย่างไรก็ตาม จำเป็นต้องมีงานผู้ประเมินเพิ่มเติมและจุดตรวจเป็นระยะเพื่อกระตุ้นการประเมิน ต่อไปนี้เป็นวงจรการประเมินด้านข้างรถที่เป็นไปได้:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

กลุ่มในโลกแห่งความเป็นจริง

ในสภาพแวดล้อมการผลิตจริง คุณจะรันงานทั้งหมดในกระบวนการต่าง ๆ บนเครื่องที่แตกต่างกัน วิธีที่ง่ายที่สุดในการกำหนดค่าข้อมูลคลัสเตอร์ในแต่ละงานคือการตั้งค่าตัวแปรสภาพแวดล้อม "TF_CONFIG" และใช้ tf.distribute.cluster_resolver.TFConfigClusterResolver เพื่อแยกวิเคราะห์ "TF_CONFIG"

สำหรับคำอธิบายทั่วไปเกี่ยวกับตัวแปรสภาพแวดล้อม "TF_CONFIG" โปรดดูคู่มือ การฝึกอบรมแบบกระจาย

หากคุณเริ่มการฝึกอบรมโดยใช้ Kubernetes หรือเทมเพลตการกำหนดค่าอื่นๆ มีแนวโน้มสูงที่เทมเพลตเหล่านี้จะตั้งค่า “TF_CONFIG" ให้คุณแล้ว

ตั้งค่าตัวแปรสภาพแวดล้อม "TF_CONFIG"

สมมติว่าคุณมี 3 ผู้ปฏิบัติงานและ 2 เซิร์ฟเวอร์พารามิเตอร์ "TF_CONFIG" ของผู้ปฏิบัติงาน 1 สามารถเป็น:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" ของผู้ประเมินสามารถ:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

ส่วน "cluster" ในสตริง "TF_CONFIG" ด้านบนสำหรับผู้ประเมินเป็นทางเลือก

หากคุณใช้เลขฐานสองเดียวกันสำหรับงานทั้งหมด

หากคุณต้องการรันงานเหล่านี้ทั้งหมดโดยใช้ไบนารีเดียว คุณจะต้องปล่อยให้โปรแกรมของคุณแยกสาขาออกเป็นบทบาทต่างๆ ในตอนเริ่มต้น:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

รหัสต่อไปนี้เริ่มต้นเซิร์ฟเวอร์ TensorFlow และรอ:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

การจัดการความล้มเหลวของงาน

ความล้มเหลวของผู้ปฏิบัติงาน

tf.distribute.experimental.coordinator.ClusterCoordinator หรือ Model.fit ให้ความทนทานต่อข้อผิดพลาดในตัวสำหรับความล้มเหลวของผู้ปฏิบัติงาน เมื่อกู้คืนผู้ปฏิบัติงาน ฟังก์ชันชุดข้อมูลที่ให้ไว้ก่อนหน้านี้ (ทั้ง ClusterCoordinator.create_per_worker_dataset สำหรับลูปการฝึกที่กำหนดเอง หรือ tf.keras.utils.experimental.DatasetCreator สำหรับ Model.fit ) จะถูกเรียกใช้บนคนงานเพื่อสร้างชุดข้อมูลใหม่

เซิร์ฟเวอร์พารามิเตอร์หรือความล้มเหลวของผู้ประสานงาน

อย่างไรก็ตาม เมื่อผู้ประสานงานเห็นข้อผิดพลาดของเซิร์ฟเวอร์พารามิเตอร์ จะทำให้เกิด UnavailableError หรือ AbortedError ทันที คุณสามารถรีสตาร์ทผู้ประสานงานได้ในกรณีนี้ ผู้ประสานงานเองก็อาจใช้งานไม่ได้เช่นกัน ดังนั้นจึงแนะนำให้ใช้เครื่องมือบางอย่างเพื่อไม่ให้สูญเสียความคืบหน้าในการฝึกอบรม:

  • สำหรับ Model.fit คุณควรใช้การเรียกกลับ BackupAndRestore ซึ่งจัดการการบันทึกและกู้คืนความคืบหน้าโดยอัตโนมัติ ดูตัวอย่างการ โทรกลับและการฝึกอบรม ด้านบน

  • สำหรับลูปการฝึกแบบกำหนดเอง คุณควรตรวจสอบตัวแปรโมเดลเป็นระยะๆ และโหลดตัวแปรโมเดลจากจุดตรวจสอบ หากมี ก่อนที่การฝึกจะเริ่มต้น ความคืบหน้าของการฝึกอบรมสามารถอนุมานได้โดยประมาณจาก optimizer.iterations การวนซ้ำ หากจุดตรวจสอบเครื่องมือเพิ่มประสิทธิภาพ:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

กำลังดึงค่า RemoteValue

การดึงข้อมูล RemoteValue รับประกันว่าจะสำเร็จหากดำเนินการฟังก์ชันสำเร็จ เนื่องจากขณะนี้ค่าตอบแทนจะถูกคัดลอกไปยังผู้ประสานงานทันทีหลังจากดำเนินการฟังก์ชัน หากมีข้อผิดพลาดของผู้ปฏิบัติงานในระหว่างการคัดลอก ฟังก์ชันนี้จะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่มีอยู่ ดังนั้น ถ้าคุณต้องการปรับให้เหมาะสมสำหรับประสิทธิภาพ คุณสามารถกำหนดเวลาฟังก์ชันโดยไม่มีค่าส่งคืนได้

การรายงานข้อผิดพลาด

เมื่อผู้ประสานงานพบข้อผิดพลาด เช่น UnavailableError จากเซิร์ฟเวอร์พารามิเตอร์ หรือข้อผิดพลาดของแอปพลิเคชันอื่นๆ เช่น InvalidArgument จาก tf.debugging.check_numerics ระบบจะยกเลิกฟังก์ชันที่รอดำเนินการและอยู่ในคิวทั้งหมดก่อนที่จะสร้างข้อผิดพลาด การดึงข้อมูล RemoteValue ที่เกี่ยวข้องจะทำให้เกิด CancelledError

หลังจากเกิดข้อผิดพลาด ผู้ประสานงานจะไม่แจ้งข้อผิดพลาดเดิมหรือข้อผิดพลาดใดๆ จากฟังก์ชันที่ถูกยกเลิก

การปรับปรุงประสิทธิภาพ

มีสาเหตุที่เป็นไปได้หลายประการ หากคุณพบปัญหาด้านประสิทธิภาพเมื่อคุณฝึกด้วย ParameterServerStrategy และ ClusterResolver

สาเหตุทั่วไปประการหนึ่งคือเซิร์ฟเวอร์พารามิเตอร์มีโหลดที่ไม่สมดุล และเซิร์ฟเวอร์พารามิเตอร์ที่โหลดหนักบางตัวมีความจุถึงขีดจำกัดแล้ว นอกจากนี้ยังสามารถมีได้หลายสาเหตุ วิธีง่ายๆ ในการแก้ไขปัญหานี้คือ:

  1. แบ่งส่วนตัวแปรโมเดลขนาดใหญ่ของคุณผ่านการระบุ variable_partitioner เมื่อสร้าง ParameterServerStrategy
  2. หลีกเลี่ยงการสร้างตัวแปรฮอตสปอตที่เซิร์ฟเวอร์พารามิเตอร์ทั้งหมดต้องการในขั้นตอนเดียว ถ้าเป็นไปได้ ตัวอย่างเช่น ใช้อัตราการเรียนรู้คงที่หรือคลาสย่อย tf.keras.optimizers.schedules.LearningRateSchedule ในตัวเพิ่มประสิทธิภาพเนื่องจากพฤติกรรมเริ่มต้นคืออัตราการเรียนรู้จะกลายเป็นตัวแปรที่วางอยู่บนเซิร์ฟเวอร์พารามิเตอร์เฉพาะและร้องขอโดยเซิร์ฟเวอร์พารามิเตอร์อื่นทั้งหมดในแต่ละขั้นตอน .
  3. สับเปลี่ยนคำศัพท์ขนาดใหญ่ของคุณก่อนที่จะส่งต่อไปยังเลเยอร์การประมวลผลล่วงหน้าของ Keras

อีกสาเหตุที่เป็นไปได้สำหรับปัญหาด้านประสิทธิภาพคือผู้ประสานงาน การใช้งาน schedule / join ครั้งแรกของคุณเป็นแบบ Python และอาจมีค่าใช้จ่ายในการทำเธรด นอกจากนี้ เวลาแฝงระหว่างผู้ประสานงานและพนักงานอาจมีขนาดใหญ่ หากเป็นกรณีนี้

  • สำหรับ Model.fit คุณสามารถตั้งค่าอาร์กิวเมนต์ steps_per_execution ที่ Model.compile เป็นค่าที่มากกว่า 1

  • สำหรับลูปการฝึกแบบกำหนดเอง คุณสามารถแพ็คหลายขั้นตอนไว้ใน tf.function เดียว:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

เนื่องจากไลบรารีได้รับการปรับให้เหมาะสมยิ่งขึ้น หวังว่าผู้ใช้ส่วนใหญ่จะไม่ต้องแพ็คขั้นตอนด้วยตนเองอีกในอนาคต

นอกจากนี้ เคล็ดลับเล็กๆ น้อยๆ สำหรับการปรับปรุงประสิทธิภาพคือการจัดกำหนดการฟังก์ชันโดยไม่มีค่าส่งคืนตามที่อธิบายไว้ในส่วนการจัดการความล้มเหลวของงานด้านบน

ข้อจำกัดที่ทราบ

ข้อจำกัดที่ทราบส่วนใหญ่มีอยู่แล้วในหัวข้อข้างต้น ส่วนนี้ให้ข้อมูลสรุป

ParameterServerStrategy ทั่วไป

  • os.environment["grpc_fail_fast"]="use_caller" เป็นสิ่งจำเป็นในทุกงานรวมถึงผู้ประสานงาน เพื่อให้การทนต่อข้อผิดพลาดทำงานได้อย่างถูกต้อง
  • ไม่รองรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์แบบซิงโครนัส
  • โดยปกติจำเป็นต้องรวมหลายขั้นตอนไว้ในฟังก์ชันเดียวเพื่อให้ได้ประสิทธิภาพสูงสุด
  • ไม่รองรับการโหลด save_model ผ่าน tf.saved_model.load ที่มีตัวแปรชาร์ด หมายเหตุการโหลด save_model โดยใช้ TensorFlow Serving นั้นคาดว่าจะใช้งานได้
  • ไม่รองรับการโหลดจุดตรวจสอบที่มีตัวแปรช่องเครื่องมือเพิ่มประสิทธิภาพการแบ่งส่วนข้อมูลในส่วนแบ่งข้อมูลจำนวนอื่น
  • ไม่สนับสนุนการกู้คืนจากความล้มเหลวของเซิร์ฟเวอร์พารามิเตอร์โดยไม่ต้องรีสตาร์ทงานผู้ประสานงาน
  • การใช้ tf.lookup.StaticHashTable (ซึ่งโดยทั่วไปจะใช้โดยเลเยอร์การประมวลผลล่วงหน้าของ Keras เช่น tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup และ tf.keras.layers.TextVectorization ) ส่งผลให้เกิดทรัพยากรที่วางอยู่บน ผู้ประสานงานในเวลานี้กับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ สิ่งนี้มีผลกระทบด้านประสิทธิภาพสำหรับการค้นหา RPC จากผู้ปฏิบัติงานไปยังผู้ประสานงาน นี้เป็นลำดับความสำคัญสูงในปัจจุบันที่จะกล่าวถึง

Model.fit เฉพาะ

  • จำเป็นต้องมีอาร์กิวเมนต์ steps_per_epoch ใน Model.fit คุณสามารถเลือกค่าที่ให้ช่วงเวลาที่เหมาะสมในยุคหนึ่งได้
  • ParameterServerStrategy ไม่รองรับการเรียกกลับแบบกำหนดเองที่มีการเรียกระดับแบทช์เพื่อเหตุผลด้านประสิทธิภาพ คุณควรแปลงการโทรเหล่านั้นเป็นการเรียกระดับ epoch โดยเลือก steps_per_epoch อย่างเหมาะสม เพื่อให้ถูกเรียกทุกจำนวน steps_per_epoch การเรียกกลับในตัวจะไม่ได้รับผลกระทบ: การเรียกระดับแบทช์ของพวกเขาได้รับการแก้ไขเพื่อให้มีประสิทธิภาพ กำลังวางแผนรองรับการเรียกระดับชุดงานสำหรับ ParameterServerStrategy
  • ด้วยเหตุผลเดียวกัน แถบความคืบหน้าและเมตริกต่างจากกลยุทธ์อื่นๆ ที่บันทึกที่ขอบเขตยุคเท่านั้น
  • ไม่รองรับ run_eagerly

ข้อมูลเฉพาะของลูปการฝึกแบบกำหนดเอง