צפה בהערות מרכזיות, הפעלות מוצר, סדנאות ועוד מ- Google I / O ראה רשימת השמעה

הכשרת שרתים פרמטרים

צפה ב- TensorFlow.org הפעל בגוגל קולאב צפה במקור ב- GitHub הורד מחברת

סקירה כללית

אימון לשרת פרמטרים הוא שיטה מקובלת המקבילה נתונים להגדלת אימוני מודלים במספר מכונות. מקבץ אימונים לשרת פרמטרים מורכב מעובדים ושרתי פרמטרים. משתנים נוצרים בשרתי פרמטרים והם נקראים ומתעדכנים על ידי עובדים בכל שלב. כברירת מחדל, עובדים קוראים ומעדכנים משתנים אלה באופן עצמאי מבלי להסתנכרן זה עם זה. זו הסיבה שלעיתים אימונים בסגנון שרת פרמטרים נקראים אימון אסינכרוני.

ב- TF2, הכשרת שרתים של פרמטרים מופעלת על ידי מחלקת tf.distribute.experimental.ParameterServerStrategy , המפיצה את שלבי ההדרכה לאשכול tf.distribute.experimental.ParameterServerStrategy עד אלפי עובדים (בליווי שרתי פרמטרים). ישנם שני ממשקי API להכשרה תומכים עיקריים: Keras Training API, הידוע גם בשם Model.fit , ו- Custom Training Loop (CTL). Model.fit מומלץ כאשר משתמשים מעדיפים הפשטה גבוהה וטיפול באימונים, ואילו CTL מומלץ כאשר משתמשים מעדיפים להגדיר את פרטי לולאת האימונים שלהם.

ללא קשר לממשק ה- API הנבחר, הכשרה מבוזרת ב- TF2 כוללת "אשכול" עם מספר "משרות", ולכל אחת מהמשרות עשויה להיות אחת או יותר "משימות". כאשר משתמשים באימון של שרת פרמטרים, מומלץ שיהיה משרת רכז אחד (שיש לו chief שם התפקיד), משרות עובד מרובות (שם משרת worker ) ועבודות שרת פרמטרים מרובות (שם משרה ps ).

בעוד הרכז יוצר משאבים, משגר משימות הדרכה, כותב נקודות ביקורת ומתמודד עם כשלים במשימה, עובדים ושרתי פרמטרים מריצים את tf.distribute.Server שמאזין לבקשות tf.distribute.Server .

אימון לשרת פרמטרים עם Model.fit API

אימון של שרת פרמטרים עם Model.fit API מחייב את הרכז להשתמש באובייקט tf.distribute.experimental.ParameterServerStrategy , וב tf.keras.utils.experimental.DatasetCreator כקלט. בדומה לשימוש ב- Model.fit ללא אסטרטגיה, או עם אסטרטגיות אחרות, זרימת העבודה כוללת יצירה והידור של המודל, הכנת השיחות Model.fit , Model.fit שיחת Model.fit .

אימון לשרת פרמטרים עם ממשק אימון מותאם אישית (CTL)

עם CTL, מחלקת tf.distribute.experimental.coordinator.ClusterCoordinator היא מרכיב המפתח המשמש את הרכז. הכיתה ClusterCoordinator צריכה לעבוד בשיתוף עם אובייקט tf.distribute.Strategy . יש צורך באובייקט tf.distribute.Strategy כדי לספק את המידע של האשכול ומשמש להגדרת שלב אימונים כפי שראינו באימונים מותאמים אישית עם MirroredStrategy . האובייקט ClusterCoordinator משגר את ביצוע שלבי ההדרכה הללו לעובדים מרוחקים. לצורך הכשרה של שרת פרמטרים, ClusterCoordinator צריך לעבוד עם tf.distribute.experimental.ParameterServerStrategy .

