บันทึกวันที่! Google I / O ส่งคืนวันที่ 18-20 พฤษภาคม ลงทะเบียนตอนนี้
หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์

ดูใน TensorFlow.org เรียกใช้ใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

ภาพรวม

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ เป็นวิธีการขนานข้อมูลทั่วไปในการปรับขนาดการฝึกอบรมแบบจำลองบนเครื่องหลายเครื่อง คลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ประกอบด้วยคนงานและเซิร์ฟเวอร์พารามิเตอร์ ตัวแปรถูกสร้างขึ้นบนเซิร์ฟเวอร์พารามิเตอร์และจะถูกอ่านและอัพเดตโดยผู้ปฏิบัติงานในแต่ละขั้นตอน ตามค่าเริ่มต้นคนงานจะอ่านและอัปเดตตัวแปรเหล่านี้อย่างอิสระโดยไม่ต้องซิงโครไนซ์ซึ่งกันและกัน นี่คือเหตุผลที่บางครั้งการฝึกอบรมรูปแบบเซิร์ฟเวอร์พารามิเตอร์เรียกว่าการฝึกอบรมแบบอะซิงโครนัส

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ TensorFlow 2 ใช้ตัวประสานกลางผ่านคลาส tf.distribute.experimental.coordinator.ClusterCoordinator

ในการนำไปใช้งานนี้ภารกิจของ worker และ parameter server รัน tf.distribute.Server ที่รับคำร้องขอจากผู้ประสานงาน ผู้ประสานงานสร้างทรัพยากรส่งงานฝึกอบรมเขียนจุดตรวจและจัดการกับความล้มเหลวของงาน

เราเชื่อว่าสถาปัตยกรรมนี้และคลาส ClusterCoordinator ใหม่ให้รูปแบบการเขียนโปรแกรมที่ยืดหยุ่นและง่ายขึ้น

ClusterCoordinator

คลาส ClusterCoordinator จำเป็นต้องทำงานร่วมกับวัตถุ tf.distribute.Strategy วัตถุ tf.distribute.Strategy นี้จำเป็นในการส่งผ่านข้อมูลของคลัสเตอร์และใช้เพื่อกำหนดขั้นตอนการฝึกอบรมตามที่เราเห็นใน การฝึกอบรมแบบกำหนดเองด้วย MirroredStrategy จากนั้นวัตถุ ClusterCoordinator จะส่งการดำเนินการตามขั้นตอนการฝึกอบรมเหล่านี้ไปยังผู้ปฏิบัติงานระยะไกล ปัจจุบัน ClusterCoordinator ใช้งานได้กับ tf.distribute.experimental.ParameterServerStrategy เท่านั้น

API ที่สำคัญที่สุดที่มีให้โดยอ็อบเจ็กต์ ClusterCoordinator คือ schedule schedule API จะสร้าง tf.function และส่งคืน RemoteValue เหมือน RemoteValue ทันที ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลังและ RemoteValue ของพวกเขาจะถูกเติมแบบอะซิงโครนัส เนื่องจาก schedule ไม่จำเป็นต้องมีการมอบหมายผู้ปฏิบัติงาน tf.function ส่งผ่านจึงสามารถดำเนินการกับผู้ปฏิบัติงานที่มีอยู่ได้ หากผู้ปฏิบัติงานที่ถูกเรียกใช้งานไม่พร้อมใช้งานก่อนที่จะเสร็จสิ้นฟังก์ชันนี้จะถูกลองอีกครั้งกับผู้ปฏิบัติงานคนอื่นที่มีอยู่ เนื่องจากข้อเท็จจริงนี้และความจริงที่ว่าการเรียกใช้ฟังก์ชันไม่ใช่อะตอมฟังก์ชันจึงอาจถูกเรียกใช้งานได้มากกว่าหนึ่งครั้ง

นอกเหนือจากการจัดส่งฟังก์ชันระยะไกลแล้ว ClusterCoordinator ยังช่วยสร้างชุดข้อมูลสำหรับผู้ปฏิบัติงานทั้งหมดและสร้างชุดข้อมูลเหล่านี้ใหม่เมื่อผู้ปฏิบัติงานกู้คืนจากความล้มเหลว

