עזרה להגן על שונית המחסום הגדולה עם TensorFlow על Kaggle הצטרפו אתגר

אימון שרת פרמטרים עם ParameterServerStrategy

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

אימון שרת פרמטרים הוא שיטה נפוצה מקבילה לנתונים להגדלת הדרכת מודלים במספר מכונות.

אשכול הדרכה לשרת פרמטרים מורכב מעובדים ושרתי פרמטרים . משתנים נוצרים בשרתי פרמטרים והם נקראים ומתעדכנים על ידי העובדים בכל שלב. כברירת מחדל, עובדים קוראים ומעדכנים משתנים אלה באופן עצמאי מבלי לסנכרן זה עם זה. זו הסיבה שלפעמים אימון בסגנון שרת פרמטר נקרא אימון אסינכרוני .

ב-TensorFlow 2, הכשרת שרת פרמטרים מופעלת על ידי מחלקה tf.distribute.experimental.ParameterServerStrategy , אשר מפיצה את שלבי ההדרכה לאשכול המתרחב לאלפי עובדים (בליווי שרתי פרמטרים).

שיטות אימון נתמכות

ישנן שתי שיטות אימון עיקריות נתמכות:

אשכול עם משרות ומשימות

ללא קשר ל-API הנבחר ( Model.fit או לולאת הדרכה מותאמת אישית), הכשרה מבוזרת ב-TensorFlow 2 כוללת: 'cluster' עם מספר 'jobs' , ולכל אחת מהעבודות עשויות להיות 'tasks' אחת או יותר.

בעת שימוש באימון שרת פרמטרים, מומלץ להצטייד ב:

  • תפקיד רכז אחד (שיש לו את שם התפקיד chief )
  • משרות עובדים מרובות ( worker שם עבודה); ו
  • משימות שרת פרמטרים מרובות (שם עבודה ps )

בזמן שהרכז יוצר משאבים, שולח משימות הדרכה, כותב מחסומים ומטפל בתקלות משימות, עובדים ושרתי פרמטרים מפעילים את tf.distribute.Server שמקשיבים לבקשות מהרכז.

אימון שרת פרמטרים עם Model.fit API

אימון שרת פרמטרים עם ה-API של Model.fit מחייב את המתאם להשתמש באובייקט tf.distribute.experimental.ParameterServerStrategy , ו- tf.keras.utils.experimental.DatasetCreator כקלט. בדומה לשימוש ב- Model.fit ללא אסטרטגיה, או עם אסטרטגיות אחרות, זרימת העבודה כוללת יצירה והידור של המודל, הכנת החזרות, ולאחר מכן קריאת Model.fit .

אימון שרת פרמטרים עם לולאת אימון מותאמת אישית

עם לולאות אימון מותאמות אישית, כיתת tf.distribute.experimental.coordinator.ClusterCoordinator היא מרכיב המפתח המשמש את הרכז.

ה-API החשוב ביותר שסופק על ידי אובייקט ClusterCoordinator הוא schedule :

  • ה-API של schedule מעמיד בתור פונקציה tf. ומחזיר מיד tf.function דמוי RemoteValue .
  • הפונקציות בתור יישלחו לעובדים מרוחקים בשרשורי רקע וה- RemoteValue שלהם יתמלאו באופן אסינכרוני.
  • מאחר schedule אינו מצריך הקצאת עובד, ניתן לבצע את tf.function . המועברת על כל עובד זמין.
  • אם העובד שעליו הוא מבוצע לא יהיה זמין לפני השלמתו, הפונקציה תבוצע שוב על עובד זמין אחר.
  • בגלל עובדה זו והעובדה שביצוע פונקציה אינו אטומי, פונקציה עשויה להתבצע יותר מפעם אחת.

בנוסף לשליחת פונקציות מרחוק, ClusterCoordinator עוזר גם ליצור מערכי נתונים על כל העובדים ולבנות מחדש את מערכי הנתונים הללו כאשר עובד מתאושש מכשל.

הגדרת הדרכה