ה- API החשוב ביותר שמספק האובייקט ClusterCoordinator הוא schedule . ה- API של schedule מצפה פונקציית tf.function . ומחזיר RemoteValue דמוי RemoteValue באופן מיידי. הפונקציות בתור ישלחו לעובדים מרוחקים RemoteValue הרקע ו- RemoteValue שלהם RemoteValue לא סינכרונית. מאחר schedule אינו מחייב הקצאת עובדים, ניתן לבצע את tf.function . על כל עובד זמין. אם העובד בו הוא מבוצע לא יהיה זמין לפני השלמתו, הפונקציה תנוסה שוב לעובד זמין אחר. בגלל עובדה זו והעובדה שביצוע פונקציות אינו אטומי, ניתן לבצע פונקציה יותר מפעם אחת.

בנוסף למשלוח פונקציות מרחוק, ClusterCoordinator מסייע גם ליצור מערכי נתונים על כל העובדים ולבנות מחדש מערכי נתונים אלה כאשר עובד מתאושש מכישלון.

הגדרת הדרכה

ההדרכה Model.fit לנתיבי CTL או Model.fit , ותוכלו לבחור את המתאים לצורך שלכם. קטעים שאינם "אימון עם X" חלים על שני הנתיבים.

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

הגדרת אשכול

כאמור לעיל, אשכול אימון לשרת פרמטרים מחייב מטלת רכז שמפעילה את תוכנית ההדרכה שלך, עובד אחד או כמה ומשימות שרת פרמטרים המפעילות שרתי TensorFlow, כלומר tf.distribute.Server , ואולי משימת הערכה נוספת המפעילה רכב צדדי. הערכה (ראה סעיף הערכת רכב צדדי להלן). הדרישות להגדרתן הן:

  • משימת הרכז צריכה לדעת את הכתובות והיציאות של כל שאר השרתים של TensorFlow למעט המעריך.
  • העובדים ושרתי הפרמטרים צריכים לדעת לאיזו פורט הם צריכים להקשיב. למען הפשטות, בדרך כלל אנו מעבירים את מידע האשכול המלא כאשר אנו יוצרים שרתי TensorFlow במשימות אלה.
  • משימת המעריך אינה חייבת להכיר את הגדרת אשכול ההדרכה. אם כן, הוא לא צריך לנסות להתחבר לאשכול האימונים.
  • עובדים ושרתי פרמטרים צריכים להיות סוגי משימות כ- "עובד" ו- "ps" בהתאמה. על הרכז להשתמש ב"צ'יף "כסוג המשימה מסיבות מורשת.

במדריך זה ניצור אשכול בתהליך כך שניתן יהיה להריץ את כל אימוני שרת הפרמטרים ב- colab. נציג כיצד להקים אשכולות אמיתיים בחלק מאוחר יותר.

אשכול בתהליך

במדריך זה נתחיל מראש חבורה של שרתי TensorFlow ונתחבר אליהם בהמשך. שים לב שזה רק לצורך הדגמת הדרכה זו, ובאימון אמיתי השרתים יופעלו במכונות עובד ו- ps.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

הגדרת האשכול בתהליך משמשת לעתים קרובות בבדיקות היחידות שלנו. הנה דוגמה אחת .

הקם ParameterServerStrategy

לפני שנצלול לקוד האימונים, בואו נקים אובייקט ParameterServerStrategy . שים לב כי יש צורך בכך ללא קשר לשאלה אם אתה ממשיך בלולאת אימונים מותאמת אישית או ב- Model.fit . הטיעון variable_partitioner יוסבר בסעיף הבא .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3', '/job:chief/replica:0/task:0/device:GPU:4', '/job:chief/replica:0/task:0/device:GPU:5'], variable_device = '/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

על מנת להשתמש ב- GPUs לאימון, הקצה GPUs הגלויים לכל עובד. ParameterServerStrategy ישתמש בכל GPUs הזמינים על כל עובד, עם המגבלה כי לכל העובדים יהיה מספר זהה של GPUs זמינים.

רסיסים משתנים

רסיסה משתנה מתייחסת לפיצול משתנה למספר משתנים קטנים יותר. אנו קוראים למשתנים קטנים יותר אלה שברים . רטוש משתנה עשוי להיות שימושי להפצת עומס הרשת בעת גישה לרסיסים אלה. כדאי גם להפיץ חישוב ואחסון של משתנה רגיל על פני שרתי פרמטרים מרובים.