การตั้งค่าบทช่วยสอน

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

การตั้งค่าคลัสเตอร์

ดังที่ได้กล่าวไว้ข้างต้นคลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ต้องการงานผู้ประสานงานที่รันโปรแกรมการฝึกอบรมของคุณคนงานหนึ่งคนหรือหลายคนและงานเซิร์ฟเวอร์พารามิเตอร์ที่รันเซิร์ฟเวอร์ tf.distribute.Server นั่นคือ tf.distribute.Server และอาจเป็นงานการประเมินเพิ่มเติมที่รันด้านข้างรถ การประเมินผล (ดูส่วนการประเมินด้านข้างรถด้านล่าง) ข้อกำหนดในการตั้งค่ามีดังนี้:

  • งานผู้ประสานงานจำเป็นต้องทราบที่อยู่และพอร์ตของเซิร์ฟเวอร์ TensorFlow อื่น ๆ ทั้งหมดยกเว้นผู้ประเมิน
  • ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์จำเป็นต้องทราบว่าพอร์ตใดที่พวกเขาต้องฟัง เพื่อความง่ายเรามักจะส่งผ่านข้อมูลคลัสเตอร์ทั้งหมดเมื่อเราสร้างเซิร์ฟเวอร์ TensorFlow สำหรับงานเหล่านี้
  • งานผู้ประเมินไม่จำเป็นต้องทราบการตั้งค่าของคลัสเตอร์การฝึกอบรม หากเป็นเช่นนั้นก็ไม่ควรพยายามเชื่อมต่อกับคลัสเตอร์การฝึกอบรม
  • ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์ควรมีประเภทงานเป็น "ผู้ปฏิบัติงาน" และ "ps" ตามลำดับ ผู้ประสานงานควรใช้ "หัวหน้า" เป็นประเภทงานด้วยเหตุผลเดิม

ในบทช่วยสอนนี้เราจะสร้างคลัสเตอร์ในกระบวนการเพื่อให้สามารถรันการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ทั้งหมดใน colab เราจะแนะนำวิธีตั้งค่า คลัสเตอร์จริง ในส่วนต่อไป

คลัสเตอร์ในกระบวนการ

ในบทช่วยสอนนี้เราจะเริ่มต้นเซิร์ฟเวอร์ TensorFlow จำนวนมากล่วงหน้าและเชื่อมต่อกับเซิร์ฟเวอร์เหล่านี้ในภายหลัง:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

การฝึกอบรมด้วย Custom Training Loop

ลูปการฝึกแบบกำหนดเองด้วย tf.distribute.Strategy ให้ความยืดหยุ่นอย่างมากในการกำหนดลูปการฝึกอบรม ขณะนี้สำหรับการฝึกเซิร์ฟเวอร์พารามิเตอร์ใน TensorFlow 2 รองรับเฉพาะลูปการฝึกอบรมที่กำหนดเองเท่านั้น ที่นี่เราใช้ ParameterServerStrategy เพื่อกำหนดขั้นตอนการฝึกอบรมจากนั้นใช้ ClusterCoordinator เพื่อส่งการดำเนินการตามขั้นตอนการฝึกอบรมไปยังผู้ปฏิบัติงานระยะไกล

สร้าง ParameterServerStrategy

ในการเขียนขั้นตอนการฝึกในลูปการฝึกแบบกำหนดเองขั้นตอนแรกคือการสร้าง ParameterServerStrategy เราจะอธิบาย variable_partitioner ภายหลัง

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

จากนั้นคุณจะสร้างแบบจำลองกำหนดชุดข้อมูลและฟังก์ชันขั้นตอนตามที่เราเห็นในลูปการฝึกอบรมกับ tf.distribute.Strategy อื่น ๆ คุณสามารถดูรายละเอียดเพิ่มเติมได้ใน บทช่วยสอน นี้ มาสร้างส่วนประกอบเหล่านี้ตามขั้นตอนต่อไปนี้:

ตั้งค่าข้อมูล

ขั้นแรกเขียนฟังก์ชันที่สร้างชุดข้อมูลที่มีลอจิกก่อนการประมวลผลที่ดำเนินการโดยเลเยอร์ก่อนการประมวลผล Keras เราจะสร้างเลเยอร์เหล่านี้นอก dataset_fn แต่ใช้การแปลงภายใน dataset_fn เนื่องจากคุณจะรวม dataset_fn ไว้ใน tf.function ซึ่งไม่อนุญาตให้สร้างตัวแปรภายใน

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

