עזרה להגן על שונית המחסום הגדולה עם TensorFlow על Kaggle הצטרפו אתגר

הכשרה מרובת עובדים עם קרס

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

הדרכה זו מדגימה כיצד לבצע אימונים מופצים רב עובדים עם מודל Keras ואת Model.fit API באמצעות tf.distribute.Strategy API-דווקא tf.distribute.MultiWorkerMirroredStrategy בכיתה. בעזרת אסטרטגיה זו, מודל Keras שתוכנן לפעול על עובד יחיד יכול לעבוד בצורה חלקה על מספר עובדים עם שינויי קוד מינימליים.

למעוניינים הבנה עמוקה יותר של tf.distribute.Strategy אפיס, הכשרה שהופצו TensorFlow מדריך זמין עבור סקירה של אסטרטגיות הפצה TensorFlow תומך.

כדי ללמוד כיצד להשתמש MultiWorkerMirroredStrategy עם Keras ו לולאת אימונים מותאמים אישית, מתייחס לולאת אימונים מותאמת אישית עם Keras ו MultiWorkerMirroredStrategy .

שימו לב שמטרת המדריך הזה היא להדגים דוגמה מינימלית של ריבוי עובדים עם שני עובדים.

להכין

התחל עם כמה יבוא נחוץ:

import json
import os
import sys

לפני ייבוא ​​TensorFlow, בצע מספר שינויים בסביבה:

  1. השבת את כל ה-GPUs. זה מונע שגיאות הנגרמות על ידי העובדים שכולם מנסים להשתמש באותו GPU. ביישום בעולם האמיתי, כל עובד יהיה על מכונה אחרת.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. אפס את TF_CONFIG משתנה הסביבה (תלמד עוד על זה בהמשך):
os.environ.pop('TF_CONFIG', None)
  1. ודא כי הספריה הנוכחית נמצאת של פיתון נתיב זה מאפשר את המחברת כדי לייבא את הקבצים שנכתבו על ידי %%writefile מאוחר:
if '.' not in sys.path:
  sys.path.insert(0, '.')

כעת ייבא את TensorFlow:

import tensorflow as tf

מערך נתונים והגדרת מודל

לאחר מכן, צור mnist.py קובץ עם מודל פשוט במערך ההתקנה. קובץ Python זה ישמש את תהליכי העבודה במדריך זה:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

הדרכת מודל על עובד בודד

נסה אימון המודל עבור מספר קטן של תקופות ולבחון את התוצאות של עובד יחיד לוודא שהכל פועל כהלכה. ככל שהאימון מתקדם, ההפסד אמור לרדת והדיוק צריך לעלות.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

תצורה של ריבוי עובדים

עכשיו בואו ניכנס לעולם ההכשרה מרובת עובדים.

אשכול עם משרות ומשימות

בשנת TensorFlow, הכשרה מופץ כורך בתוכו: 'cluster' עם מקומות עבודה שונים, וכל אחד מן העבודות עשויות להיות אחד או יותר 'task' s.

תזדקק TF_CONFIG משתנה הסביבה תצורה להכשרת על מספר מחשבים, שכל אחד מהם ואולי יש תפקיד שונה. TF_CONFIG הוא מחרוזת JSON משמש כדי לציין את תצורת אשכול עבור כל עובד כי הוא חלק באשכול.

ישנם שני מרכיבים של TF_CONFIG משתנה: 'cluster' ו 'task' .

  • 'cluster' הוא זהה עבור כל העובדים ומספק מידע על אשכול האימונים, שהינה dict המורכב מסוגים שונים של עבודות, כגון 'worker' או 'chief' .

    • בשנת הכשרה רבה-עובד עם tf.distribute.MultiWorkerMirroredStrategy , יש בדרך כלל אחד 'worker' שלוקח על עצמו אחריות, כגון שמירה במחסום וכתיבת קובץ סיכום עבור TensorBoard, בנוסף למה רגיל 'worker' עושה. כזה 'worker' נקרא העובד הראשי (עם שם עבודה 'chief' ).
    • נהוג עבור 'chief' יש 'index' 0 ימונו (למעשה, זהו איך tf.distribute.Strategy מיושם).
  • 'task' מספק מידע של המשימה הנוכחית היא שונה עבור כל עובד. זה מציין את 'type' ו 'index' של העובד כי.

להלן דוגמה לתצורה:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

הנה אותו TF_CONFIG בהמשכים כמחרוזת JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