כדי לאפשר sharding משתנה, אתה יכול לעבור בתוך variable_partitioner בעת בניית ParameterServerStrategy האובייקט. variable_partitioner יופעל בכל פעם שייווצר משתנה והוא צפוי להחזיר את מספר הרסיסים לאורך כל מימד של המשתנה. חלקם מחוץ תיבת variable_partitioner ים ניתנים כגון tf.distribute.experimental.partitioners.FixedShardsPartitioner .

כאשר variable_partitioner מועבר ואם אתה יוצר משתנה ישירות תחת strategy.scope() , היא תהפוך לסוג מיכל עם variables רכוש אשר מספק גישה לרשימת שברים. ברוב המקרים, מיכל זה יומר אוטומטית לטנסור על ידי שרשור כל הרסיסים. כתוצאה מכך, ניתן להשתמש בו כמשתנה רגיל. מצד שני, כמה משיטות TensorFlow כגון tf.nn.embedding_lookup מספקות יישום יעיל לסוג מכולה זה ובשיטות אלה tf.nn.embedding_lookup אוטומטי.

אנא עיין בתיעוד ה- API של ParameterServerStrategy לפרטים נוספים.

אימונים עם Model.fit

Keras מספק ממשק API להכשרה קל לשימוש באמצעות Model.fit המטפל בלולאת האימון מתחת למכסה המנוע, עם גמישות של train_step ניתנת train_step , train_step חוזרות המספקות פונקציות כגון שמירת מחסום, או שמירת סיכום עבור TensorBoard. עם Model.fit , ניתן להשתמש באותו קוד אימונים לאסטרטגיות אחרות באמצעות החלפה פשוטה של ​​אובייקט האסטרטגיה.

נתוני קלט

Model.fit עם הכשרה של שרת פרמטרים דורש כי נתוני הקלט יסופקו tf.distribute.InputContext לארגומנט יחיד מהסוג tf.distribute.InputContext ומחזירהtf.data.Dataset . לאחר מכן, צור tf.keras.utils.experimental.DatasetCreator אובייקט שלוקח כזה callable , וכן אופציונלי tf.distribute.InputOptions להתנגד באמצעות input_options טיעון. שים לב שמומלץ לערבב ולחזור על הנתונים באמצעות אימון של שרת פרמטרים, ולציין את steps_per_epoch fit כך שהספרייה תדע את גבולות העידן.

אנא עיין במדריך הקלט המבוזר לקבלת מידע נוסף אודות הטיעון InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

הקוד ב- dataset_fn יופעל במכשיר הקלט, שהוא בדרך כלל המעבד, בכל אחת ממכונות העובד.

בניית מודלים וקומפילציה

כעת תיצור מודל tf.keras.Model עם ממשקי ה- API tf.keras.models.Sequential מודל טריוויאלי tf.keras.models.Sequential נעשה שימוש במודל tf.keras.models.Sequential כאן), ואחריו קריאה Model.compile כדי לשלב רכיבים כגון אופטימיזציה, מדדים, או פרמטרים כגון steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

התקשרות והדרכות

לפני שתתקשר model.fit לאימון בפועל, בואי model.fit השיחות model.fit הנדרשות למשימות נפוצות כגון:

  • ModelCheckpoint - לשמירת משקולות הדגם.

  • BackupAndRestore - כדי לוודא שהתקדמות האימונים מגובה באופן אוטומטי BackupAndRestore אם האשכול חווה זמינות (כגון הפלה או מניעה מקדימה), או

  • TensorBoard - כדי לשמור את דוחות ההתקדמות בקבצי סיכום אשר ניתנים לדמיון בכלי TensorBoard.

שים לב שבשל שיקול ביצועים, לא ניתן לבטל החזרה של רמת החזרה אישית ברמת אצווה כאשר משתמשים בה עם ParameterServerStrategy . אנא שנה את השיחות steps_per_epoch המותאמות אישית שלך כדי לבצע שיחות ברמת התקופה, והתאם את steps_per_epoch לערך מתאים. בנוסף, steps_per_epoch הוא ארגומנט נדרש עבור Model.fit כאשר משתמשים בו עם ParameterServerStrategy .

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 6s - loss: 0.9476
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.8812
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.5994
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e973a5d40> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e8c19eb90> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.4205
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.3881
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7f236401be50>

