לולאת אימון מותאמת אישית עם Keras ו-MultiWorkerMirroredStrategy

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

מדריך זה מדגים הדרכה מרובת עובדים עם API של לולאת הדרכה מותאמת אישית, המופץ באמצעות MultiWorkerMirroredStrategy, כך שמודל Keras שנועד לרוץ על עובד יחיד יכול לעבוד בצורה חלקה על מספר עובדים עם שינוי קוד מינימלי.

אנו משתמשים בלולאות אימון מותאמות אישית כדי לאמן את המודל שלנו מכיוון שהם נותנים לנו גמישות ושליטה רבה יותר באימונים. יתר על כן, קל יותר לנפות באגים במודל ובלולאת האימון. מידע מפורט יותר זמין בכתיבת לולאת אימון מאפס .

אם אתה מחפש כיצד להשתמש ב- MultiWorkerMirroredStrategy עם keras model.fit , עיין במדריך זה במקום זאת.

מדריך הדרכה מבוזרת ב-TensorFlow זמין עבור סקירה כללית של אסטרטגיות ההפצה שבהן תומכת TensorFlow למי שמעוניין בהבנה מעמיקה יותר של ממשקי API של tf.distribute.Strategy .

להכין

ראשית, כמה יבוא נחוץ.

import json
import os
import sys

לפני ייבוא ​​TensorFlow, בצע מספר שינויים בסביבה.

השבת את כל ה-GPUs. זה מונע שגיאות הנגרמות על ידי העובדים שכולם מנסים להשתמש באותו GPU. עבור יישום אמיתי כל עובד יהיה על מכונה אחרת.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

אפס את משתנה הסביבה TF_CONFIG , תראה יותר על כך מאוחר יותר.

os.environ.pop('TF_CONFIG', None)

ודא שהספרייה הנוכחית נמצאת בנתיב של python. זה מאפשר למחברת לייבא את הקבצים שנכתבו על ידי %%writefile מאוחר יותר.

if '.' not in sys.path:
  sys.path.insert(0, '.')

כעת ייבא את TensorFlow.

import tensorflow as tf

מערך נתונים והגדרת מודל

לאחר מכן צור קובץ mnist.py עם מודל פשוט והגדרת מערך נתונים. קובץ פיתון זה ישמש את תהליכי העבודה במדריך זה:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

תצורת ריבוי עובדים

עכשיו בואו ניכנס לעולם ההכשרה מרובת עובדים. ב-TensorFlow, משתנה הסביבה TF_CONFIG נדרש לאימון במספר מכונות, שלכל אחת מהן יש אולי תפקיד אחר. TF_CONFIG בשימוש להלן, היא מחרוזת JSON המשמשת לציון תצורת האשכול בכל עובד שהוא חלק מהאשכול. זוהי שיטת ברירת המחדל לציון אשכול, באמצעות cluster_resolver.TFConfigClusterResolver , אך קיימות אפשרויות אחרות הזמינות במודול distribute.cluster_resolver .

תאר את האשכול שלך

להלן תצורה לדוגמה:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

הנה אותו TF_CONFIG כמחרוזת JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

ישנם שני מרכיבים של TF_CONFIG : cluster task .

  • cluster זהה עבור כל העובדים ומספק מידע על אשכול ההכשרה, שהוא גזרה המורכבת מסוגים שונים של משרות כגון worker . בהכשרה מרובת עובדים עם MultiWorkerMirroredStrategy , יש בדרך כלל worker אחד שלוקח על עצמו קצת יותר אחריות כמו שמירת מחסום וכתיבת קובץ סיכום עבור TensorBoard בנוסף למה worker רגיל עושה. עובד כזה מכונה העובד chief , ומקובל worker עם index 0 מתמנה worker הראשי (למעשה כך tf.distribute.Strategy ).

  • task מספקת מידע על המשימה הנוכחית והיא שונה בכל עובד. הוא מציין את type index של אותו עובד.

בדוגמה זו, אתה מגדיר את type המשימה "worker" ואת index המשימה ל 0 . מכונה זו היא העובדת הראשונה והיא תתמנה כעובדת הראשית ותעשה יותר עבודה מהאחרות. שים לב שלמכונות אחרות יהיה צורך להגדיר גם את משתנה הסביבה TF_CONFIG , והוא צריך להיות בעל אותה cluster , אך type משימות או index משימות שונה בהתאם לתפקידים של המכונות הללו.

למטרות המחשה, מדריך זה מראה כיצד ניתן להגדיר TF_CONFIG עם 2 עובדים על localhost . בפועל, משתמשים היו יוצרים מספר עובדים על כתובות IP חיצוניות/יציאות, TF_CONFIG על כל עובד בצורה מתאימה.

בדוגמה זו תשתמש ב-2 עובדים, ה- TF_CONFIG של העובד הראשון מוצג למעלה. עבור העובד השני היית מגדיר tf_config['task']['index']=1

למעלה, tf_config הוא רק משתנה מקומי ב-python. כדי להשתמש בו בפועל כדי להגדיר אימון, יש לבצע סדרה של מילון זה בתור JSON, ולהציב אותו במשתנה הסביבה TF_CONFIG .

משתני סביבה ותתי תהליכים במחברות

תהליכי משנה יורשים משתני סביבה מהאב שלהם. אז אם אתה מגדיר משתנה סביבה בתהליך jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

אתה יכול לגשת למשתנה הסביבה מתת-תהליכים:

echo ${GREETINGS}
Hello TensorFlow!