המדריך יסתעף למסלולי לולאת הדרכה Model.fit והתאמה אישית, ותוכל לבחור את זה שמתאים לצרכים שלך. סעיפים שאינם "אימון עם X" חלים על שני המסלולים.

pip install portpicker

הגדרת אשכול

כפי שצוין לעיל, אשכול אימון שרת פרמטרים דורש משימת מתאם שמפעילה את תוכנית ההדרכה שלך, עובד אחד או כמה משימות שרת פרמטרים המריצים שרתי TensorFlow— tf.distribute.Server — ואולי משימת הערכה נוספת שמפעילה הערכת רכב צדדי. (ראה סעיף הערכת רכב צד להלן). הדרישות להגדרתם הן:

  • משימת הרכז צריכה לדעת את הכתובות והיציאות של כל שרתי TensorFlow האחרים מלבד המעריך.
  • העובדים ושרתי הפרמטרים צריכים לדעת לאיזו יציאה הם צריכים להאזין. למען הפשטות, אתה יכול בדרך כלל להעביר את מידע האשכול המלא בעת יצירת שרתי TensorFlow במשימות אלו.
  • משימת המעריך לא חייבת לדעת את ההגדרה של אשכול ההדרכה. אם כן, הוא לא צריך לנסות להתחבר לאשכול ההדרכה.
  • עובדים ושרתי פרמטרים צריכים להיות בעלי סוגי משימות כמו "worker" ו- "ps" , בהתאמה. על הרכז להשתמש "chief" כסוג המשימה מסיבות מדור קודם.

במדריך זה, תיצור אשכול בתהליך כך שניתן יהיה להריץ את כל אימון שרת הפרמטרים ב-Colab. תלמד כיצד להגדיר אשכולות אמיתיים בחלק מאוחר יותר.

אשכול בתהליך

תתחיל ביצירת מספר שרתי TensorFlow מראש ותתחבר אליהם מאוחר יותר. שימו לב שזה רק למטרת הדגמה של הדרכה זו, ובאימון אמיתי השרתים יופעלו על מכונות "worker" ו- "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

הגדרת האשכול בתהליך משמשת לעתים קרובות בבדיקות יחידות, כגון כאן .

אפשרות נוספת לבדיקה מקומית היא להפעיל תהליכים במכונה המקומית - בדוק את הדרכה של Multi-worker עם Keras לקבלת דוגמה לגישה זו.

יצירת ParameterServerStrategy

לפני שאתם צוללים לתוך קוד האימון, בואו ניצור אובייקט של ParameterServerStrategy . שים לב שזה נחוץ ללא קשר אם אתה ממשיך עם Model.fit או לולאת אימון מותאמת אישית. הארגומנט variable_partitioner יוסבר בסעיף ריסוק משתנה .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

על מנת להשתמש ב-GPUs להדרכה, הקצו גרפי GPU גלויים לכל עובד. ParameterServerStrategy ישתמש בכל ה-GPUs הזמינים בכל עובד, עם ההגבלה שלכל העובדים יהיה אותו מספר של GPUs זמין.

ריסוק משתנה

ריסוק משתנים מתייחס לפיצול משתנה למספר משתנים קטנים יותר, הנקראים רסיסים . פיצול משתנים עשוי להיות שימושי כדי להפיץ את עומס הרשת בעת גישה לרסיסים אלה. זה גם שימושי כדי להפיץ חישוב ואחסון של משתנה רגיל על פני שרתי פרמטרים מרובים.

כדי לאפשר פיצול משתנים, אתה יכול להעביר ב- variable_partitioner בעת בניית אובייקט ParameterServerStrategy . ה- variable_partitioner יופעל בכל פעם כאשר משתנה נוצר והוא צפוי להחזיר את מספר הרסיסים לאורך כל מימד של המשתנה. חלק מ- variable_partitioner מחוץ לקופסה מסופקים כגון tf.distribute.experimental.partitioners.MinSizePartitioner . מומלץ להשתמש במחיצות מבוססות גודל כמו tf.distribute.experimental.partitioners.MinSizePartitioner כדי להימנע מחלוקת משתנים קטנים, שעלולים להשפיע לרעה על מהירות אימון המודל.