שימוש ישיר עם ClusterCoordinator (אופציונלי)

גם אם תבחר Model.fit אימונים Model.fit , באפשרותך להפעיל אופציה של ClusterCoordinator כדי לתזמן פונקציות אחרות שברצונך לבצע על העובדים. ראה להלן אימון עם סיבוב אימונים מותאם אישית לקבלת פרטים ודוגמאות נוספים.

אימונים בלולאת אימונים בהתאמה אישית

לולאת אימונים מותאמת אישית עם tf.distribute.Strategy מספקת גמישות רבה להגדרת לולאות אימון. עם ההגדרה ParameterServerStrategy לעיל, תשתמש ב- ClusterCoordinator כדי להעביר את ביצוע שלבי ההדרכה לעובדים מרוחקים.

לאחר מכן, תיצור מודל, תגדיר מערך נתונים ופונקציה צעד כפי שראינו בלולאת האימון עם tf.distribute.Strategy אחרים. ניתן למצוא פרטים נוספים זה הדרכה .

כדי להבטיח הגדרה מוקדמת של מערך נתונים, השתמש בממשקי ה- API המומלצים ליצירת מערך נתונים המוזכרים בשלבים הדרכה לשיגור לעובדים מרוחקים להלן. כמו כן, דאג להתקשר ל- strategy.run בתוך worker_fn כדי לנצל את מלוא היתרונות של GPUs המוקצים לעובדים. שאר הצעדים זהים לאימונים עם או בלי GPUs.

בואו ניצור רכיבים אלה בשלבים הבאים:

הגדר את הנתונים

ראשית, כתוב פונקציה שיוצרת מערך נתונים הכולל לוגיקה לעיבוד מקדים המיושמת על ידי שכבות עיבוד מקדימות של Keras. ניצור שכבות אלה מחוץ ל- dataset_fn אך נפעיל את השינוי בתוך dataset_fn מכיוון שתעטוף את dataset_fn tf.function שלא מאפשרת ליצור משתנים בתוכה.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab,
                                          mask_token=None)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

צור דוגמאות לצעצועים במערך נתונים:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

לאחר מכן אנו יוצרים את מערך האימונים עטוף ב- dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

בנה את המודל

שנית, אנו יוצרים את המודל ואובייקטים אחרים. הקפד ליצור את כל המשתנים תחת strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

בואו נאשר שהשימוש ב- FixedShardsPartitioner פיצל את כל המשתנים לשני רסיסים וכל רסיס הוקצה לשרתי פרמטרים שונים:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

הגדירו את שלב האימון

שלישית, צור את שלב האימון עטוף tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

בפונקציית הצעד לעיל, קריאה strategy.run strategy.reduce ב- step_fn יכולה לתמוך במספר GPUs לעובד. אם לעובדים הוקצו GPUs, strategy.run תפיץ את מערכי הנתונים במספר עותקים משוכפלים.

העבר שלבי הדרכה לעובדים מרוחקים

לאחר שכל החישובים מוגדרים על ידי ParameterServerStrategy , נשתמש במחלקת ClusterCoordinator כדי ליצור משאבים ולהפיץ את שלבי ההדרכה לעובדים מרוחקים.

בואו ניצור תחילה אובייקט ClusterCoordinator לאובייקט האסטרטגיה:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

לאחר מכן אנו יוצרים מערך נתונים לעובד ואיטרטור. ב per_worker_dataset_fn להלן, מומלץ לעטוף את dataset_fn strategy.distribute_datasets_from_function dataset_fn כדי לאפשר dataset_fn מראש יעיל ל- GPUs בצורה חלקה.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

השלב האחרון הוא הפצת החישוב לעובדים מרוחקים לפי schedule . שיטת schedule מציגה פונקציה tf.function . ומחזירה RemoteValue דמוי RemoteValue באופן מיידי. הפונקציות בתור יועברו לעובדים מרוחקים RemoteValue הרקע וה- RemoteValue ימולא בצורה אסינכרונית. ניתן להשתמש בשיטת join להמתין עד שכל הפונקציות המתוזמנות יוחרגו.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.743750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

כך תוכל להביא את התוצאה של RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.005490

