השב / י לאירוע TensorFlow Everywhere המקומי שלך היום!
דף זה תורגם על ידי Cloud Translation API.
Switch to English

הכשרת שרתים פרמטרים

צפה ב- TensorFlow.org הפעל בגוגל קולאב צפה במקור ב- GitHub הורד מחברת

סקירה כללית

אימון לשרת פרמטרים הוא שיטה מקבילה לנתונים המקבילה להגדלת אימוני מודלים במספר מכונות. מקבץ אימונים לשרת פרמטרים מורכב מעובדים ושרתי פרמטרים. משתנים נוצרים בשרתי פרמטרים והם נקראים ומתעדכנים על ידי עובדים בכל שלב. כברירת מחדל, עובדים קוראים ומעדכנים משתנים אלה באופן עצמאי מבלי להסתנכרן זה עם זה. זו הסיבה שלעיתים אימונים בסגנון שרת פרמטרים נקראים אימון אסינכרוני.

הכשרת שרתים פרמטרים של TensorFlow 2 משתמשת במתאם מרכזי דרך המחלקה tf.distribute.experimental.coordinator.ClusterCoordinator .

ביישום זה משימות parameter server worker parameter server מריצות את tf.distribute.Server שמאזינים לבקשות tf.distribute.Server . הרכז יוצר משאבים, משגר משימות אימונים, כותב מחסומים ומתמודד עם כשלים במשימה.

אנו מאמינים שארכיטקטורה זו ClusterCoordinator החדשה מספקים מודל תכנות גמיש ופשוט יותר.

ClusterCoordinator

הכיתה ClusterCoordinator צריכה לעבוד בשיתוף עם אובייקט tf.distribute.Strategy . יש צורך באובייקט tf.distribute.Strategy כדי להעביר את המידע של האשכול ומשמש להגדרת צעד אימון כפי שראינו באימונים מותאמים אישית עם MirroredStrategy . האובייקט ClusterCoordinator ואז שולח את ביצוע שלבי ההדרכה האלה לעובדים מרוחקים. נכון לעכשיו, ClusterCoordinator עובד רק עם tf.distribute.experimental.ParameterServerStrategy .

ה- API החשוב ביותר שמספק האובייקט ClusterCoordinator הוא schedule . ה- API של schedule צופה פונקציה tf.function . ומחזיר RemoteValue דמוי RemoteValue באופן מיידי. הפונקציות בתור יישלחו לעובדים מרוחקים RemoteValue הרקע ו- RemoteValue שלהם RemoteValue בצורה אסינכרונית. מכיוון schedule אינו מצריך הקצאת עובדים, ניתן לבצע את tf.function . על כל עובד זמין. אם העובד בו הוא מבוצע לא יהיה זמין לפני השלמתו, הפונקציה תנוסה שוב על עובד זמין אחר. בגלל עובדה זו והעובדה שביצוע פונקציות אינו אטומי, ניתן לבצע פונקציה יותר מפעם אחת.

בנוסף למשלוח פונקציות מרחוק, ClusterCoordinator מסייע גם ליצור מערכי נתונים על כל העובדים ולבנות מחדש מערכי נתונים אלה כאשר עובד מתאושש מכישלון.

הגדרת הדרכה

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

הגדרת אשכול

כאמור לעיל, אשכול אימון לשרת פרמטרים מחייב מטלת רכז שמפעילה את תוכנית ההדרכה שלך, עובד אחד או כמה ומשימות שרת פרמטרים המפעילות שרתי TensorFlow, כלומר tf.distribute.Server , ואולי מטלת הערכה נוספת המפעילה צד-רכב הערכה (ראה סעיף הערכת רכב צדדי בהמשך). הדרישות להגדרתן הן:

  • משימת הרכז צריכה לדעת את הכתובות והיציאות של כל שאר השרתים של TensorFlow למעט המעריך.
  • העובדים ושרתי הפרמטרים צריכים לדעת לאיזו פורט הם צריכים להאזין. למען הפשטות, בדרך כלל אנו מעבירים את מידע האשכול המלא כאשר אנו יוצרים שרתי TensorFlow במשימות אלה.
  • משימת המעריך אינה חייבת להכיר את הגדרת אשכול ההדרכה. אם כן, הוא לא צריך לנסות להתחבר לאשכול האימונים.
  • עובדים ושרתי פרמטרים צריכים שיהיו להם סוגי משימות כ"עובד "ו-" ps "בהתאמה. על הרכז להשתמש ב"צ'יף "כסוג המשימה מסיבות מורשת.