สร้างตัวอย่างของเล่นในชุดข้อมูล:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

จากนั้นเราจะสร้างชุดข้อมูลการฝึกอบรมที่รวมอยู่ใน dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

สร้างแบบจำลอง

ประการที่สองเราสร้างแบบจำลองและวัตถุอื่น ๆ ตรวจสอบให้แน่ใจว่าได้สร้างตัวแปรทั้งหมดภายใต้ strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

กำหนดขั้นตอนการฝึกอบรม

ประการที่สามสร้างขั้นตอนการฝึกอบรมที่รวมอยู่ใน tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

ในฟังก์ชันขั้นตอนข้างต้นการเรียก strategy.run และ strategy.reduce ใน step_fn มีประโยชน์ในการรองรับ GPU หรือผู้ปฏิบัติงานจำลองหลายคนในอนาคตแม้ว่าจะมีการใช้งานที่ไม่สำคัญในขณะนี้ก็ตาม

ส่งขั้นตอนการฝึกอบรมไปยังผู้ปฏิบัติงานระยะไกล

หลังจากการคำนวณทั้งหมดถูกกำหนดโดย ParameterServerStrategy เราจะใช้คลาส ClusterCoordinator เพื่อสร้างทรัพยากรและแจกจ่ายขั้นตอนการฝึกอบรมให้กับผู้ปฏิบัติงานระยะไกล

ก่อนอื่นเรามาสร้างวัตถุ ClusterCoordinator และส่งผ่านวัตถุกลยุทธ์:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

จากนั้นเราจะสร้างชุดข้อมูลต่อผู้ปฏิบัติงานและตัววนซ้ำ ใน per_worker_dataset_fn ด้านล่างการรวม dataset_fn ไว้ใน strategy.distribute_datasets_from_function เป็นทางเลือก แต่จะช่วยให้รองรับการดึงข้อมูลล่วงหน้าที่มีประสิทธิภาพไปยัง GPU ได้อย่างราบรื่นในอนาคตเมื่อ ParameterServerStrategy รองรับ GPU

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

ขั้นตอนสุดท้ายคือการแจกจ่ายการคำนวณไปยังผู้ปฏิบัติงานระยะไกลโดยใช้ schedule เมธอด schedule สร้าง tf.function และส่งคืน RemoteValue เหมือน RemoteValue ทันที ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลังและ RemoteValue จะถูกเติมแบบอะซิงโครนัส สามารถใช้วิธีการ join เพื่อรอจนกว่าฟังก์ชันที่กำหนดเวลาไว้ทั้งหมดจะถูกแยกออก

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

นี่คือวิธีที่คุณสามารถดึงผลลัพธ์ของ RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

หรือคุณสามารถเปิดขั้นตอนทั้งหมดและทำบางอย่างในขณะที่รอให้เสร็จสิ้น:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

สำหรับขั้นตอนการฝึกอบรมและการให้บริการที่สมบูรณ์สำหรับตัวอย่างนี้โปรดดู การทดสอบ นี้

ข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูล

ชุดข้อมูลในโค้ดด้านบนสร้างขึ้นโดยใช้ create_per_worker_dataset API สร้างชุดข้อมูลหนึ่งชุดต่อผู้ปฏิบัติงานและส่งคืนอ็อบเจ็กต์คอนเทนเนอร์ คุณสามารถโทรหา iter วิธีการที่มันจะสร้าง iterator ต่องาน ตัววนซ้ำต่อผู้ปฏิบัติงานประกอบด้วยตัววนซ้ำหนึ่งตัวต่อผู้ปฏิบัติงานและชิ้นงานที่เกี่ยวข้องจะถูกแทนที่ในอาร์กิวเมนต์อินพุตของฟังก์ชันที่ส่งผ่านไปยังเมธอด schedule ก่อนที่ฟังก์ชันจะถูกเรียกใช้กับผู้ปฏิบัติงานหนึ่งคน