שים לב tf_config הוא רק משתנה מקומי ב Python. כדי להיות מסוגל להשתמש בו עבור תצורה אימונים, dict זו חייבת להיעשות כמו JSON והושמו TF_CONFIG משתנה הסביבה.

בתצורת הדוגמא הנ"ל, אתה מגדיר את המשימה 'type' ל 'worker' ואת המשימה 'index' אל 0 . לכן, המכשיר הזה הוא העובד הראשון. זה ימונה כמו 'chief' העובד לעשות עבודה יותר מהאחרים.

לשם המחשה, מופעי הדרכה זו כיצד ייתכן להקים TF_CONFIG משתנים עם שני עובדים על localhost .

בפועל, תיצור עובדים מרובים על כתובות IP חיצוני / יציאות ולקבוע TF_CONFIG משתנה על כל עובד בהתאם.

במדריך זה, תשתמש בשני עובדים:

  • הראשון ( 'chief' ) של העובד TF_CONFIG מוצגת לעיל.
  • עבור העובד השני, תוכלו להגדיר tf_config['task']['index']=1

משתני סביבה ותתי תהליכים במחברות

תהליכי משנה יורשים משתני סביבה מהאב שלהם.

לדוגמה, אתה יכול להגדיר משתנה סביבה בתהליך זה של Jupyter Notebook באופן הבא:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

לאחר מכן, תוכל לגשת למשתנה הסביבה מתת-תהליכים:

echo ${GREETINGS}
Hello TensorFlow!

בחלק הבא, תוכל להשתמש בשיטה דומה כדי להעביר את TF_CONFIG אל subprocesses עובד. בתרחיש של העולם האמיתי, לא היית משיק את העבודות שלך בדרך זו, אבל זה מספיק בדוגמה זו.

בחר את האסטרטגיה הנכונה

ב-TensorFlow, קיימות שתי צורות עיקריות של הכשרה מבוזרת:

  • אימון סינכרוני, שבו הצעדים של אימונים יסונכרנו בין העובדים לבין ההעתקים, ו
  • אימון אסינכרוני, שבו צעדי האימונים לא מסונכרנים לחלוטין (למשל, הכשרה לשרת פרמטר ).

הדרכה זו מדגימה כיצד לבצע אימון סינכרוני רב עובד באמצעות מופע של tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy יוצר עותקים של כל המשתנים שכבות של מודל על כל מכשיר בכל העובדים. היא משתמשת CollectiveOps , אופ TensorFlow לתקשורת קולקטיבית, הדרגתיים המצרפי ולשמור המשתנים מסונכרנים. tf.distribute.Strategy המדריך קיים פרטים נוספים על האסטרטגיה הזאת.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy מספקת יישומים מרובים דרך CommunicationOptions פרמטר: 1) RING סככה הקולקטיבים מבוססי טבעת באמצעות gRPC כשכבת תקשורת חוצי המארח; 2) NCCL משתמשת ספריית התקשורת הקיבוצית NVIDIA ליישם הקולקטיבים; ו 3) AUTO דוחה את בחירת הריצה. הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר וסוג ה-GPUs, ובחיבור הרשת באשכול.

אימון הדגם

עם השילוב של tf.distribute.Strategy API לתוך tf.keras , היחיד לשנות לך תעשה להפיץ את האימונים כדי ועמיתים מרובים מצרף בניין המודל model.compile() שיחה בתוך strategy.scope() . מכתיבת ההיקף של חלוק האסטרטגיה איך ואיפה המשתנה נוצרים, ובמקרה של MultiWorkerMirroredStrategy , המשתנים שנוצרה MirroredVariable ים, והם משוכפלים על כול אחד מהעובדים.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

למעשה, כדי לרוץ עם MultiWorkerMirroredStrategy עליך להפעיל תהליכי העובד לעבור TF_CONFIG אליהם.

כמו mnist.py קובץ שנכתב קודם לכן, כאן הוא main.py שכל העובדים יפעל:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

בקטע הקוד הנ"ל פתק כי global_batch_size , אשר מועברת אל Dataset.batch , מוגדר per_worker_batch_size * num_workers . מבטיחה זה כי כל עובד מעבדת אצוות של per_worker_batch_size דוגמאות ללא קשר למספר העובדים.

הספרייה הנוכחית מכילה כעת את שני קבצי Python:

ls *.py
main.py
mnist.py

אז JSON-בהמשכים את TF_CONFIG ולהוסיף אותו משתני סביבה:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