כאשר variable_partitioner מועברים ואם אתה יוצר משתנה ישירות תחת strategy.scope() , הוא יהפוך לסוג מיכל עם מאפיין variables המספק גישה לרשימת הרסיסים. ברוב המקרים, מיכל זה יומר אוטומטית לטנזור על ידי שרשור כל הרסיסים. כתוצאה מכך, ניתן להשתמש בו כמשתנה נורמלי. מצד שני, חלק משיטות TensorFlow כמו tf.nn.embedding_lookup מספקות הטמעה יעילה עבור סוג מיכל זה ובשיטות אלו תימנעה שרשור אוטומטי.

אנא עיין במסמכי ה-API של tf.distribute.experimental.ParameterServerStrategy לפרטים נוספים.

אימון עם Model.fit

Keras מספקת API קל לשימוש לאימון דרך Model.fit המטפל בלולאת האימון שמתחת למכסה המנוע, עם הגמישות של train_step , והתקשרויות חוזרות, המספקות פונקציונליות כמו שמירת נקודות ביקורת או שמירת סיכום עבור TensorBoard. עם Model.fit , ניתן להשתמש באותו קוד אימון עבור אסטרטגיות אחרות עם החלפה פשוטה של ​​אובייקט האסטרטגיה.

קלט נתונים

Model.fit עם אימון שרת פרמטרים מחייב את נתוני הקלט להיות מסופקים ב-callable שלוקח ארגומנט בודד מסוג tf.distribute.InputContext , ומחזיר tf.data.Dataset . לאחר מכן, צור אובייקט tf.keras.utils.experimental.DatasetCreator שלוקח אובייקט קריא כזה, tf.distribute.InputOptions callable באמצעות ארגומנט input_options .

שימו לב שמומלץ לערבב ולחזור על הנתונים עם אימון שרת פרמטרים, ולציין steps_per_epoch ב- fit call כדי שהספרייה תדע את גבולות העידן.

אנא עיין במדריך הקלט המבוזר למידע נוסף על הארגומנט InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

הקוד ב- dataset_fn יופעל בהתקן הקלט, שהוא בדרך כלל ה-CPU, בכל אחת ממכונות העבודה.

בניית מודל והידור

כעת, תיצור tf.keras.Model — מודל tf.keras.models.Sequential טריוויאלי למטרות הדגמה — ואחריו קריאה Model.compile לשילוב רכיבים, כגון מייעל, מדדים או פרמטרים כגון steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

התקשרות והדרכה

לפני שאתה מתקשר ל- model.fit לאימון בפועל, בואו נכין את ההתקשרות הנחוצות למשימות נפוצות, כגון:

  • ModelCheckpoint : כדי לשמור את משקלי הדגם.
  • BackupAndRestore : כדי לוודא שהתקדמות האימון מגובת אוטומטית, ומשוחזרת אם האשכול חווה חוסר זמינות (כגון הפסקה או מניעת היעדרות); אוֹ
  • TensorBoard : כדי לשמור את דוחות ההתקדמות בקבצי סיכום, המוצגים בכלי TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

שימוש ישיר עם ClusterCoordinator (אופציונלי)

גם אם תבחר בנתיב ההדרכה Model.fit , אתה יכול לחלופין ליצור אובייקט tf.distribute.experimental.coordinator.ClusterCoordinator כדי לתזמן פונקציות אחרות שאתה רוצה שיבוצעו על העובדים. עיין בסעיף אימון עם לולאת אימון מותאמת אישית לפרטים נוספים ודוגמאות.

אימון עם לולאת אימון מותאמת אישית

שימוש בלולאות אימון מותאמות אישית עם tf.distribute.Strategy מספק גמישות רבה להגדרת לולאות אימון. עם ParameterServerStrategy שהוגדרה לעיל ( strategy ), תשתמש ב- tf.distribute.experimental.coordinator.ClusterCoordinator כדי לשלוח את ביצוע שלבי ההדרכה לעובדים מרוחקים.