לחלופין, אתה יכול להפעיל את כל השלבים ולעשות משהו בזמן ההמתנה לסיום:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

לקבלת תהליך העבודה של ההדרכה וההגשה המלאה לדוגמא ספציפית זו, עיין במבחן זה.

מידע נוסף על יצירת מערך נתונים

מערך הנתונים בקוד הנ"ל נוצר באמצעות ה- API של create_per_worker_dataset . הוא יוצר מערך נתונים אחד לעובד ומחזיר אובייקט מכולה. אתה יכול לקרוא לשיטת iter עליו כדי ליצור איטרטור לעובד. האיטרטור לעובד מכיל איטרטור אחד לעובד והפרוסה המתאימה של עובד תוחלף בארגומנט הקלט של הפונקציה שהועברה לשיטת schedule לפני ביצוע הפונקציה על עובד מסוים.

נכון לעכשיו, שיטת schedule מניחה שהעובדים שווים, ובכך מניחה שמערכי הנתונים על עובדים שונים זהים, אלא שהם עשויים להיות מדשדשים אחרת אם הם מכילים פעולת dataset.shuffle . מסיבה זו, אנו ממליצים גם לחזור על מערכי הנתונים ללא הגבלת זמן ולקבוע מספר סופי של צעדים במקום להסתמך על ה- OutOfRangeError ממערך נתונים.

הערה חשובה נוספת היא tf.data נתונים של tf.data אינם תומכים tf.data מרומז על פני גבולות המשימה. לכן חשוב ליצור את כל מערך הנתונים בתוך הפונקציה המועברת ל- create_per_worker_dataset .

הַעֲרָכָה

יש יותר מדרך אחת להגדיר ולהפעיל לולאת הערכה באימונים מבוזרים. לכל אחד יתרונות וחסרונות משלו כמתואר להלן. מומלץ להשתמש בשיטת הערכה מקוונת אם אין לך העדפה.

הערכה מוטבעת

בשיטה זו המתאם מתחלף בין הכשרה להערכה ולכן אנו מכנים אותה הערכה מוטבעת. ישנם מספר יתרונות של הערכה מקוונת. לדוגמה, הוא יכול לתמוך במודלי הערכה גדולים ובמערכות נתונים של הערכה שמשימה אחת אינה יכולה להחזיק. לדוגמא אחרת, ניתן להשתמש בתוצאות ההערכה לקבלת החלטות להכשרה בעידן הבא.

ישנן שתי דרכים ליישום הערכה מקוונת:

  • הערכה ישירה - עבור מודלים קטנים ומערכי נתונים של הערכה הרכז יכול להריץ הערכה ישירות על המודל המבוזר עם מערך ההערכה על הרכז:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • הערכה מבוזרת - עבור מודלים גדולים או מערכי נתונים שאינם ניתנים להפעלה ישירות על הרכז, משימת הרכז יכולה להפיץ משימות הערכה לעובדים באמצעות שיטות schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

הערכת רכב צדדי

שיטה אחרת נקראת הערכת מכונית צדדית שהיא יצירת משימת מעריך ייעודית שקוראת שוב ושוב מחסומים ומריצה הערכה במחסום אחרון. זה מאפשר לתוכנית האימונים שלך להסתיים מוקדם אם אינך צריך לשנות את לולאת האימונים שלך על סמך תוצאות הערכה. עם זאת, נדרשת משימת מעריך נוספת ובדיקה תקופתית בכדי להפעיל הערכה. להלן לולאה אפשרית להערכת רכב צדדי:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

אשכולות בעולם האמיתי

בסביבת ייצור אמיתית, תריץ את כל המשימות בתהליכים שונים במכונות שונות. הדרך הפשוטה ביותר להגדיר מידע על אשכול בכל משימה היא להגדיר משתני סביבה "TF_CONFIG" ולהשתמש במערכת tf.distribute.cluster_resolver.TFConfigClusterResolver לניתוח "TF_CONFIG". לתיאור כללי אודות משתני הסביבה "TF_CONFIG", עיין במדריך ההדרכה המבוזר .

