![]() | ![]() | ![]() | ![]() |
סקירה כללית
אימון לשרת פרמטרים הוא שיטה מקבילה לנתונים המקבילה להגדלת אימוני מודלים במספר מכונות. מקבץ אימונים לשרת פרמטרים מורכב מעובדים ושרתי פרמטרים. משתנים נוצרים בשרתי פרמטרים והם נקראים ומתעדכנים על ידי עובדים בכל שלב. כברירת מחדל, עובדים קוראים ומעדכנים משתנים אלה באופן עצמאי מבלי להסתנכרן זה עם זה. זו הסיבה שלעיתים אימונים בסגנון שרת פרמטרים נקראים אימון אסינכרוני.
הכשרת שרתים פרמטרים של TensorFlow 2 משתמשת במתאם מרכזי דרך המחלקה tf.distribute.experimental.coordinator.ClusterCoordinator
.
ביישום זה משימות parameter server
worker
parameter server
מריצות את tf.distribute.Server
שמאזינים לבקשות tf.distribute.Server
. הרכז יוצר משאבים, משגר משימות אימונים, כותב מחסומים ומתמודד עם כשלים במשימה.
אנו מאמינים שארכיטקטורה זו ClusterCoordinator
החדשה מספקים מודל תכנות גמיש ופשוט יותר.
ClusterCoordinator
הכיתה ClusterCoordinator
צריכה לעבוד בשיתוף עם אובייקט tf.distribute.Strategy
. יש צורך באובייקט tf.distribute.Strategy
כדי להעביר את המידע של האשכול ומשמש להגדרת צעד אימון כפי שראינו באימונים מותאמים אישית עם MirroredStrategy
. האובייקט ClusterCoordinator
ואז שולח את ביצוע שלבי ההדרכה האלה לעובדים מרוחקים. נכון לעכשיו, ClusterCoordinator
עובד רק עם tf.distribute.experimental.ParameterServerStrategy
.
ה- API החשוב ביותר שמספק האובייקט ClusterCoordinator
הוא schedule
. ה- API של schedule
צופה פונקציה tf.function
. ומחזיר RemoteValue
דמוי RemoteValue
באופן מיידי. הפונקציות בתור יישלחו לעובדים מרוחקים RemoteValue
הרקע ו- RemoteValue
שלהם RemoteValue
בצורה אסינכרונית. מכיוון schedule
אינו מצריך הקצאת עובדים, ניתן לבצע את tf.function
. על כל עובד זמין. אם העובד בו הוא מבוצע לא יהיה זמין לפני השלמתו, הפונקציה תנוסה שוב על עובד זמין אחר. בגלל עובדה זו והעובדה שביצוע פונקציות אינו אטומי, ניתן לבצע פונקציה יותר מפעם אחת.
בנוסף למשלוח פונקציות מרחוק, ClusterCoordinator
מסייע גם ליצור מערכי נתונים על כל העובדים ולבנות מחדש מערכי נתונים אלה כאשר עובד מתאושש מכישלון.
הגדרת הדרכה
pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl
הגדרת אשכול
כאמור לעיל, אשכול אימון לשרת פרמטרים מחייב מטלת רכז שמפעילה את תוכנית ההדרכה שלך, עובד אחד או כמה ומשימות שרת פרמטרים המפעילות שרתי TensorFlow, כלומר tf.distribute.Server
, ואולי מטלת הערכה נוספת המפעילה צד-רכב הערכה (ראה סעיף הערכת רכב צדדי בהמשך). הדרישות להגדרתן הן:
- משימת הרכז צריכה לדעת את הכתובות והיציאות של כל שאר השרתים של TensorFlow למעט המעריך.
- העובדים ושרתי הפרמטרים צריכים לדעת לאיזו פורט הם צריכים להאזין. למען הפשטות, בדרך כלל אנו מעבירים את מידע האשכול המלא כאשר אנו יוצרים שרתי TensorFlow במשימות אלה.
- משימת המעריך אינה חייבת להכיר את הגדרת אשכול ההדרכה. אם כן, הוא לא צריך לנסות להתחבר לאשכול האימונים.
- עובדים ושרתי פרמטרים צריכים שיהיו להם סוגי משימות כ"עובד "ו-" ps "בהתאמה. על הרכז להשתמש ב"צ'יף "כסוג המשימה מסיבות מורשת.
במדריך זה, ניצור אשכול בתהליך כך שניתן יהיה להריץ את כל אימוני שרת הפרמטרים ב- colab. נציג כיצד להקים אשכולות אמיתיים בחלק מאוחר יותר.
אשכול בתהליך
במדריך זה נתחיל מראש חבורה של שרתי TensorFlow ונתחבר אליהם מאוחר יותר:
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec, job_name="worker", task_index=i, config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec, job_name="ps", task_index=i, protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
אימון בלולאת אימונים בהתאמה אישית
לולאת אימון מותאמת אישית עם tf.distribute.Strategy
מספקת גמישות רבה להגדרת לולאות אימון. נכון לעכשיו לאימון שרת פרמטרים ב- TensorFlow 2, נתמך רק לולאת אימון מותאמת אישית. כאן אנו משתמשים ב- ParameterServerStrategy
כדי להגדיר שלב הכשרה ואז משתמשים ב- ClusterCoordinator
כדי להעביר את ביצוע שלבי ההדרכה לעובדים מרוחקים.
צור את ParameterServerStrategy
כדי לכתוב שלב אימונים בלולאת אימונים מותאמת אישית, הצעד הראשון הוא ליצור ParameterServerStrategy
. נסביר את variable_partitioner
בהמשך.
variable_partitioner = (
tf.distribute.experimental.partitioners.FixedShardsPartitioner(
num_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0' INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
לאחר מכן תיצור מודל, תגדיר מערך נתונים ופונקציה צעד כפי שראינו בלולאת האימונים עם tf.distribute.Strategy
אחרים. ניתן למצוא פרטים נוספים זה הדרכה . בואו ניצור רכיבים אלה בשלבים הבאים:
הגדר את הנתונים
ראשית, כתוב פונקציה שיוצרת מערך נתונים הכולל לוגיקה של עיבוד מקדים המיושם על ידי שכבות עיבוד מקדימות של Keras. ניצור שכבות אלה מחוץ ל- dataset_fn
אך נפעיל את השינוי בתוך dataset_fn
מכיוון שתעטוף את dataset_fn
tf.function
שלא מאפשרת ליצור משתנים בתוכו.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
"wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)
label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = keras.layers.Input(
shape=(3,), dtype=tf.string, name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = keras.Model(
{"features": raw_feature_input}, feature_id_input)
raw_label_input = keras.layers.Input(
shape=(1,), dtype=tf.string, name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)
צור דוגמאות צעצוע במערך נתונים:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
לאחר מכן אנו יוצרים את מערך האימונים עטוף ב- dataset_fn:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
בנה את המודל
שנית, אנו יוצרים את המודל ואובייקטים אחרים. הקפד ליצור את כל המשתנים תחת strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with KPLs.
model_input = keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = keras.Model({"features": model_input}, dense_output)
optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = keras.metrics.Accuracy()
הגדירו את שלב האימון
שלישית, צור את שלב האימון העטוף tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
בפונקציית הצעד לעיל, קריאה strategy.run
strategy.reduce
ב- step_fn
שימושיות לתמיכה ב- GPUs או במספר רב של עובדי העתקים בעתיד, אם כי יש להם יישום טריוויאלי ברגע זה.
העברת שלבי הדרכה לעובדים מרוחקים
לאחר שכל החישובים מוגדרים על ידי ParameterServerStrategy
, נשתמש במחלקת ClusterCoordinator
כדי ליצור משאבים ולהפיץ את שלבי ההדרכה לעובדים מרוחקים.
בואו ניצור תחילה אובייקט ClusterCoordinator
לאובייקט האסטרטגיה:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
לאחר מכן אנו יוצרים מערך נתונים לעובד ואיטרטור. ב- per_worker_dataset_fn
למטה, עטיפת מערך dataset_fn
strategy.distribute_datasets_from_function
dataset_fn
היא אופציונלית, אך היא תאפשר תמיכה dataset_fn
מראש יעילה ל- GPUs בצורה חלקה בעתיד כאשר GPUs נתמכים על ידי ParameterServerStrategy
.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
השלב האחרון הוא הפצת החישוב לעובדים מרוחקים לפי schedule
. שיטת schedule
מציגה פונקציה tf.function
. ומחזירה באופן מיידי RemoteValue
. הפונקציות בתור יישלחו לעובדים מרוחקים RemoteValue
הרקע וה- RemoteValue
יתמלא בצורה אסינכרונית. ניתן להשתמש בשיטת join
להמתין עד שכל הפונקציות המתוזמנות יוצגו.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). Finished epoch 0, accuracy is 0.462500. Finished epoch 1, accuracy is 0.925000. Finished epoch 2, accuracy is 1.000000. Finished epoch 3, accuracy is 1.000000.
כך תוכל להביא את התוצאה של RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665
לחלופין, אתה יכול להפעיל את כל השלבים ולעשות משהו בזמן ההמתנה לסיום:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
לקבלת תהליך ההדרכה וההגשה המלא עבור דוגמה מסוימת זו, עיין במבחן זה.
מידע נוסף על יצירת מערך נתונים
מערך הנתונים בקוד הנ"ל נוצר באמצעות ה- API של create_per_worker_dataset
. זה יוצר מערך נתונים אחד לעובד ומחזיר אובייקט מכולה. אתה יכול לקרוא לשיטת iter
עליו כדי ליצור איטרטור לעובד. האיטרטור לעובד מכיל איטרטור אחד לעובד והפרוסה המתאימה של עובד תוחלף בארגומנט הקלט של הפונקציה שהועברה לשיטת schedule
לפני ביצוע הפונקציה על עובד מסוים.
נכון לעכשיו, שיטת schedule
מניחה שהעובדים שווים ולכן הם מעריכים שמערכי הנתונים על עובדים שונים זהים, אלא שהם עשויים להיות מעורבבים אחרת אם הם מכילים פעולת dataset.shuffle מסיבה זו, אנו ממליצים גם לחזור על מערכי הנתונים ללא הגבלת זמן ולתזמן מספר סופי של צעדים במקום להסתמך על OutOfRangeError
ממערך נתונים.
הערה חשובה נוספת היא tf.data
נתונים של tf.data
אינם תומכים tf.data
מרומזים מעבר לגבולות המשימה. לכן חשוב ליצור את כל מערך הנתונים בתוך הפונקציה המועברת ל- create_per_worker_dataset
.
רסיסים משתנים
רסיסה משתנה מתייחסת לפיצול משתנה למספר משתנים קטנים יותר. אנו מכנים את המשתנים הקטנים האלה רסיסים . רטוש משתנה עשוי להיות שימושי להפצת עומס הרשת בעת גישה לרסיסים אלה. כדאי גם להפיץ חישוב ואחסון של משתנה רגיל על פני שרתי פרמטרים מרובים.
כדי לאפשר sharding משתנה, אתה יכול לעבור בתוך variable_partitioner
בעת בניית ParameterServerStrategy
האובייקט. variable_partitioner
יופעל בכל פעם שייווצר משתנה והוא צפוי להחזיר את מספר השברים לאורך כל מימד של המשתנה. חלק tf.distribute.experimental.partitioners.FixedShardsPartitioner
variable_partitioner
מחוץ לקופסה, כגון tf.distribute.experimental.partitioners.FixedShardsPartitioner
.
בדוגמה שלעיל אנו משתמשים ב- FixedShardsPartitioner
אשר FixedShardsPartitioner
את כל המשתנים לשני רסיסים וכל רסיס יוקצה לשרתי פרמטרים שונים:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
כאשר variable_partitioner
מועבר ואם אתה יוצר משתנה ישירות תחת strategy.scope()
, היא תהפוך לסוג מכל עם variables
רכוש אשר מספק גישה לרשימת שברים. ברוב המקרים, מיכל זה יומר אוטומטית לטנסור על ידי שרשור כל הרסיסים. כתוצאה מכך, ניתן להשתמש בו כמשתנה רגיל. מצד שני, כמה משיטות TensorFlow כגון tf.nn.embedding_lookup
מספקות יישום יעיל לסוג מכולה זה ובשיטות אלה tf.nn.embedding_lookup
אוטומטי.
לקבלת פרטים נוספים, עיין בתיעוד ה- API של ParameterServerStrategy
.
הַעֲרָכָה
יש יותר מדרך אחת להגדיר ולהפעיל לולאת הערכה באימונים מבוזרים. לכל אחד יתרונות וחסרונות משלו כמתואר להלן. מומלץ להשתמש בשיטת הערכה מקוונת אם אין לך העדפה.
הערכה מוטבעת
בשיטה זו הרכז מתחלף בין הכשרה להערכה ולכן אנו מכנים אותה הערכה מוטבעת. ישנם מספר יתרונות של הערכה מקוונת. לדוגמה, הוא יכול לתמוך במודלי הערכה גדולים ובמערכות נתונים של הערכה שמשימה אחת אינה יכולה להחזיק. לדוגמא אחרת, ניתן להשתמש בתוצאות ההערכה לקבלת החלטות להכשרה בעידן הבא.
ישנן שתי דרכים ליישום הערכה מקוונת:
- הערכה ישירה - עבור מודלים קטנים ומערכי נתונים של הערכה הרכז יכול להריץ הערכה ישירות על המודל המבוזר עם מערך ההערכה על הרכז:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- הערכה מבוזרת - עבור מודלים גדולים או מערכי נתונים שאינם ניתנים להפעלה ישירות על הרכז, משימת הרכז יכולה להפיץ מטלות הערכה לעובדים באמצעות שיטות
schedule
/join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
הערכת רכב צדדי
שיטה אחרת נקראת הערכת מכונית צדדית שהיא יצירת משימת מעריך ייעודית שקוראת שוב ושוב מחסומים ומריצה הערכה במחסום אחרון. זה מאפשר לתוכנית האימונים שלך להסתיים מוקדם אם אינך צריך לשנות את לולאת האימונים שלך על סמך תוצאות הערכה. עם זאת, נדרשת משימת מעריך נוספת ומחסום תקופתי בכדי להפעיל את ההערכה. להלן לולאת הערכת רכב צדדית אפשרית:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
אשכולות בעולם האמיתי
בסביבת ייצור אמיתית, תריץ את כל המשימות בתהליכים שונים במכונות שונות. הדרך הפשוטה ביותר להגדיר מידע על אשכול בכל משימה היא להגדיר משתני סביבה "TF_CONFIG" ולהשתמש ב- TFConfigClusterResolver
לניתוח "TF_CONFIG". לתיאור כללי אודות משתני הסביבה "TF_CONFIG", עיין במדריך ההדרכה המבוזר .
אם אתה מתחיל את משימות האימון שלך באמצעות Kubernetes או תבניות תצורה אחרות, סביר מאוד להניח שתבניות אלה כבר הגדירו לך "TF_CONFIG".
הגדר את משתנה הסביבה "TF_CONFIG"
נניח שיש לך 3 עובדים ו -2 שרתי פרמטרים, "TF_CONFIG" של עובד 1 יכול להיות:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG" של המעריך יכול להיות:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
החלק "אשכול" במחרוזת "TF_CONFIG" לעיל עבור המעריך הוא אופציונלי.
אם אתה משתמש באותו בינארי לכל המשימות
אם אתה מעדיף להפעיל את כל המשימות האלה באמצעות בינארי יחיד, יהיה עליך לתת לתוכנית שלך להסתעף בתפקידים שונים כבר בהתחלה:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# run side-car evaluation
else:
# run the coordinator.
הקוד הבא מפעיל שרת TensorFlow ומחכה:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
טיפול בכישלון משימות
כישלון עובדים
כאמור לעיל, ל- ClusterCoordinator
יש סובלנות תקלות מובנית לכישלון עובדים. לאחר התאוששות העובד, נתח create_per_worker_dataset
הנתונים שנוצר על ידי create_per_worker_dataset
שעדיין נמצא בתחום ייצור מחדש על ידי הפעלת dataset_fn
המקורי שהועבר ל- create_per_worker_dataset
.
שרת פרמטרים או כשל הרכז
עם זאת, כאשר המתאם יראה שגיאת שרת פרמטרים, הוא יעלה שגיאה UnavailableError
או AbortedError
באופן מיידי. אתה יכול להפעיל מחדש את הרכז במקרה זה. הרכז עצמו יכול גם להיות לא זמין. לכן, כדי לא לאבד הרבה מהתקדמות האימון, חשוב לבדוק את משתני המודל מעת לעת ולהעמיס משתני מודלים ממחסום, אם בכלל, לפני תחילת האימון. ניתן להסיק את התקדמות האימון בערך ממיטוב optimizer.iterations
אם אופטימיזציה מסומנת.
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
להביא ערך RemoteValue
מובטח כי RemoteValue
תצליח אם פונקציה תבוצע בהצלחה. הסיבה לכך היא שכרגע ערך ההחזר מועתק מיד לרכז לאחר ביצוע פונקציה. אם ישנו כשל עובד במהלך ההעתקה, הפונקציה תנוסה שוב על עובד זמין אחר. לכן, אם ברצונך לבצע אופטימיזציה לביצועים, תוכל לתזמן פונקציות ללא ערך החזר.
דיווח שגיאות
ברגע InvalidArgument
יראה שגיאה כגון UnavailableError
משרתי פרמטרים או שגיאות יישום אחרות כגון InvalidArgument
מ- tf.debugging.check_numerics
, הוא יבטל את כל הפונקציות הממתינות tf.debugging.check_numerics
לפני העלאת השגיאה. שליפת המקביל שלהם RemoteValue
הים תעלה CancelledError
.
לאחר העלאת שגיאה, הרכז לא יעלה את אותה שגיאה או שגיאה כלשהי מפונקציות שבוטלו.
שיפור ביצועים
ישנן מספר סיבות אפשריות אם אתה רואה בעיות ביצועים כאשר אתה מתאמן עם ParameterServerStrategy
ו- ClusterResolver
.
אחת הסיבות הנפוצות היא ששרתי פרמטרים הם בעלי עומס לא מאוזן וכמה שרתי פרמטרים עמוסים בכבדות הגיעו לקיבולת. יכולות להיות גם גורמי שורש מרובים. כמה שיטות פשוטות למתן בעיה זו הן
- שבר את משתני המודל הגדולים שלך באמצעות ציון
variable_partitioner
בעת בנייתParameterServerStrategy
. - הימנע מיצירת משתנה נקודה חמה הנדרש על ידי כל שרתי הפרמטרים בשלב יחיד במידת האפשר. לדוגמה, השתמש בקצב למידה קבוע או
tf.keras.optimizers.schedules.LearningRateSchedule
משנהtf.keras.optimizers.schedules.LearningRateSchedule
אופטימיזציה מכיוון שהתנהגות ברירת המחדל היא שקצב הלמידה יהפוך למשתנה המוצב בשרת פרמטרים מסויםtf.keras.optimizers.schedules.LearningRateSchedule
ידי כל שרתי הפרמטרים האחרים בכל שלב. . - ערבב את אוצר המילים הגדול שלך לפני שתעביר אותם לשכבות העיבוד המקדימות של Keras.
סיבה אפשרית נוספת לבעיות ביצוע היא הרכז. היישום הראשון שלנו של schedule
/ join
הוא מבוסס פיתון ולכן עשוי להיות משורשר. גם האיחור בין הרכז לעובדים יכול להיות גדול. אם זה המקרה, אתה יכול לארוז מספר שלבים tf.function
אחת:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
אנו נמשיך לייעל את הרכז ומקווה שרוב המשתמשים לא יצטרכו לארוז צעדים ידנית בעתיד.
בנוסף, טריק קטן לשיפור ביצועים הוא לתזמן פונקציות ללא ערך החזר כמוסבר בסעיף כישלון משימות הטיפול לעיל.
מגבלות ידועות
רוב המגבלות הידועות מכוסות בסעיפים לעיל. הנה סיכום:
-
os.environment["grpc_fail_fast"]="use_caller"
נדרש בכל משימה, כולל הרכז, כדי לגרום לסובלנות תקלות לעבוד כראוי. - עובדי GPU אינם נתמכים.
- אימון שרת פרמטרים סינכרוני אינו נתמך.
-
ParameterServerStrategy
לא עובד עם Kerascompile
ואתfit
APIs. -
ClusterCoordinator.schedule
אינו תומך בערבויות ביקור עבור מערך נתונים. - כאשר משתמשים ב-
ClusterCoordinator.create_per_worker_dataset
, יש ליצור את כל מערך הנתונים בתוך הפונקציה המועברת אליו. - בדרך כלל יש צורך לארוז מספר שלבים לפונקציה אחת כדי להשיג ביצועים מיטביים.
- זה לא נתמך לטעון את הציל_מודל באמצעות
tf.saved_model.load
המכיל משתנים מרוסקים. הערה טעינה של הצלת דוגמה כזו באמצעות הגשת TensorFlow צפויה לעבוד. - אין תמיכה בהטענת משתני חריץ אופטימיזציה מרוסקים מחסום למספר רסיסים אחר.
- לא נתמך להתאושש מכשל בשרת הפרמטרים מבלי להפעיל מחדש את משימת הרכז.