לאחר מכן, תיצור מודל, תגדיר מערך נתונים ופונקציית צעד, כפי שעשית בלולאת האימון עם s tf.distribute.Strategy אחרים. תוכל למצוא פרטים נוספים בהדרכה מותאמת אישית עם tf.distribute.Strategy .

כדי להבטיח שליפה מוקדמת של מערך נתונים יעילה, השתמש בממשקי API ליצירת מערך נתונים מבוזר המומלצים המוזכרים בסעיף שלבי הדרכה לשיגור לעובדים מרוחקים להלן. כמו כן, הקפד להתקשר ל- Strategy.run בתוך worker_fn כדי לנצל את מלוא ה-GPUs שהוקצו לעובדים. שאר השלבים זהים לאימון עם או בלי GPUs.

בואו ניצור את הרכיבים האלה בשלבים הבאים:

הגדר את הנתונים

ראשית, כתוב פונקציה שיוצרת מערך נתונים הכולל לוגיקת עיבוד מקדים המיושמת על ידי שכבות עיבוד מוקדם של Keras .

אתה תיצור את השכבות האלה מחוץ ל- dataset_fn אבל תחיל את הטרנספורמציה בתוך ה- dataset_fn , מכיוון שאתה תעטוף את dataset_fn לתוך tf.function , שלא מאפשרת ליצור משתנים בתוכו.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

צור דוגמאות צעצועים במערך נתונים:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

לאחר מכן, צור את מערך ההדרכה עטוף ב- dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

בנה את הדגם

לאחר מכן, צור את המודל ואובייקטים אחרים. הקפד ליצור את כל המשתנים תחת strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

בואו נאשר שהשימוש ב- FixedShardsPartitioner פיצל את כל המשתנים לשני רסיסים וכל רסיס הוקצה לשרתי פרמטרים שונים:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

הגדר את שלב האימון

שלישית, צור את שלב האימון עטוף ב- tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

בפונקציית שלב האימון לעיל, קריאה ל- Strategy.run ו- Strategy.reduce ב- step_fn יכולה לתמוך במספר GPUs לכל עובד. אם לעובדים הוקצו GPUs, Strategy.run יפיץ את מערכי הנתונים על מספר העתקים.

שליחת שלבי הדרכה לעובדים מרוחקים

לאחר שכל החישובים יוגדרו על ידי ParameterServerStrategy , תשתמש בכיתה tf.distribute.experimental.coordinator.ClusterCoordinator כדי ליצור משאבים ולהפיץ את שלבי ההדרכה לעובדים מרוחקים.

בואו ניצור תחילה אובייקט ClusterCoordinator ונעביר את אובייקט האסטרטגיה:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

לאחר מכן, צור מערך נתונים לכל עובד ואיטרטור. ב- per_worker_dataset_fn למטה, מומלץ לעטוף את dataset_fn לתוך strategy.distribute_datasets_from_function כדי לאפשר שליפה יעילה מראש למעבדי GPU בצורה חלקה.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

השלב האחרון הוא להפיץ את החישוב לעובדים מרוחקים באמצעות ClusterCoordinator.schedule :

  • שיטת schedule מעמידה בתור פונקציה tf. ומחזירה מיד tf.function דמוי RemoteValue . הפונקציות בתור יישלחו לעובדים מרוחקים בשרשורי רקע וה- RemoteValue יתמלא באופן אסינכרוני.
  • ניתן להשתמש בשיטת join ( ClusterCoordinator.join ) כדי להמתין עד לביצוע כל הפונקציות המתוזמנות.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

כך תוכל להביא את התוצאה של RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

לחלופין, אתה יכול להפעיל את כל השלבים ולעשות משהו בזמן ההמתנה להשלמה:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

לקבלת זרימת העבודה המלאה של ההדרכה וההגשה עבור דוגמה מסוימת זו, בדוק את המבחן הזה.

עוד על יצירת מערך נתונים