ขณะนี้วิธีการ schedule ถือว่าคนงานมีความเท่าเทียมกันดังนั้นจึงถือว่าชุดข้อมูลของผู้ปฏิบัติงานต่างกันเหมือนกันยกเว้นว่าอาจมีการสับเปลี่ยนต่างกันหากมีการดำเนินการ dataset.shuffle ด้วยเหตุนี้เราจึงแนะนำให้ทำซ้ำชุดข้อมูลไปเรื่อย ๆ และกำหนดจำนวนขั้นตอนที่ จำกัด แทนที่จะใช้ OutOfRangeError จากชุดข้อมูล

ข้อสังเกตที่สำคัญอีกประการหนึ่งคือชุดข้อมูล tf.data ไม่รองรับการทำให้เป็นอนุกรมโดยปริยายและการแยกสายข้อมูลข้ามขอบเขตงาน ดังนั้นจึงเป็นเรื่องสำคัญที่จะต้องสร้างชุดข้อมูลทั้งหมดภายในฟังก์ชันที่ส่งผ่านไปยัง create_per_worker_dataset

การแบ่งตัวแปร

Variable sharding หมายถึงการแบ่งตัวแปรออกเป็นตัวแปรย่อย ๆ เราเรียกตัวแปรเล็ก ๆ เหล่านี้ว่า shard s การชาร์ดแบบแปรผันอาจเป็นประโยชน์ในการกระจายโหลดเครือข่ายเมื่อเข้าถึงชาร์ดเหล่านี้ นอกจากนี้ยังมีประโยชน์ในการกระจายการคำนวณและการจัดเก็บตัวแปรปกติในเซิร์ฟเวอร์พารามิเตอร์หลายตัว

ในการเปิดใช้งานการแบ่งตัวแปรคุณสามารถส่งผ่าน variable_partitioner เมื่อสร้างออบเจ็กต์ ParameterServerStrategy variable_partitioner จะถูกเรียกทุกครั้งเมื่อมีการสร้างตัวแปรและคาดว่าจะส่งคืนจำนวนชาร์ดตามแต่ละมิติของตัวแปร variable_partitioner tf.distribute.experimental.partitioners.FixedShardsPartitioner บาง variable_partitioner มีให้เช่น tf.distribute.experimental.partitioners.FixedShardsPartitioner

ในตัวอย่างข้างต้นเราใช้ FixedShardsPartitioner ซึ่งจะแบ่งตัวแปรทั้งหมดออกเป็นสองส่วนและแต่ละชาร์ดจะถูกกำหนดให้กับเซิร์ฟเวอร์พารามิเตอร์ที่แตกต่างกัน:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

เมื่อมีการส่ง variable_partitioner มาและหากคุณสร้างตัวแปรโดยตรงภายใต้ strategy.scope() ตัวแปรนั้นจะกลายเป็นประเภทคอนเทนเนอร์ที่มีคุณสมบัติของ variables ซึ่งให้การเข้าถึงรายการของชาร์ด ในกรณีส่วนใหญ่คอนเทนเนอร์นี้จะถูกแปลงเป็น Tensor โดยอัตโนมัติโดยการต่อชิ้นส่วนทั้งหมดเข้าด้วยกัน เป็นผลให้สามารถใช้เป็นตัวแปรปกติได้ ในทางกลับกันเมธอด TensorFlow บางวิธีเช่น tf.nn.embedding_lookup ให้การใช้งานที่มีประสิทธิภาพสำหรับคอนเทนเนอร์ประเภทนี้และในวิธีการเหล่านี้จะหลีกเลี่ยงการต่อข้อมูลอัตโนมัติ

โปรดดูเอกสาร API ของ ParameterServerStrategy สำหรับรายละเอียดเพิ่มเติม

การประเมินผล

มีมากกว่าหนึ่งวิธีในการกำหนดและเรียกใช้ลูปการประเมินผลในการฝึกอบรมแบบกระจาย แต่ละข้อมีข้อดีข้อเสียตามที่อธิบายไว้ด้านล่าง ขอแนะนำให้ใช้วิธีการประเมินแบบอินไลน์หากคุณไม่มีความชอบ

การประเมินผลแบบอินไลน์