אם אתה מתחיל את משימות האימון שלך באמצעות Kubernetes או תבניות תצורה אחרות, סביר מאוד להניח שתבניות אלה כבר הגדירו לך "TF_CONFIG".

הגדר את משתנה הסביבה "TF_CONFIG"

נניח שיש לך 3 עובדים ו -2 שרתי פרמטרים, "TF_CONFIG" של עובד 1 יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" של המעריך יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

החלק "אשכול" במחרוזת "TF_CONFIG" לעיל עבור המעריך הוא אופציונלי.

אם אתה משתמש באותו בינארי לכל המשימות

אם אתה מעדיף להפעיל את כל המשימות הללו באמצעות בינארי יחיד, יהיה עליך לתת לתוכנית שלך להסתעף בתפקידים שונים כבר בהתחלה:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

הקוד הבא מפעיל שרת TensorFlow ומחכה:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

טיפול בכישלון משימות

כישלון עובדים

ClusterCoordinator או Model.fit מספק סובלנות תקלות מובנית לכישלון עובדים. עם התאוששות העובד, פונקציית מערך הנתונים שסופקה בעבר (או כדי create_per_worker_dataset עבור CTL, או DatasetCreator עבור Model.fit ) תופעל על העובדים כדי ליצור מחדש את מערכי הנתונים.

כישלון שרת פרמטרים או רכז

עם זאת, כאשר הרכז יראה שגיאת שרת פרמטרים, הוא יעלה UnavailableError או AbortedError באופן מיידי. אתה יכול להפעיל מחדש את הרכז במקרה זה. הרכז עצמו יכול גם להיות זמין. לכן, כלים מסוימים מומלצים על מנת לא לאבד את התקדמות האימונים:

  • עבור Model.fit , עליכם להשתמש BackupAndRestore חוזרת של BackupAndRestore , המטפלת בחיסכון ובשחזור ההתקדמות באופן אוטומטי. ראה סעיף התקשרות והדרכה לעיל לדוגמא.

  • עבור CTL, עליך לבדוק את משתני המודל מעת לעת ולהעמיס משתני מודל ממחסום, אם בכלל, לפני תחילת האימון. ניתן להסיק את התקדמות האימונים בערך ממיטוב optimizer.iterations אם אופטימיזציה מסומנת:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

להביא ערך RemoteValue

מובטח כי RemoteValue תצליח אם פונקציה תבוצע בהצלחה. הסיבה לכך היא שכרגע ערך ההחזר מועתק מיד לרכז לאחר ביצוע פונקציה. אם יש כשל עובד במהלך ההעתקה, הפונקציה תנסה שוב לעובד זמין אחר. לכן, אם ברצונך לבצע אופטימיזציה לביצועים, תוכל לתזמן פונקציות ללא ערך החזר.

דיווח שגיאות

ברגע InvalidArgument יראה שגיאה כגון UnavailableError משרתי פרמטרים או שגיאות יישום אחרות כגון InvalidArgument מ- tf.debugging.check_numerics , הוא יבטל את כל הפונקציות הממתינות tf.debugging.check_numerics לפני העלאת השגיאה. שליפת המקביל שלהם RemoteValue הים תעלה CancelledError .

לאחר העלאת שגיאה, הרכז לא יעלה את אותה שגיאה או שגיאה כלשהי מפונקציות שבוטלו.

שיפור ביצועים

ישנן מספר סיבות אפשריות אם אתה רואה בעיות ביצועים כאשר אתה מתאמן עם ParameterServerStrategy ו- ClusterResolver .

אחת הסיבות הנפוצות היא ששרתי פרמטרים הם בעלי עומס לא מאוזן וכמה שרתי פרמטרים עמוסים בכבדות הגיעו לקיבולת. יכולות להיות גם סיבות שורש מרובות. כמה שיטות פשוטות למתן בעיה זו הן

  1. שבר את משתני המודל הגדולים שלך באמצעות ציון variable_partitioner בעת בניית ParameterServerStrategy .
  2. הימנע מיצירת משתנה נקודה חמה הנדרש על ידי כל שרתי הפרמטרים בשלב יחיד במידת האפשר. לדוגמה, השתמש בקצב למידה קבוע או tf.keras.optimizers.schedules.LearningRateSchedule מחלקה tf.keras.optimizers.schedules.LearningRateSchedule אופטימיזציה מכיוון שהתנהגות ברירת המחדל היא שקצב הלמידה יהפוך למשתנה המוצב בשרת פרמטרים מסוים tf.keras.optimizers.schedules.LearningRateSchedule ידי כל שרתי הפרמטרים האחרים בכל שלב. .
  3. ערבב את אוצר המילים הגדול שלך לפני שתעביר אותם לשכבות העיבוד המקדימות של Keras.