מערך הנתונים בקוד לעיל נוצר באמצעות ה-API של ClusterCoordinator.create_per_worker_dataset ). הוא יוצר מערך נתונים אחד לכל עובד ומחזיר אובייקט מיכל. אתה יכול לקרוא לשיטת iter על זה כדי ליצור איטרטור לכל עובד. האיטרטור לכל עובד מכיל איטרטור אחד לכל עובד והחתך המתאים של עובד יוחלף בארגומנט הקלט של הפונקציה המועברת לשיטת ClusterCoordinator.schedule לפני ביצוע הפונקציה על עובד מסוים.

נכון לעכשיו, שיטת ClusterCoordinator.schedule מניחה שהעובדים שווים, ולכן מניחה שמערכי הנתונים של עובדים שונים זהים, אלא שהם עשויים להיות ערבובים בצורה שונה אם הם מכילים פעולת Dataset.shuffle . בשל כך, מומלץ גם לחזור על מערכי הנתונים ללא הגבלת זמן ולתזמן מספר סופי של שלבים במקום להסתמך על OutOfRangeError ממערך נתונים.

הערה חשובה נוספת היא tf.data נתונים של tf.data אינם תומכים בסריאליזציה מרומזת וסידריאליזציה על פני גבולות המשימות. לכן חשוב ליצור את כל מערך הנתונים בתוך הפונקציה המועברת ל- ClusterCoordinator.create_per_worker_dataset .

הַעֲרָכָה

יש יותר מדרך אחת להגדיר ולהפעיל לולאת הערכה בהדרכה מבוזרת. לכל אחד יש יתרונות וחסרונות משלו כפי שמתואר להלן. שיטת ההערכה המוטבעת מומלצת אם אין לך העדפה.

הערכה מוטבעת

בשיטה זו, הרכז מחליף בין אימון להערכה וכך היא נקראת הערכה מוטבעת .

ישנם מספר יתרונות של הערכה מוטבעת. לדוגמה:

  • זה יכול לתמוך במודלים גדולים של הערכה ובמערכי נתונים של הערכה שמשימה אחת לא יכולה להחזיק.
  • ניתן להשתמש בתוצאות ההערכה כדי לקבל החלטות לאימון העידן הבא.

ישנן שתי דרכים ליישם הערכה מוטבעת: הערכה ישירה והערכה מבוזרת.

  • הערכה ישירה : עבור מודלים קטנים ומערכי הערכה, הרכז יכול להפעיל הערכה ישירות על המודל המבוזר עם מערך ההערכה על הרכז:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • הערכה מבוזרת : עבור מודלים או מערכי נתונים גדולים שלא ניתן להפעיל ישירות על הרכז, משימת הרכז יכולה להפיץ משימות הערכה לעובדים באמצעות שיטות ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

הערכת רכב צד

שיטה נוספת נקראת הערכת מכוניות צד, שבה אתה יוצר משימת מעריך ייעודית שקוראת שוב ושוב מחסומים ומפעילה הערכה על מחסום אחרון. זה מאפשר לתוכנית האימונים שלך להסתיים מוקדם אם אינך צריך לשנות את לולאת האימון שלך בהתבסס על תוצאות הערכה. עם זאת, זה דורש משימת מעריך נוספת ובדיקה תקופתית כדי להפעיל הערכה. להלן לולאת הערכה אפשרית של רכב צד:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

אשכולות בעולם האמיתי

בסביבת ייצור אמיתית, תפעיל את כל המשימות בתהליכים שונים על מכונות שונות. הדרך הפשוטה ביותר להגדיר מידע על אשכולות בכל משימה היא להגדיר משתני סביבה "TF_CONFIG" ולהשתמש ב- tf.distribute.cluster_resolver.TFConfigClusterResolver כדי לנתח את "TF_CONFIG" .

לתיאור כללי על משתני סביבה "TF_CONFIG" , עיין במדריך ההדרכה המבוזר .

אם אתה מתחיל את משימות ההדרכה שלך באמצעות Kubernetes או תבניות תצורה אחרות, סביר מאוד להניח שתבניות אלו כבר הגדירו “TF_CONFIG" .

הגדר את משתנה הסביבה "TF_CONFIG"