ในวิธีนี้ผู้ประสานงานจะสลับไปมาระหว่างการฝึกอบรมและการประเมินผลดังนั้นเราจึงเรียกว่าการประเมินผลแบบอินไลน์ มีประโยชน์หลายประการของการประเมินแบบอินไลน์ ตัวอย่างเช่นสามารถรองรับโมเดลการประเมินผลขนาดใหญ่และชุดข้อมูลการประเมินที่งานเดียวไม่สามารถเก็บไว้ได้ อีกตัวอย่างหนึ่งผลการประเมินสามารถใช้ในการตัดสินใจสำหรับการฝึกอบรมในยุคถัดไป

มีสองวิธีในการใช้การประเมินแบบอินไลน์:

  • การประเมินโดยตรง - สำหรับแบบจำลองขนาดเล็กและชุดข้อมูลการประเมินผู้ประสานงานสามารถเรียกใช้การประเมินผลโดยตรงบนแบบจำลองแบบกระจายพร้อมชุดข้อมูลการประเมินบนผู้ประสานงาน:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • การประเมินแบบกระจาย - สำหรับแบบจำลองขนาดใหญ่หรือชุดข้อมูลที่ไม่สามารถเรียกใช้โดยตรงบนผู้ประสานงานได้งานผู้ประสานงานสามารถแจกจ่ายงานการประเมินผลให้กับผู้ปฏิบัติงานผ่านทาง schedule / วิธีการ join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

การประเมินด้านข้างรถ

อีกวิธีหนึ่งเรียกว่าการประเมินด้านรถซึ่งเป็นการสร้างงานผู้ประเมินเฉพาะที่อ่านจุดตรวจซ้ำ ๆ และเรียกใช้การประเมินผลในจุดตรวจล่าสุด ช่วยให้โปรแกรมการฝึกอบรมของคุณเสร็จสิ้นก่อนกำหนดหากคุณไม่จำเป็นต้องเปลี่ยนลูปการฝึกอบรมตามผลการประเมิน อย่างไรก็ตามต้องมีงานผู้ประเมินเพิ่มเติมและการตรวจสอบเป็นระยะเพื่อเริ่มการประเมิน ต่อไปนี้เป็นวงรอบการประเมินรถด้านข้างที่เป็นไปได้:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

กลุ่มในโลกแห่งความจริง

ในสภาพแวดล้อมการผลิตจริงคุณจะรันงานทั้งหมดในกระบวนการต่างๆบนเครื่องจักรที่แตกต่างกัน วิธีที่ง่ายที่สุดในการกำหนดค่าข้อมูลคลัสเตอร์ในแต่ละงานคือการตั้งค่าตัวแปรสภาพแวดล้อม "TF_CONFIG" และใช้ TFConfigClusterResolver เพื่อแยกวิเคราะห์ "TF_CONFIG" สำหรับคำอธิบายทั่วไปเกี่ยวกับตัวแปรสภาพแวดล้อม "TF_CONFIG" โปรดดู คู่มือการฝึกอบรมที่เผยแพร่

หากคุณเริ่มงานการฝึกอบรมโดยใช้ Kubernetes หรือเทมเพลตการกำหนดค่าอื่น ๆ เป็นไปได้มากว่าเทมเพลตเหล่านี้ได้ตั้งค่า“ TF_CONFIG” ให้คุณแล้ว

ตั้งค่าตัวแปรสภาพแวดล้อม“ TF_CONFIG”

สมมติว่าคุณมีผู้ปฏิบัติงาน 3 คนและเซิร์ฟเวอร์พารามิเตอร์ 2 ตัว“ TF_CONFIG” ของผู้ปฏิบัติงาน 1 สามารถเป็น:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

“ TF_CONFIG” ของผู้ประเมินสามารถ:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

ส่วน "คลัสเตอร์" ในสตริง "TF_CONFIG" ด้านบนสำหรับผู้ประเมินเป็นทางเลือก

หากคุณใช้ไบนารีเดียวกันสำหรับงานทั้งหมด

หากคุณต้องการรันงานเหล่านี้ทั้งหมดโดยใช้ไบนารีเดียวคุณจะต้องปล่อยให้โปรแกรมของคุณแตกแขนงออกเป็นบทบาทที่แตกต่างกันตั้งแต่เริ่มต้น:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