בסעיף הבא, תשתמש בזה כדי להעביר את ה- TF_CONFIG לתת-תהליכי העבודה. לעולם לא היית באמת משיק את העבודות שלך בדרך זו, אבל זה מספיק למטרות המדריך הזה: כדי להדגים דוגמה מינימלית של ריבוי עובדים.

אסטרטגיית MultiWorkerMirrored

כדי לאמן את המודל, השתמש במופע של tf.distribute.MultiWorkerMirroredStrategy , היוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. במדריך tf.distribute.Strategy יש פרטים נוספים על אסטרטגיה זו.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

השתמש tf.distribute.Strategy.scope כדי לציין שיש להשתמש באסטרטגיה בעת בניית המודל שלך. זה מכניס אותך ל"הקשר צולב העתקים " עבור אסטרטגיה זו, כלומר האסטרטגיה ניתנת לשליטה על דברים כמו מיקום משתנה.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

חלוקה אוטומטית של הנתונים שלך בין עובדים

בהכשרה מרובת עובדים, אין בהכרח צורך בפיצול מערכי נתונים, אולם זה נותן לך סמנטיקה חד פעמית בדיוק, מה שהופך את ההכשרה לניתנת יותר לשחזור, כלומר הכשרה על מספר עובדים צריכה להיות זהה לאימון על עובד אחד. הערה: הביצועים עשויים להיות מושפעים במקרים מסוימים.

ראה: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

הגדר לולאת אימון מותאמת אישית ואימון המודל

ציין אופטימיזציה

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

הגדר שלב אימון באמצעות tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

שמירה ושחזור של מחסומים

הטמעת נקודת ביקורת בלולאת אימון מותאמת אישית מחייבת את המשתמש לטפל בה במקום להשתמש בהתקשרות חוזרת של Keras. זה מאפשר לך לשמור משקלים של הדגם ולשחזר אותם מבלי לשמור את כל הדגם.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

כאן, תיצור tf.train.Checkpoint אחד שעוקב אחר הדגם, המנוהל על ידי tf.train.CheckpointManager כך שרק נקודת המחסום העדכנית נשמרת.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

כעת, כאשר אתה צריך לשחזר, אתה יכול למצוא את המחסום האחרון שנשמר באמצעות הפונקציה הנוחה tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

לאחר שחזור המחסום, תוכל להמשיך באימון לולאת האימון המותאמת אישית שלך.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

הגדרת קוד מלאה על עובדים

כדי לפעול בפועל עם MultiWorkerMirroredStrategy תצטרך להפעיל תהליכי עבודה ולהעביר להם TF_CONFIG .

כמו הקובץ mnist.py שנכתב קודם לכן, הנה ה- main.py שמכיל את אותו הקוד שעברנו עליו שלב אחר שלב קודם לכן בקולאב הזה, אנחנו פשוט כותבים אותו לקובץ כך שכל אחד מהעובדים יריץ אותו:

קובץ: main.py

Writing main.py

לאמן ולהעריך

הספרייה הנוכחית מכילה כעת את שני קבצי Python:

ls *.py
main.py
mnist.py

אז ה-json הסדר את ה- TF_CONFIG והוסף אותו למשתני הסביבה:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

כעת, אתה יכול להפעיל תהליך עבודה שיריץ את ה- main.py ולהשתמש ב- TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

יש לשים לב לכמה דברים לגבי הפקודה לעיל:

  1. הוא משתמש ב- %%bash שהוא "קסם" של מחברת כדי להפעיל כמה פקודות bash.
  2. הוא משתמש בדגל --bg כדי להפעיל את תהליך ה- bash ברקע, מכיוון שהעובד הזה לא יסתיים. זה מחכה לכל העובדים לפני שהוא מתחיל.

תהליך העבודה ברקע לא ידפיס פלט למחברת זו, אז &> מפנה את הפלט שלו לקובץ, כדי שתוכל לראות מה קרה.

לכן, המתן מספר שניות עד שהתהליך יתחיל:

import time
time.sleep(20)

כעת תראה מה יצא לקובץ היומן של העובד עד כה:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

השורה האחרונה של קובץ היומן צריכה לומר: Started server with target: grpc://localhost:12345 . העובד הראשון מוכן כעת, ומחכה שכל שאר העובדים יהיו מוכנים להמשיך.

אז עדכן את tf_config כדי שהתהליך של העובד השני יאסוף:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

כעת הפעל את העובד השני. זה יתחיל את ההכשרה מכיוון שכל העובדים פעילים (לכן אין צורך לרקע תהליך זה):

python main.py > /dev/null 2>&1

כעת, אם תבדוק שוב את היומנים שנכתבו על ידי העובד הראשון, תראה שהוא השתתף בהכשרת המודל הזה:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

הכשרה רב עובדים לעומק

מדריך זה הדגים זרימת עבודה של Custom Training Loop של ההגדרה מרובת עובדים. תיאור מפורט של נושאים אחרים זמין model.fit's guide ההגדרה מרובת עובדים וישים ל-CTLs.

ראה גם

  1. מדריך הדרכה מבוזרת ב- TensorFlow מספק סקירה כללית של אסטרטגיות ההפצה הזמינות.
  2. מודלים רשמיים , שרבים מהם יכולים להיות מוגדרים להפעלת אסטרטגיות הפצה מרובות.
  3. קטע הביצועים במדריך מספק מידע על אסטרטגיות וכלים אחרים שבהם אתה יכול להשתמש כדי לייעל את הביצועים של דגמי TensorFlow שלך.