נניח שיש לך 3 עובדים ו-2 שרתי פרמטרים, ה- "TF_CONFIG" של עובד 1 יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

ה- "TF_CONFIG" של המעריך יכול להיות:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

החלק "cluster" במחרוזת "TF_CONFIG" לעיל עבור המעריך הוא אופציונלי.

אם אתה משתמש באותו בינארי עבור כל המשימות

אם אתה מעדיף להפעיל את כל המשימות הללו באמצעות קובץ בינארי אחד, תצטרך לתת לתוכנית שלך להסתעף לתפקידים שונים כבר בהתחלה:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

הקוד הבא מפעיל שרת TensorFlow וממתין:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

טיפול בכשל במשימה

כישלון עובד

tf.distribute.experimental.coordinator.ClusterCoordinator או Model.fit מספקים סובלנות מובנית לתקלות עבור כשל עובד. לאחר שחזור עובד, פונקציית הנתונים שסופקו בעבר (או ל- ClusterCoordinator.create_per_worker_dataset עבור לולאת אימון מותאמת אישית, או tf.keras.utils.experimental.DatasetCreator for Model.fit ) תופעל על העובדים כדי ליצור מחדש את מערכי הנתונים.

כשל בשרת פרמטר או ברכז

עם זאת, כאשר המתאם יראה שגיאת שרת פרמטר, הוא יעלה מיידית UnavailableError או AbortedError . אתה יכול להפעיל מחדש את הרכז במקרה זה. גם הרכז עצמו עלול להיות בלתי זמין. לכן, מומלץ להשתמש בכלים מסוימים כדי לא לאבד את התקדמות האימון:

  • עבור Model.fit , עליך להשתמש בהתקשרות חוזרת של BackupAndRestore , המטפלת בשמירת ההתקדמות ובשחזור אוטומטית. עיין בסעיף התקשרויות והדרכה למעלה לדוגמא.

  • עבור לולאת אימון מותאמת אישית, עליך לבדוק את משתני המודל מעת לעת ולטעון משתני מודל מנקודת ביקורת, אם ישנה, ​​לפני תחילת האימון. ניתן להסיק את התקדמות האימון בקירוב מ- optimizer.iterations אם יש נקודת ביקורת לאופטימיזציה:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

שליפת ערך RemoteValue

RemoteValue מובטחת שתצליח אם פונקציה מבוצעת בהצלחה. הסיבה לכך היא שכרגע ערך ההחזרה מועתק מיד לרכז לאחר ביצוע פונקציה. אם יש כשל של עובד במהלך ההעתקה, הפונקציה תתנסה שוב בעובד זמין אחר. לכן, אם ברצונך לבצע אופטימיזציה לביצועים, תוכל לתזמן פונקציות ללא ערך החזרה.

דיווח שגיאות

ברגע שהמתאם יראה שגיאה כגון UnavailableError משרתי פרמטרים או שגיאות יישום אחרות כגון InvalidArgument מ- tf.debugging.check_numerics , הוא יבטל את כל הפונקציות הממתינות והתורן לפני העלאת השגיאה. RemoteValue התואמים שלהם תעלה שגיאת CancelledError .

לאחר העלאת שגיאה, הרכז לא יעלה את אותה שגיאה או כל שגיאה מפונקציות שבוטלו.

שיפור ביצועים

ישנן מספר סיבות אפשריות אם אתה רואה בעיות בביצועים כאשר אתה מתאמן עם ParameterServerStrategy ו- ClusterResolver .

אחת הסיבות הנפוצות היא שלשרתי פרמטרים יש עומס לא מאוזן וכמה שרתי פרמטרים בעלי עומסים כבדים הגיעו לקיבולת. יכולות להיות גם סיבות שורש מרובות. כמה שיטות פשוטות למתן בעיה זו הן:

  1. חלק את משתני המודל הגדולים שלך באמצעות ציון משתני_מחיצת variable_partitioner בעת בניית ParameterServerStrategy .
  2. הימנע מיצירת משתנה נקודה חמה שנדרש על ידי כל שרתי הפרמטרים בשלב אחד, אם אפשר. לדוגמה, השתמש בקצב למידה קבוע או תת-מחלקה tf.keras.optimizers.schedules.LearningRateSchedule באופטימייזרים שכן התנהגות ברירת המחדל היא שקצב הלמידה יהפוך למשתנה המוצב בשרת פרמטר מסוים ומתבקש על ידי כל שאר שרתי הפרמטרים בכל שלב .
  3. ערבב את אוצר המילים הגדולים שלך לפני שתעביר אותם לשכבות העיבוד המקדים של Keras.