במדריך זה, ניצור אשכול בתהליך כך שניתן יהיה להריץ את כל אימוני שרת הפרמטרים ב- colab. נציג כיצד להקים אשכולות אמיתיים בחלק מאוחר יותר.

אשכול בתהליך

במדריך זה נתחיל מראש חבורה של שרתי TensorFlow ונתחבר אליהם מאוחר יותר:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

אימון בלולאת אימונים בהתאמה אישית

לולאת אימון מותאמת אישית עם tf.distribute.Strategy מספקת גמישות רבה להגדרת לולאות אימון. נכון לעכשיו לאימון שרת פרמטרים ב- TensorFlow 2, נתמך רק לולאת אימון מותאמת אישית. כאן אנו משתמשים ב- ParameterServerStrategy כדי להגדיר שלב הכשרה ואז משתמשים ב- ClusterCoordinator כדי להעביר את ביצוע שלבי ההדרכה לעובדים מרוחקים.

צור את ParameterServerStrategy

כדי לכתוב שלב אימונים בלולאת אימונים מותאמת אישית, הצעד הראשון הוא ליצור ParameterServerStrategy . נסביר את variable_partitioner בהמשך.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

לאחר מכן תיצור מודל, תגדיר מערך נתונים ופונקציה צעד כפי שראינו בלולאת האימונים עם tf.distribute.Strategy אחרים. ניתן למצוא פרטים נוספים זה הדרכה . בואו ניצור רכיבים אלה בשלבים הבאים:

הגדר את הנתונים

ראשית, כתוב פונקציה שיוצרת מערך נתונים הכולל לוגיקה של עיבוד מקדים המיושם על ידי שכבות עיבוד מקדימות של Keras. ניצור שכבות אלה מחוץ ל- dataset_fn אך נפעיל את השינוי בתוך dataset_fn מכיוון שתעטוף את dataset_fn tf.function שלא מאפשרת ליצור משתנים בתוכו.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

צור דוגמאות צעצוע במערך נתונים:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

לאחר מכן אנו יוצרים את מערך האימונים עטוף ב- dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

בנה את המודל

שנית, אנו יוצרים את המודל ואובייקטים אחרים. הקפד ליצור את כל המשתנים תחת strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

הגדירו את שלב האימון

שלישית, צור את שלב האימון העטוף tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

בפונקציית הצעד לעיל, קריאה strategy.run strategy.reduce ב- step_fn שימושיות לתמיכה ב- GPUs או במספר רב של עובדי העתקים בעתיד, אם כי יש להם יישום טריוויאלי ברגע זה.

העברת שלבי הדרכה לעובדים מרוחקים

לאחר שכל החישובים מוגדרים על ידי ParameterServerStrategy , נשתמש במחלקת ClusterCoordinator כדי ליצור משאבים ולהפיץ את שלבי ההדרכה לעובדים מרוחקים.

בואו ניצור תחילה אובייקט ClusterCoordinator לאובייקט האסטרטגיה:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

לאחר מכן אנו יוצרים מערך נתונים לעובד ואיטרטור. ב- per_worker_dataset_fn למטה, עטיפת מערך dataset_fn strategy.distribute_datasets_from_function dataset_fn היא אופציונלית, אך היא תאפשר תמיכה dataset_fn מראש יעילה ל- GPUs בצורה חלקה בעתיד כאשר GPUs נתמכים על ידי ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

השלב האחרון הוא הפצת החישוב לעובדים מרוחקים לפי schedule . שיטת schedule מציגה פונקציה tf.function . ומחזירה באופן מיידי RemoteValue . הפונקציות בתור יישלחו לעובדים מרוחקים RemoteValue הרקע וה- RemoteValue יתמלא בצורה אסינכרונית. ניתן להשתמש בשיטת join להמתין עד שכל הפונקציות המתוזמנות יוצגו.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