รหัสต่อไปนี้เริ่มต้นเซิร์ฟเวอร์ TensorFlow และรอ:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

การจัดการงานล้มเหลว

คนงานล้มเหลว

ดังที่ได้กล่าวไว้ข้างต้น ClusterCoordinator มีความทนทานต่อความผิดพลาดในตัวสำหรับความล้มเหลวของผู้ปฏิบัติงาน เมื่อผู้ปฏิบัติงานกู้คืนชุดข้อมูลที่สอดคล้องกันที่สร้างโดย create_per_worker_dataset ที่ยังอยู่ในขอบเขตจะถูกสร้างขึ้นใหม่โดยเรียกใช้ dataset_fn เดิมที่ส่งผ่านไปยัง create_per_worker_dataset

เซิร์ฟเวอร์พารามิเตอร์หรือตัวประสานงานล้มเหลว

อย่างไรก็ตามเมื่อผู้ประสานงานเห็นข้อผิดพลาดของเซิร์ฟเวอร์พารามิเตอร์จะเพิ่ม UnavailableError หรือ AbortedError ทันที คุณสามารถรีสตาร์ทผู้ประสานงานได้ในกรณีนี้ ผู้ประสานงานเองก็ไม่สามารถใช้งานได้เช่นกัน ดังนั้นเพื่อที่จะไม่สูญเสียความคืบหน้าในการฝึกอบรมไปมากนักจึงควรตรวจสอบตัวแปรโมเดลเป็นระยะ ๆ และโหลดตัวแปรโมเดลจากจุดตรวจ (ถ้ามี) ก่อนเริ่มการฝึกอบรม ความคืบหน้าของการฝึกอบรมสามารถสรุปได้โดยประมาณจาก optimizer.iterations หากเครื่องมือเพิ่มประสิทธิภาพถูกตรวจสอบ

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

กำลังเรียก RemoteValue

การเรียก RemoteValue รับประกันว่าจะทำได้สำเร็จหากเรียกใช้ฟังก์ชันได้สำเร็จ เนื่องจากปัจจุบันค่าส่งคืนจะถูกคัดลอกไปยังตัวประสานงานทันทีหลังจากเรียกใช้ฟังก์ชัน หากมีความล้มเหลวของผู้ปฏิบัติงานในระหว่างการคัดลอกฟังก์ชันนี้จะถูกลองใหม่กับผู้ปฏิบัติงานคนอื่นที่มีอยู่ ดังนั้นหากคุณต้องการเพิ่มประสิทธิภาพการทำงานคุณสามารถกำหนดเวลาฟังก์ชันโดยไม่มีค่าส่งคืนได้

การรายงานข้อผิดพลาด

เมื่อผู้ประสานงานเห็นข้อผิดพลาดเช่น UnavailableError จากเซิร์ฟเวอร์พารามิเตอร์หรือข้อผิดพลาดของแอปพลิเคชันอื่น ๆ เช่น InvalidArgument จาก tf.debugging.check_numerics จะยกเลิกฟังก์ชันที่รอดำเนินการและอยู่ในคิวทั้งหมดก่อนที่จะเพิ่มข้อผิดพลาด การเรียกใช้ RemoteValue เกี่ยวข้องจะเพิ่ม CancelledError

หลังจากเกิดข้อผิดพลาดผู้ประสานงานจะไม่แจ้งข้อผิดพลาดเดียวกันหรือข้อผิดพลาดใด ๆ จากฟังก์ชันที่ถูกยกเลิก

การปรับปรุงประสิทธิภาพ

มีสาเหตุที่เป็นไปได้หลายประการหากคุณพบปัญหาด้านประสิทธิภาพเมื่อคุณฝึกกับ ParameterServerStrategy และ ClusterResolver