סיבה אפשרית נוספת לבעיות ביצוע היא הרכז. היישום הראשון שלנו של schedule / join מבוסס על פיתון ולכן עשוי להיות משורשר. גם האיחור בין הרכז לעובדים יכול להיות גדול. אם זה המקרה,

  • עבור Model.fit , אתה יכול להגדיר את הטיעון steps_per_execution המסופק ב- Model.compile לערך גדול מ -1.

  • עבור CTL, אתה יכול לארוז מספר שלבים tf.function אחת:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

כאשר אנו ממשיכים לבצע אופטימיזציה של הספרייה, אנו מקווים שרוב המשתמשים לא יצטרכו לארוז צעדים באופן ידני בעתיד.

בנוסף, טריק קטן לשיפור ביצועים הוא לתזמן פונקציות ללא ערך החזר כמוסבר בסעיף כישלון משימות הטיפול לעיל.

מגבלות ידועות

רוב המגבלות הידועות מכוסות בסעיפים לעיל. חלק זה מספק סיכום.

ParameterServerStrategy כללי

  • os.environment["grpc_fail_fast"]="use_caller" נדרש בכל משימה, כולל הרכז, בכדי לגרום לסובלנות תקלות לעבוד כראוי.
  • אימון שרת פרמטרים סינכרוני אינו נתמך.
  • בדרך כלל יש צורך לארוז מספר שלבים לפונקציה אחת כדי להשיג ביצועים מיטביים.
  • לא נתמך לטעון את הציל_מודל באמצעות tf.saved_model.load המכיל משתנים מרוסקים. הערה טעינה של הצלת דוגמה כזו באמצעות הגשת TensorFlow צפויה לעבוד.
  • לא נתמך לטעון משתני חריץ אופטימיזציה מרוסקים מחוספס למספר אחר של רסיסים.
  • לא נתמך להתאושש מכשל בשרת הפרמטרים מבלי להפעיל מחדש את משימת הרכז.
  • השימוש ב- tf.lookup.StaticHashTable (המופעל בדרך כלל על ידי שכבות tf.keras.layers.experimental.preprocessing שכבות, כגון IntegerLookup , StringLookup ו- TextVectorization ) גורם למשאבים המוצבים על הרכז בשלב זה עם אימון PS. יש לכך השלכה על ביצועי RPCs לחיפוש מהעובדים לרכז. זוהי עדיפות גבוהה הנוכחית לטיפול.

Model.fit

  • steps_per_epoch טיעון Model.fit ב- Model.fit . ניתן לבחור ערך המספק מרווחים מתאימים בעידן.
  • ParameterServerStrategy אין תמיכה להתקשרות חוזרות מותאמות אישית שיש להן שיחות ברמת אצווה מסיבות ביצועים. אתה צריך להמיר שיחות אלה לתוך שיחות עידן ברמה עם הרים כראוי steps_per_epoch , כך שהם נקראים כל steps_per_epoch מספר צעדים. השיחות החוזרות המובנות אינן מושפעות: השיחות ברמת האצווה שלהן שונו כך שהן ביצועיות. מתוכננת תמיכה בשיחות ברמת אצווה עבור ParameterServerStrategy .
  • מאותה סיבה, בניגוד לאסטרטגיות אחרות, סרגל ההתקדמות והמדדים נרשמים רק בגבולות העידן.
  • קלט עבור Model.fit לוקח רק את סוג DatasetCreator .
  • run_eagerly אינו נתמך.
  • הערכה ב- Model.fit עדיין אינה נתמכת. זה אחד העדיפויות.
  • Model.evaluate ו- Model.predict עדיין לא נתמכים.

פרטי אימון לולאה בהתאמה אישית