כך תוכל להביא את התוצאה של RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

לחלופין, אתה יכול להפעיל את כל השלבים ולעשות משהו בזמן ההמתנה לסיום:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

לקבלת תהליך ההדרכה וההגשה המלא עבור דוגמה מסוימת זו, עיין במבחן זה.

מידע נוסף על יצירת מערך נתונים

מערך הנתונים בקוד הנ"ל נוצר באמצעות ה- API של create_per_worker_dataset . זה יוצר מערך נתונים אחד לעובד ומחזיר אובייקט מכולה. אתה יכול לקרוא לשיטת iter עליו כדי ליצור איטרטור לעובד. האיטרטור לעובד מכיל איטרטור אחד לעובד והפרוסה המתאימה של עובד תוחלף בארגומנט הקלט של הפונקציה שהועברה לשיטת schedule לפני ביצוע הפונקציה על עובד מסוים.

נכון לעכשיו, שיטת schedule מניחה שהעובדים שווים ולכן הם מעריכים שמערכי הנתונים על עובדים שונים זהים, אלא שהם עשויים להיות מעורבבים אחרת אם הם מכילים פעולת dataset.shuffle מסיבה זו, אנו ממליצים גם לחזור על מערכי הנתונים ללא הגבלת זמן ולתזמן מספר סופי של צעדים במקום להסתמך על OutOfRangeError ממערך נתונים.

הערה חשובה נוספת היא tf.data נתונים של tf.data אינם תומכים tf.data מרומזים מעבר לגבולות המשימה. לכן חשוב ליצור את כל מערך הנתונים בתוך הפונקציה המועברת ל- create_per_worker_dataset .

רסיסים משתנים

רסיסה משתנה מתייחסת לפיצול משתנה למספר משתנים קטנים יותר. אנו מכנים את המשתנים הקטנים האלה רסיסים . רטוש משתנה עשוי להיות שימושי להפצת עומס הרשת בעת גישה לרסיסים אלה. כדאי גם להפיץ חישוב ואחסון של משתנה רגיל על פני שרתי פרמטרים מרובים.

כדי לאפשר sharding משתנה, אתה יכול לעבור בתוך variable_partitioner בעת בניית ParameterServerStrategy האובייקט. variable_partitioner יופעל בכל פעם שייווצר משתנה והוא צפוי להחזיר את מספר השברים לאורך כל מימד של המשתנה. חלק tf.distribute.experimental.partitioners.FixedShardsPartitioner variable_partitioner מחוץ לקופסה, כגון tf.distribute.experimental.partitioners.FixedShardsPartitioner .

בדוגמה שלעיל אנו משתמשים ב- FixedShardsPartitioner אשר FixedShardsPartitioner את כל המשתנים לשני רסיסים וכל רסיס יוקצה לשרתי פרמטרים שונים:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

כאשר variable_partitioner מועבר ואם אתה יוצר משתנה ישירות תחת strategy.scope() , היא תהפוך לסוג מכל עם variables רכוש אשר מספק גישה לרשימת שברים. ברוב המקרים, מיכל זה יומר אוטומטית לטנסור על ידי שרשור כל הרסיסים. כתוצאה מכך, ניתן להשתמש בו כמשתנה רגיל. מצד שני, כמה משיטות TensorFlow כגון tf.nn.embedding_lookup מספקות יישום יעיל לסוג מכולה זה ובשיטות אלה tf.nn.embedding_lookup אוטומטי.

לקבלת פרטים נוספים, עיין בתיעוד ה- API של ParameterServerStrategy .

הַעֲרָכָה

יש יותר מדרך אחת להגדיר ולהפעיל לולאת הערכה באימונים מבוזרים. לכל אחד יתרונות וחסרונות משלו כמתואר להלן. מומלץ להשתמש בשיטת הערכה מקוונת אם אין לך העדפה.

הערכה מוטבעת

בשיטה זו הרכז מתחלף בין הכשרה להערכה ולכן אנו מכנים אותה הערכה מוטבעת. ישנם מספר יתרונות של הערכה מקוונת. לדוגמה, הוא יכול לתמוך במודלי הערכה גדולים ובמערכות נתונים של הערכה שמשימה אחת אינה יכולה להחזיק. לדוגמא אחרת, ניתן להשתמש בתוצאות ההערכה לקבלת החלטות להכשרה בעידן הבא.