สาเหตุทั่วไปประการหนึ่งคือเซิร์ฟเวอร์พารามิเตอร์มีโหลดไม่สมดุลและเซิร์ฟเวอร์พารามิเตอร์ที่โหลดหนักบางตัวถึงขีดความสามารถแล้ว นอกจากนี้ยังสามารถมีสาเหตุหลายประการ วิธีง่ายๆในการบรรเทาปัญหานี้คือการ

  1. แบ่งตัวแปรโมเดลขนาดใหญ่ของคุณผ่านการระบุ variable_partitioner เมื่อสร้าง ParameterServerStrategy
  2. หลีกเลี่ยงการสร้างตัวแปรฮอตสปอตที่เซิร์ฟเวอร์พารามิเตอร์ทั้งหมดต้องการในขั้นตอนเดียวถ้าเป็นไปได้ ตัวอย่างเช่นใช้อัตราการเรียนรู้คงที่หรือคลาสย่อย tf.keras.optimizers.schedules.LearningRateSchedule ในเครื่องมือเพิ่มประสิทธิภาพเนื่องจากพฤติกรรมเริ่มต้นคืออัตราการเรียนรู้จะกลายเป็นตัวแปรที่วางบนเซิร์ฟเวอร์พารามิเตอร์เฉพาะและร้องขอโดยเซิร์ฟเวอร์พารามิเตอร์อื่น ๆ ในแต่ละขั้นตอน .
  3. สลับคำศัพท์ขนาดใหญ่ของคุณก่อนที่จะส่งไปยัง Keras ก่อนการประมวลผลเลเยอร์

อีกสาเหตุหนึ่งที่เป็นไปได้สำหรับปัญหาด้านประสิทธิภาพคือผู้ประสานงาน การใช้งาน schedule / join ครั้งแรกของเรานั้นใช้ Python ดังนั้นจึงอาจมีค่าใช้จ่ายในการทำเธรด นอกจากนี้เวลาแฝงระหว่างผู้ประสานงานและคนงานอาจมีมาก ในกรณีนี้คุณสามารถรวมหลายขั้นตอนไว้ใน tf.function เดียว:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

เราจะเพิ่มประสิทธิภาพผู้ประสานงานต่อไปและหวังว่าผู้ใช้ส่วนใหญ่จะไม่ต้องบรรจุขั้นตอนด้วยตนเองในอนาคต

นอกจากนี้เคล็ดลับเล็ก ๆ สำหรับการปรับปรุงประสิทธิภาพคือการจัดกำหนดการฟังก์ชันโดยไม่มีค่าส่งคืนตามที่อธิบายไว้ในส่วนการจัดการงานล้มเหลวด้านบน

ข้อ จำกัด ที่ทราบ

ข้อ จำกัด ที่ทราบส่วนใหญ่ครอบคลุมอยู่ในส่วนข้างต้น นี่คือบทสรุป:

  • os.environment["grpc_fail_fast"]="use_caller" เป็นสิ่งจำเป็นในทุกงานรวมถึงผู้ประสานงานเพื่อให้การยอมรับข้อผิดพลาดทำงานได้อย่างถูกต้อง
  • ไม่รองรับคนงาน GPU
  • ไม่สนับสนุนการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ซิงโครนัส
  • ParameterServerStrategy ไม่ทำงานกับ Keras compile และ fit API
  • ClusterCoordinator.schedule ไม่รองรับการรับประกันการเยี่ยมชมสำหรับชุดข้อมูล
  • เมื่อใช้ ClusterCoordinator.create_per_worker_dataset ต้องสร้างชุดข้อมูลทั้งหมดภายในฟังก์ชันที่ส่งผ่านไป
  • โดยปกติจำเป็นต้องบรรจุหลายขั้นตอนไว้ในฟังก์ชันเดียวเพื่อให้ได้ประสิทธิภาพที่ดีที่สุด
  • ไม่สนับสนุนการโหลด save_model ผ่าน tf.saved_model.load ที่มีตัวแปร tf.saved_model.load หมายเหตุการโหลดรุ่นที่บันทึกไว้โดยใช้ TensorFlow Serving คาดว่าจะทำงานได้
  • ไม่สนับสนุนการโหลดจุดตรวจสอบที่เชื่อมต่อตัวแปรสล็อตเครื่องมือเพิ่มประสิทธิภาพที่แตกเป็นเศษเล็กเศษน้อยลงในจำนวนชิ้นส่วนที่แตกต่างกัน
  • ไม่สนับสนุนการกู้คืนจากความล้มเหลวของเซิร์ฟเวอร์พารามิเตอร์โดยไม่ต้องรีสตาร์ทงานผู้ประสานงาน