עכשיו, אתה יכול לפתוח בתהליך עובד שיריץ את main.py ולהשתמש TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

יש כמה דברים לשים לב לפקודה לעיל:

  1. היא משתמשת %%bash שהינה "קסם" מחברת לרוץ כמה פקודות bash.
  2. היא משתמשת --bg הדגל כדי להפעיל את bash תהליך ברקע, כי עובד זה לא לסיים. זה מחכה לכל העובדים לפני שהוא מתחיל.

התהליך עובד backgrounded לא יודפס פלט מחשב נייד זה, ולכן &> פניות התפוקה שלה לקובץ, כך שאתה יכול לבדוק מה קרה בקובץ יומן מאוחרים.

לכן, המתן מספר שניות עד שהתהליך יתחיל:

import time
time.sleep(10)

כעת, בדוק מה יצא לקובץ היומן של העובד עד כה:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

השורה האחרונה של קובץ היומן צריכה לומר: Started server with target: grpc://localhost:12345 . העובד הראשון מוכן כעת, ומחכה שכל שאר העובדים יהיו מוכנים להמשיך.

אז לעדכן את tf_config לתהליך של העובד השני להרים:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

הפעל את העובד השני. זה יתחיל את ההכשרה מכיוון שכל העובדים פעילים (לכן אין צורך לרקע תהליך זה):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

אם תבדוק שוב את היומנים שנכתבו על ידי העובד הראשון, תלמד שהוא השתתף בהכשרת המודל הזה:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

באופן לא מפתיע, רן זה איטי יותר מאשר במבחן לרוץ בתחילת מדריך זה.

הפעלת מספר עובדים על מכונה אחת רק מוסיפה תקורה.

המטרה כאן לא הייתה לשפר את זמן האימון, אלא רק לתת דוגמה לאימון רב עובדים.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

הכשרה רב עובדים לעומק

עד כה, למדת כיצד לבצע הגדרה בסיסית של ריבוי עובדים.

במהלך שאר המדריך, תלמד על גורמים אחרים, שעשויים להיות שימושיים או חשובים עבור מקרי שימוש אמיתיים, בפירוט.

פיצול מערכי נתונים

בשנת הכשרה רב-עובד, הנתונים sharding נדרש כדי להבטיח התכנסות וביצועים.

דוגמה בסעיף הקודם מסתמכת על autosharding ברירת המחדל שמספק tf.distribute.Strategy API. אתה יכול לשלוט על sharding ידי קביעת tf.data.experimental.AutoShardPolicy של tf.data.experimental.DistributeOptions .

כדי ללמוד עוד על-sharding האוטומטי, עיינו במדריך קלט מבוזרת .