ישנן שתי דרכים ליישום הערכה מקוונת:

  • הערכה ישירה - עבור מודלים קטנים ומערכי נתונים של הערכה הרכז יכול להריץ הערכה ישירות על המודל המבוזר עם מערך ההערכה על הרכז:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • הערכה מבוזרת - עבור מודלים גדולים או מערכי נתונים שאינם ניתנים להפעלה ישירות על הרכז, משימת הרכז יכולה להפיץ מטלות הערכה לעובדים באמצעות שיטות schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

הערכת רכב צדדי

שיטה אחרת נקראת הערכת מכונית צדדית שהיא יצירת משימת מעריך ייעודית שקוראת שוב ושוב מחסומים ומריצה הערכה במחסום אחרון. זה מאפשר לתוכנית האימונים שלך להסתיים מוקדם אם אינך צריך לשנות את לולאת האימונים שלך על סמך תוצאות הערכה. עם זאת, נדרשת משימת מעריך נוספת ומחסום תקופתי בכדי להפעיל את ההערכה. להלן לולאת הערכת רכב צדדית אפשרית:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

אשכולות בעולם האמיתי

בסביבת ייצור אמיתית, תריץ את כל המשימות בתהליכים שונים במכונות שונות. הדרך הפשוטה ביותר להגדיר מידע על אשכול בכל משימה היא להגדיר משתני סביבה "TF_CONFIG" ולהשתמש ב- TFConfigClusterResolver לניתוח "TF_CONFIG". לתיאור כללי אודות משתני הסביבה "TF_CONFIG", עיין במדריך ההדרכה המבוזר .

אם אתה מתחיל את משימות האימון שלך באמצעות Kubernetes או תבניות תצורה אחרות, סביר מאוד להניח שתבניות אלה כבר הגדירו לך "TF_CONFIG".

הגדר את משתנה הסביבה "TF_CONFIG"

נניח שיש לך 3 עובדים ו -2 שרתי פרמטרים, "TF_CONFIG" של עובד 1 יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" של המעריך יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

החלק "אשכול" במחרוזת "TF_CONFIG" לעיל עבור המעריך הוא אופציונלי.

אם אתה משתמש באותו בינארי לכל המשימות

אם אתה מעדיף להפעיל את כל המשימות האלה באמצעות בינארי יחיד, יהיה עליך לתת לתוכנית שלך להסתעף בתפקידים שונים כבר בהתחלה:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

הקוד הבא מפעיל שרת TensorFlow ומחכה:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

טיפול בכישלון משימות

כישלון עובדים

כאמור לעיל, ל- ClusterCoordinator יש סובלנות תקלות מובנית לכישלון עובדים. לאחר התאוששות העובד, נתח create_per_worker_dataset הנתונים שנוצר על ידי create_per_worker_dataset שעדיין נמצא בתחום ייצור מחדש על ידי הפעלת dataset_fn המקורי שהועבר ל- create_per_worker_dataset .

שרת פרמטרים או כשל הרכז

עם זאת, כאשר המתאם יראה שגיאת שרת פרמטרים, הוא יעלה שגיאה UnavailableError או AbortedError באופן מיידי. אתה יכול להפעיל מחדש את הרכז במקרה זה. הרכז עצמו יכול גם להיות לא זמין. לכן, כדי לא לאבד הרבה מהתקדמות האימון, חשוב לבדוק את משתני המודל מעת לעת ולהעמיס משתני מודלים ממחסום, אם בכלל, לפני תחילת האימון. ניתן להסיק את התקדמות האימון בערך ממיטוב optimizer.iterations אם אופטימיזציה מסומנת.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

להביא ערך RemoteValue

מובטח כי RemoteValue תצליח אם פונקציה תבוצע בהצלחה. הסיבה לכך היא שכרגע ערך ההחזר מועתק מיד לרכז לאחר ביצוע פונקציה. אם ישנו כשל עובד במהלך ההעתקה, הפונקציה תנוסה שוב על עובד זמין אחר. לכן, אם ברצונך לבצע אופטימיזציה לביצועים, תוכל לתזמן פונקציות ללא ערך החזר.