סיבה אפשרית נוספת לבעיות ביצועים היא הרכז. ההטמעה הראשונה שלך של schedule / join היא מבוססת Python ולכן עשויה להיות תקורה של שרשור. כמו כן, ההשהיה בין הרכז לעובדים יכולה להיות גדולה. אם זה המקרה,

  • עבור Model.fit , אתה יכול להגדיר ארגומנט steps_per_execution המסופק ב- Model.compile לערך גדול מ-1.

  • עבור לולאת אימון מותאמת אישית, אתה יכול לארוז שלבים מרובים לתוך tf.function יחיד:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

ככל שהספרייה עוברת אופטימיזציה נוספת, אני מקווה שרוב המשתמשים לא יצטרכו לארוז ידנית שלבים בעתיד.

בנוסף, טריק קטן לשיפור ביצועים הוא לתזמן פונקציות ללא ערך החזרה כפי שהוסבר בסעיף כשל במשימה שלמעלה.

מגבלות ידועות

רוב המגבלות הידועות כבר מכוסות בסעיפים לעיל. סעיף זה מספק תקציר.

ParameterServerStrategy כללי

  • os.environment["grpc_fail_fast"]="use_caller" בכל משימה, כולל הרכז, כדי לגרום לסובלנות לתקלות לפעול כהלכה.
  • אין תמיכה באימון שרת פרמטר סינכרוני.
  • בדרך כלל יש צורך לארוז מספר שלבים לפונקציה אחת כדי להשיג ביצועים מיטביים.
  • אין תמיכה בטעינת saved_model דרך tf.saved_model.load המכיל משתנים מפוצלים. שים לב שטעינת saved_model כזה באמצעות TensorFlow Serving צפויה לעבוד.
  • אין תמיכה בטעינת נקודת ביקורת המכילה משתני חריצי אופטימיזציה מפוצלים למספר שונה של רסיסים.
  • אין תמיכה בהתאוששות מכשל בשרת פרמטרים מבלי להפעיל מחדש את משימת הרכז.
  • השימוש ב- tf.lookup.StaticHashTable (המועסק בדרך כלל על ידי כמה שכבות עיבוד מקדים של Keras, כגון tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup ו- tf.keras.layers.TextVectorization ) מביא למשאבים המוצבים ב- הרכז בשלב זה עם הכשרת שרת פרמטרים. יש לכך השלכות ביצועים על חיפוש RPCs מהעובדים לרכז. זוהי עדיפות גבוהה כיום לטיפול.

פרטי Model.fit

  • נדרש ארגומנט steps_per_epoch ב- Model.fit . אתה יכול לבחור ערך המספק מרווחים מתאימים בתקופה.
  • ל- ParameterServerStrategy אין תמיכה בהתקשרויות חוזרות מותאמות אישית שיש להן קריאות ברמת אצווה מטעמי ביצועים. אתה צריך להמיר את הקריאות האלה לקריאות ברמת עידן עם steps_per_epoch שנבחרו כהלכה, כך שהם נקראים בכל מספר steps_per_epoch -תקופה של צעדים. שיחות חוזרות מובנות אינן מושפעות: השיחות ברמת האצווה שלהם שונו לביצועים. תמיכה בקריאות ברמת אצווה עבור ParameterServerStrategy מתוכננת.
  • מאותה סיבה, בניגוד לאסטרטגיות אחרות, סרגל ההתקדמות והמדדים מתועדים רק בגבולות עידן.
  • run_eagerly אינו נתמך.

פרטים על לולאת אימון מותאמת אישית