הנה דוגמה קטנה על איך להפוך את הרכב sharding off, כך שכל העתק ומעבדת כל דוגמה (לא מומלץ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

הַעֲרָכָה

אם אתה עובר את validation_data לתוך Model.fit , זה יהיה לסירוגין הכשרה והערכה עבור כל התקופה. ההערכה לוקחת את validation_data מופצת ברחבי אותה הקבוצה של עובדים ותוצאות ההערכה נצברות וזמינה עבור כול עובדים.

בדומה לאימון, מערך האימות נגזור אוטומטית ברמת הקובץ. אתה צריך להגדיר גודל אצווה עולמי במערך האימות ולהגדיר את validation_steps .

מומלץ גם מערך נתונים חוזר להערכה.

לחלופין, ניתן גם ליצור משימה נוספת שקוראת מעת לעת נקודות ביקורת ומריצה את ההערכה. זה מה שאומדן עושה. אבל זו לא דרך מומלצת לביצוע הערכה ולכן פרטיה נשמטים.

ביצועים

עכשיו יש לך מודל Keras כי הוא כל להגדיר לרוץ עובדים מרובים עם MultiWorkerMirroredStrategy .

כדי לכוונן את הביצועים של הכשרה מרובה עובדים, אתה יכול לנסות את הפעולות הבאות:

  • tf.distribute.MultiWorkerMirroredStrategy מספק מספר רב של הטמעות תקשורת קולקטיבית :

    • RING כלים מבוססי טבעת הקולקטיבים באמצעות gRPC כשכבת תקשורת חוצי המארח.
    • NCCL משתמשת ספריית התקשורת הקיבוצית NVIDIA ליישם הקולקטיבים.
    • AUTO דוחה את בחירת הריצה.

    הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר ה-GPUs, סוג ה-GPUs וחיבור הרשת באשכול. כדי לעקוף את הבחירה האוטומטית, לציין את communication_options פרמטר של MultiWorkerMirroredStrategy בנאי "s. לדוגמה:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • עופרת משתנה כדי tf.float אם אפשר:

    • מודל ResNet הרשמי כולל דוגמא של איך אפשר לעשות את זה.

סובלנות לתקלות

באימון סינכרוני, האשכול ייכשל אם אחד העובדים ייכשל ולא קיים מנגנון שחזור כשלים.

שימוש Keras עם tf.distribute.Strategy מגיע עם יתרון של עמידות בפני תקלות במקרים בהם עובדים למות או אחרת לא יציב. אתה יכול לעשות זאת על ידי שימור מצב האימון במערכת הקבצים המבוזרת לבחירתך, כך שעם הפעלה מחדש של המופע שנכשל בעבר או הקדים, מצב האימון ישוחזר.

כאשר עובד הופך לא זמין, עובדים אחרים ייכשלו (ייתכן לאחר פסק זמן). במקרים כאלה, יש להפעיל מחדש את העובד שאינו זמין, כמו גם עובדים אחרים שנכשלו.

Callback של ModelCheckpoint

ModelCheckpoint התקשרות כבר לא מספקת פונקציונליות עמידות בפני תקלות, השתמש BackupAndRestore התקשרות במקום.

ModelCheckpoint ההתקשרות עדיין ניתן להשתמש כדי לחסוך מחסומים. אבל עם זה, אם האימון הופסק או הסתיים בהצלחה, כדי להמשיך את האימון מהמחסום, המשתמש אחראי לטעון את הדגם באופן ידני.

לחלופין המשתמש יכול לבחור לשמור ולשחזר דגם / משקולות מחוץ ModelCheckpoint התקשרות.

שמירת דגם וטעינה

כדי לחסוך המודל שלך באמצעות model.save או tf.saved_model.save , הצרכים יעד השמירה להיות שונה עבור כל עובד.

  • עבור עובדים שאינם עובדים ראשיים, תצטרך לשמור את הדגם בספרייה זמנית.
  • עבור המפקד, תצטרך לשמור בספריית הדגמים המסופקת.

הספריות הזמניות של העובד צריכות להיות ייחודיות כדי למנוע שגיאות הנובעות ממספר עובדים שמנסים לכתוב לאותו מיקום.

הדגם שנשמר בכל המדריכים זהה, ובדרך כלל יש להפנות רק לדגם שנשמר על ידי המפקד לצורך שחזור או הגשה.

אמורה להיות לך היגיון ניקוי שמוחק את הספריות הזמניות שנוצרו על ידי העובדים לאחר השלמת ההכשרה שלך.

הסיבה לחיסכון במפקד ובעובדים בו-זמנית היא מכיוון שאתה עשוי לצבור משתנים במהלך נקודת הביקורת, מה שמחייב הן את המפקד והן מהעובדים להשתתף בפרוטוקול התקשורת של allreduce. מצד שני, מתן אפשרות למפקד ולעובדים לשמור באותו ספריית דגמים תגרום לשגיאות עקב מחלוקת.

שימוש MultiWorkerMirroredStrategy , התוכנית מנוהלת על כל עובד, וכדי לדעת אם העובד הנוכחי הוא ראש, זה מנצל את האובייקט פותרן אשכול שיש לו תכונות task_type ו task_id :

  • task_type אומר לך מה התפקיד הנוכחי הוא (למשל 'worker' ).
  • task_id אומר לך את המזהה של העובד.
  • עובד עם task_id == 0 מיועד ככל שהעובד הראשי.

בקטע הקוד להלן, write_filepath הפונקציה מספקת את נתיב הקובץ לכתוב, אשר תלוי של העובד task_id :

  • עבור העובד הראשי (עם task_id == 0 ), זה כותב את נתיב הקובץ המקורי.
  • עבור עובדים אחרים, היא יוצרת directory- זמני temp_dir -עם task_id בנתיב ספרייה לכתוב:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

עם זה, אתה מוכן כעת לשמור:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

כפי שתואר לעיל, בהמשך יש לטעון את המודל רק ממפקד הנתיב שנשמר אליו, אז בואו נסיר את הזמניים שהעובדים הלא ראשיים הצילו:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

עכשיו, כאשר הגיע הזמן לטעון, בואו להשתמש נוח tf.keras.models.load_model API, ולהמשיך עם עבודה נוספת.

הנה, רק להניח באמצעות עובד יחיד העומס ולהמשיך אימונים, ובמקרה כזה לא קורא tf.keras.models.load_model בתוך אחר strategy.scope() (שים לב strategy = tf.distribute.MultiWorkerMirroredStrategy() , כהגדרתו מוקדם ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

שמירה ושחזור של מחסומים

מצד שני, המחסום מאפשר לשמור את המשקולות של הדגם שלך ולשחזר אותם ללא צורך לשמור את כל הדגם.

הנה, תיצור אותו tf.train.Checkpoint העוקבת המודל, אשר מנוהל על ידי tf.train.CheckpointManager , כך שרק המחסום האחרון נשמר:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

לאחר CheckpointManager מוגדר, עכשיו אתה מוכן להציל ולהסיר את מחסומי העובדים הלא-הראש הצילו:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

עכשיו, כאשר אתה צריך לשחזר את המודל, אתה יכול למצוא את המחסום האחרון הציל באמצעות הנוחה tf.train.latest_checkpoint הפונקציה. לאחר שחזור המחסום, ניתן להמשיך בהדרכה.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

BackupAndRestore התקשרות חוזרת

tf.keras.callbacks.experimental.BackupAndRestore ההתקשרות מספקת את הפונקציונליות העמידה בפני תקלות באמצעות גיבוי הדגם ומספר עידן נוכחי בקובץ במחסום ארעי תחת backup_dir טיעון כדי BackupAndRestore . זה נעשה בסוף כל תקופה.

ברגע שעבודות מופרעות ומתחילות מחדש, ההתקשרות חזרה משחזרת את המחסום האחרון, והאימונים נמשכים מתחילת העידן שנקטע. כל אימון חלקי שכבר נעשה בעידן הבלתי נגמר לפני ההפרעה ייזרק, כך שהוא לא ישפיע על מצב הדגם הסופי.

כדי להשתמש בו, לספק מופע של tf.keras.callbacks.experimental.BackupAndRestore בבית Model.fit השיחה.

עם MultiWorkerMirroredStrategy , אם מקבל קטע עובד, באשכול כולו עוצר עד העובד נקטע מופעל מחדש. עובדים אחרים יתחילו גם הם, והעובד שהופסק מצטרף מחדש לאשכול. לאחר מכן, כל עובד קורא את קובץ המחסום שנשמר בעבר וקולט את מצבו הקודם, ובכך מאפשר לאשכול לחזור לסנכרון. לאחר מכן, האימונים נמשכים.

BackupAndRestore ההתקשרות משתמשת CheckpointManager להציל ולשחזר את מצב אימונים, אשר יוצר קובץ בשם במחסום כי מסלולים קיימים מחסומים יחד עם אחד האחרון. מסיבה זו, backup_dir לא צריך להיות שימוש חוזר לאחסון מחסומים אחרים כדי התנגשות שמות להימנע.

נכון לעכשיו, BackupAndRestore התקשרות תומך עובד יחיד ללא אסטרטגיה, MirroredStrategy, ו-עובד רב עם MultiWorkerMirroredStrategy. להלן שתי דוגמאות הן להכשרה מרובה עובדים והן להכשרה לעובד יחיד.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

אם לבדוק את הספרייה של backup_dir שציינת BackupAndRestore , אתה עשוי להבחין כמה קבצים במחסום שנוצר באופן זמני. קבצים אלה נדרשים לשחזור במקרים אבדו בעבר, והם יוסרו על ידי הספרייה בסוף Model.fit עם יציאה מוצלחת של האימונים שלך.

משאבים נוספים

  1. ההכשרה שהופצה TensorFlow מדריך מספקת סקירה של אסטרטגיות הפצה הזמינות.
  2. לולאת האימונים המותאמים אישית עם Keras ו MultiWorkerMirroredStrategy מופעי הדרכה כיצד להשתמש MultiWorkerMirroredStrategy עם Keras ו לולאת אימונים מותאמים אישית.
  3. בדקו את הדגמים הרשמיים , שרבים מהם יכולים להיות מוגדר להפעיל אסטרטגיות הפצה מרובות.
  4. ביצועים טובים יותר עם tf.function המדריך מספק מידע על אסטרטגיות אחרות וכלים, כגון Profiler TensorFlow אתה יכול להשתמש בו כדי למטב את הביצועים של דגמי TensorFlow שלך.