דיווח שגיאות

ברגע InvalidArgument יראה שגיאה כגון UnavailableError משרתי פרמטרים או שגיאות יישום אחרות כגון InvalidArgument מ- tf.debugging.check_numerics , הוא יבטל את כל הפונקציות הממתינות tf.debugging.check_numerics לפני העלאת השגיאה. שליפת המקביל שלהם RemoteValue הים תעלה CancelledError .

לאחר העלאת שגיאה, הרכז לא יעלה את אותה שגיאה או שגיאה כלשהי מפונקציות שבוטלו.

שיפור ביצועים

ישנן מספר סיבות אפשריות אם אתה רואה בעיות ביצועים כאשר אתה מתאמן עם ParameterServerStrategy ו- ClusterResolver .

אחת הסיבות הנפוצות היא ששרתי פרמטרים הם בעלי עומס לא מאוזן וכמה שרתי פרמטרים עמוסים בכבדות הגיעו לקיבולת. יכולות להיות גם גורמי שורש מרובים. כמה שיטות פשוטות למתן בעיה זו הן

  1. שבר את משתני המודל הגדולים שלך באמצעות ציון variable_partitioner בעת בניית ParameterServerStrategy .
  2. הימנע מיצירת משתנה נקודה חמה הנדרש על ידי כל שרתי הפרמטרים בשלב יחיד במידת האפשר. לדוגמה, השתמש בקצב למידה קבוע או tf.keras.optimizers.schedules.LearningRateSchedule משנה tf.keras.optimizers.schedules.LearningRateSchedule אופטימיזציה מכיוון שהתנהגות ברירת המחדל היא שקצב הלמידה יהפוך למשתנה המוצב בשרת פרמטרים מסוים tf.keras.optimizers.schedules.LearningRateSchedule ידי כל שרתי הפרמטרים האחרים בכל שלב. .
  3. ערבב את אוצר המילים הגדול שלך לפני שתעביר אותם לשכבות העיבוד המקדימות של Keras.

סיבה אפשרית נוספת לבעיות ביצוע היא הרכז. היישום הראשון שלנו של schedule / join הוא מבוסס פיתון ולכן עשוי להיות משורשר. גם האיחור בין הרכז לעובדים יכול להיות גדול. אם זה המקרה, אתה יכול לארוז מספר שלבים tf.function אחת:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

אנו נמשיך לייעל את הרכז ומקווה שרוב המשתמשים לא יצטרכו לארוז צעדים ידנית בעתיד.

בנוסף, טריק קטן לשיפור ביצועים הוא לתזמן פונקציות ללא ערך החזר כמוסבר בסעיף כישלון משימות הטיפול לעיל.

מגבלות ידועות

רוב המגבלות הידועות מכוסות בסעיפים לעיל. הנה סיכום:

  • os.environment["grpc_fail_fast"]="use_caller" נדרש בכל משימה, כולל הרכז, כדי לגרום לסובלנות תקלות לעבוד כראוי.
  • עובדי GPU אינם נתמכים.
  • אימון שרת פרמטרים סינכרוני אינו נתמך.
  • ParameterServerStrategy לא עובד עם Keras compile ואת fit APIs.
  • ClusterCoordinator.schedule אינו תומך בערבויות ביקור עבור מערך נתונים.
  • כאשר משתמשים ב- ClusterCoordinator.create_per_worker_dataset , יש ליצור את כל מערך הנתונים בתוך הפונקציה המועברת אליו.
  • בדרך כלל יש צורך לארוז מספר שלבים לפונקציה אחת כדי להשיג ביצועים מיטביים.
  • זה לא נתמך לטעון את הציל_מודל באמצעות tf.saved_model.load המכיל משתנים מרוסקים. הערה טעינה של הצלת דוגמה כזו באמצעות הגשת TensorFlow צפויה לעבוד.
  • אין תמיכה בהטענת משתני חריץ אופטימיזציה מרוסקים מחסום למספר רסיסים אחר.
  • לא נתמך להתאושש מכשל בשרת הפרמטרים מבלי להפעיל מחדש את משימת הרכז.