ترجمت واجهة Cloud Translation API‏ هذه الصفحة.
Switch to English

تدريب متعدد العمال مع Keras

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

نظرة عامة

يوضح هذا البرنامج التعليمي التدريب الموزع متعدد العمال باستخدام نموذج tf.distribute.Strategy باستخدام tf.distribute.Strategy API ، على وجه التحديد tf.distribute.MultiWorkerMirroredStrategy . بمساعدة هذه الإستراتيجية ، يمكن لنموذج Keras الذي تم تصميمه للعمل على عامل واحد أن يعمل بسلاسة على عدة عمال مع الحد الأدنى من تغيير الكود.

يتوفر التدريب الموزع في دليل TensorFlow للحصول على نظرة عامة على استراتيجيات التوزيع التي يدعمها TensorFlow للمهتمين بفهم أعمق لـ tf.distribute.Strategy APIs.

اقامة

أولا ، بعض الواردات الضرورية.

import json
import os
import sys

قبل استيراد TensorFlow ، قم بإجراء بعض التغييرات على البيئة.

قم بتعطيل كافة وحدات معالجة الرسومات. هذا يمنع الأخطاء التي يسببها جميع العمال الذين يحاولون استخدام نفس GPU. للحصول على تطبيق حقيقي ، سيكون كل عامل على جهاز مختلف.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG تعيين متغير البيئة TF_CONFIG ، وسترى المزيد حول هذا لاحقًا.

os.environ.pop('TF_CONFIG', None)

تأكد من أن الدليل الحالي على مسار بايثون. يسمح هذا للمفكرة باستيراد الملفات المكتوبة بواسطة %%writefile لاحقًا.

if '.' not in sys.path:
  sys.path.insert(0, '.')

الآن قم باستيراد TensorFlow.

import tensorflow as tf

تعريف النموذج ومجموعة البيانات

بعد ذلك ، قم بإنشاء ملف mnist.py بنموذج بسيط وإعداد مجموعة بيانات. سيتم استخدام ملف python بواسطة العمليات العاملة في هذا البرنامج التعليمي:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

جرب تدريب النموذج لعدد صغير من العصور ولاحظ نتائج عامل واحد للتأكد من أن كل شيء يعمل بشكل صحيح. مع تقدم التدريب ، يجب أن تنخفض الخسارة ويجب أن تزيد الدقة.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 2s 13ms/step - loss: 2.3003 - accuracy: 0.1326
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2360 - accuracy: 0.3211
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1608 - accuracy: 0.4944

<tensorflow.python.keras.callbacks.History at 0x7fa7f41a5400>

تكوين متعدد العمال

الآن دعنا ندخل عالم التدريب متعدد العمال. في TensorFlow ، TF_CONFIG متغير البيئة TF_CONFIG للتدريب على أجهزة متعددة ، لكل منها دور مختلف. TF_CONFIG عبارة عن سلسلة JSON تستخدم لتحديد تكوين الكتلة على كل عامل يمثل جزءًا من الكتلة.

فيما يلي مثال على التكوين:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

إليك نفس TF_CONFIG المتسلسلة كسلسلة JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

هناك نوعان من مكونات TF_CONFIG : cluster task .

  • cluster هي نفسها لجميع العمال وتوفر معلومات حول مجموعة التدريب ، والتي تتكون من أنواع مختلفة من الوظائف مثل worker . في التدريب متعدد العاملين باستخدام MultiWorkerMirroredStrategy ، عادة ما يكون هناك worker واحد يتحمل مسؤولية أكبر قليلاً مثل حفظ نقطة التفتيش وكتابة ملف ملخص لـ TensorBoard بالإضافة إلى ما يفعله worker العادي. يُشار إلى هذا العامل على أنه chief العمال ، ومن المعتاد أن يتم تعيين worker ذي index 0 باعتباره worker الرئيسي (في الواقع هذه هي الطريقة التي يتم بها تنفيذ tf.distribute.Strategy ).

  • task توفر معلومات عن المهمة الحالية وتختلف على كل عامل. يحدد type index ذلك العامل.

في هذا المثال ، تقوم بتعيين type المهمة إلى "worker" index المهام على 0 . هذه الآلة هي العامل الأول وسيتم تعيينه كرئيس للعمال ويقوم بعمل أكثر من الآخرين. لاحظ أن الأجهزة الأخرى ستحتاج إلى مجموعة متغير البيئة TF_CONFIG أيضًا ، ويجب أن يكون لها نفس cluster الإملاء ، ولكن type مهمة أو index مهام مختلف اعتمادًا على أدوار تلك الأجهزة.

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيف يمكن للمرء تعيين TF_CONFIG مع عاملين على localhost TF_CONFIG . من الناحية العملية ، يمكن للمستخدمين إنشاء عدة عمال على عناوين / منافذ IP خارجية ، وتعيين TF_CONFIG على كل عامل بشكل مناسب.

في هذا المثال سوف تستخدم عاملين ، يظهر TF_CONFIG العامل الأول أعلاه. بالنسبة للعامل الثاني ، يمكنك تعيين tf_config['task']['index']=1

أعلاه ، tf_config هو مجرد متغير محلي في بيثون. لاستخدامه بالفعل لتكوين التدريب ، يحتاج هذا القاموس إلى التسلسل كـ JSON ، ووضعه في متغير البيئة TF_CONFIG .

متغيرات البيئة والعمليات الفرعية في أجهزة الكمبيوتر المحمولة

ترث العمليات الفرعية متغيرات البيئة من الوالدين. لذلك إذا قمت بتعيين متغير بيئة في عملية jupyter notebook هذه:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

يمكنك الوصول إلى متغير البيئة من عمليات فرعية:

echo ${GREETINGS}
Hello TensorFlow!

في القسم التالي ، ستستخدم هذا لتمرير TF_CONFIG إلى TF_CONFIG للعمال. لن تقوم مطلقًا بإطلاق وظائفك بهذه الطريقة ، لكنها كافية لأغراض هذا البرنامج التعليمي: لإظهار مثال بسيط متعدد العمال.

اختر الاستراتيجية الصحيحة

يوجد في TensorFlow نوعان رئيسيان من التدريب الموزع:

  • التدريب المتزامن ، حيث تتم مزامنة خطوات التدريب عبر العاملين والنسخ المتماثلة ، و
  • التدريب غير المتزامن ، حيث لا تتم مزامنة خطوات التدريب بشكل صارم.

MultiWorkerMirroredStrategy ، وهي الإستراتيجية الموصى بها للتدريب المتزامن متعدد العمال ، في هذا الدليل. لتدريب النموذج ، استخدم مثيل tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy بإنشاء نسخ من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. يستخدم CollectiveOps ، وهو TensorFlow op للاتصال الجماعي ، لتجميع التدرجات اللونية والحفاظ على تزامن المتغيرات. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الاستراتيجية.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

توفر MultiWorkerMirroredStrategy متعددة عبر معلمة CommunicationOptions . تنفذ RING مجموعات قائمة على الحلقة باستخدام gRPC كطبقة اتصال عبر المضيف. يستخدم NCCL Nvidia لتنفيذ المجموعات. AUTO يؤجل الاختيار لوقت التشغيل. يعتمد أفضل خيار للتنفيذ الجماعي على عدد ونوع وحدات معالجة الرسومات ، والتوصيل البيني للشبكة في المجموعة.

تدريب النموذج

من خلال تكامل tf.distribute.Strategy API في tf.keras ، فإن التغيير الوحيد الذي ستقوم به لتوزيع التدريب على العديد من العاملين هو إرفاق مبنى النموذج model.compile() داخل strategy.scope() . يحدد نطاق استراتيجية التوزيع كيف وأين يتم إنشاء المتغيرات ، وفي حالة MultiWorkerMirroredStrategy ، فإن المتغيرات التي تم إنشاؤها هي MirroredVariable s ، ويتم تكرارها على كل من العمال.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

للتشغيل فعليًا باستخدام MultiWorkerMirroredStrategy ستحتاج إلى تشغيل عمليات العمال وتمرير TF_CONFIG إليهم.

مثل ملف mnist.py المكتوب مسبقًا ، إليك main.py التي main.py كل عامل:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

في مقتطف الشفرة أعلاه ، لاحظ أن global_batch_size ، الذي يتم تمريره إلى Dataset.batch ، تم تعيينه على per_worker_batch_size * num_workers . هذا يضمن أن كل عامل يعالج دفعات من الأمثلة per_worker_batch_size بغض النظر عن عدد العمال.

يحتوي الدليل الحالي الآن على كلا ملفي Python:

ls *.py
main.py
mnist.py

لذا ، قم بتسلسل TF_CONFIG إلى متغيرات البيئة:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن ، يمكنك تشغيل عملية عاملة من شأنها تشغيل main.py واستخدام TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

هناك بعض الأشياء التي يجب ملاحظتها حول الأمر أعلاه:

  1. يستخدم %%bash وهو عبارة عن دفتر ملاحظات "سحري" لتشغيل بعض أوامر bash.
  2. يستخدم العلامة --bg لتشغيل عملية bash في الخلفية ، لأن هذا العامل لن ينتهي. ينتظر جميع العمال قبل أن يبدأ.

لن تقوم عملية العامل ذات الخلفية بطباعة الإخراج إلى هذا الكمبيوتر الدفتري ، لذا يقوم &> بإعادة توجيه الإخراج إلى ملف ، حتى تتمكن من رؤية ما حدث.

لذلك ، انتظر بضع ثوانٍ حتى تبدأ العملية:

import time
time.sleep(10)

انظر الآن إلى ما تم إخراج ملف سجل العامل حتى الآن:

cat job_0.log
2021-01-21 02:25:42.175986: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:43.946875: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:43.947856: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:45.054212: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:45.054293: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054344: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:45.054509: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:45.054517: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:45.055458: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:45.055932: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.056674: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.061293: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:45.061752: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

يجب أن يقول السطر الأخير من ملف السجل: تم Started server with target: grpc://localhost:12345 . العامل الأول جاهز الآن ، وينتظر جميع العمال الآخرين ليكونوا مستعدين للمضي قدمًا.

لذا قم بتحديث tf_config لعملية العامل الثاني لالتقاط:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن أطلق العامل الثاني. سيبدأ هذا التدريب نظرًا لأن جميع العمال نشطين (لذلك ليست هناك حاجة إلى خلفية هذه العملية):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.3042 - accuracy: 0.0955
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2330 - accuracy: 0.3486
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1501 - accuracy: 0.5445

2021-01-21 02:25:52.272702: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:54.022387: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:54.023369: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:55.313133: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:55.313203: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:55.313212: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:55.313332: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:55.313373: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:55.313386: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:55.314278: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:55.314696: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:55.315494: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:55.319659: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:55.320191: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-01-21 02:25:56.295743: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-21 02:25:56.548639: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-21 02:25:56.549007: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000185000 Hz

الآن إذا أعدت فحص السجلات التي كتبها العامل الأول ، فسترى أنه شارك في تدريب هذا النموذج:

cat job_0.log
2021-01-21 02:25:42.175986: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:43.946875: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:43.947856: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:45.054212: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:45.054293: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054344: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:45.054509: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:45.054517: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:45.055458: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:45.055932: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.056674: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.061293: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:45.061752: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-01-21 02:25:56.294854: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-21 02:25:56.539862: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-21 02:25:56.540266: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000185000 Hz
Epoch 1/3
70/70 [==============================] - 7s 54ms/step - loss: 2.3042 - accuracy: 0.0955
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2330 - accuracy: 0.3486
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1501 - accuracy: 0.5445

ليس من المستغرب أن يكون هذا أبطأ من الاختبار الذي تم تشغيله في بداية هذا البرنامج التعليمي. يؤدي تشغيل عدة عمال على جهاز واحد إلى إضافة النفقات العامة فقط. لم يكن الهدف هنا هو تحسين وقت التدريب ، ولكن فقط لإعطاء مثال على تدريب متعدد العمال.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

تدريب متعدد العمال في العمق

حتى الآن ، أظهر هذا البرنامج التعليمي إعدادًا أساسيًا متعدد العمال. تبحث بقية هذه الوثيقة بالتفصيل عن العوامل الأخرى التي قد تكون مفيدة أو مهمة لحالات الاستخدام الحقيقي.

تقسيم مجموعة البيانات

في التدريب متعدد العمال ، هناك حاجة إلى تجزئة مجموعة البيانات لضمان التقارب والأداء.

يعتمد المثال في القسم السابق على المشاركة التلقائية الافتراضية التي يوفرها tf.distribute.Strategy API. يمكنك التحكم في التجزئة عن طريق تعيين tf.data.experimental.AutoShardPolicy من tf.data.experimental.DistributeOptions . لمعرفة المزيد حول التجزئة التلقائية ، راجع دليل الإدخال الموزع .

فيما يلي مثال سريع على كيفية إيقاف تشغيل التجزئة التلقائية ، بحيث تعالج كل نسخة متماثلة كل مثال (غير مستحسن):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

تقييم

إذا قمت بتمرير validation_data إلى model.fit ، model.fit بين التدريب والتقييم لكل فترة. يتم توزيع بيانات validation_data التقييم عبر نفس مجموعة العمال ويتم تجميع نتائج التقييم وإتاحتها لجميع العمال. على غرار التدريب ، يتم تقسيم مجموعة بيانات التحقق تلقائيًا على مستوى الملف. تحتاج إلى تعيين حجم دفعة عام في مجموعة بيانات validation_steps وتعيين validation_steps . يوصى أيضًا بمجموعة بيانات متكررة للتقييم.

بدلاً من ذلك ، يمكنك أيضًا إنشاء مهمة أخرى تقرأ نقاط التحقق بشكل دوري وتدير التقييم. هذا ما يفعله المقدر. لكن هذه ليست طريقة موصى بها لإجراء التقييم وبالتالي يتم حذف تفاصيله.

تنبؤ

لا يعمل model.predict حاليًا مع MultiWorkerMirroredStrategy.

أداء

لديك الآن نموذج Keras الذي تم إعداده بالكامل للتشغيل في عدة عمال باستخدام MultiWorkerMirroredStrategy . يمكنك تجربة الأساليب التالية MultiWorkerMirroredStrategy أداء التدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy .

  • توفر MultiWorkerMirroredStrategy اتصالات جماعية متعددة. تنفذ RING مجموعات قائمة على الحلقات باستخدام gRPC كطبقة اتصال عبر المضيف. يستخدم NCCL Nvidia لتنفيذ المجموعات. AUTO يؤجل الاختيار لوقت التشغيل. يعتمد أفضل خيار للتنفيذ الجماعي على عدد ونوع وحدات معالجة الرسومات ، والتوصيل البيني للشبكة في المجموعة. لتجاوز الخيار التلقائي، حدد communication_options المعلمة من MultiWorkerMirroredStrategy منشئ الصورة، على سبيل المثال communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • صب المتغيرات على tf.float إن أمكن. يتضمن نموذج ResNet الرسمي مثالاً على كيفية القيام بذلك.

التسامح مع الخطأ

في التدريب المتزامن ، ستفشل الكتلة إذا فشل أحد العمال ولا توجد آلية لاسترداد الفشل. يأتي استخدام Keras مع tf.distribute.Strategy تأتي tf.distribute.Strategy مع ميزة التسامح مع الخطأ في الحالات التي يموت فيها العمال أو يكونون غير مستقرين. يمكنك القيام بذلك عن طريق الحفاظ على حالة التدريب في نظام الملفات الموزعة الذي تختاره ، بحيث يتم استرداد حالة التدريب عند إعادة تشغيل المثيل الذي فشل أو تم إيقافه مسبقًا.

عندما يصبح العامل غير متاح ، سيفشل العمال الآخرون (ربما بعد انتهاء المهلة). في مثل هذه الحالات ، يحتاج العامل غير المتاح إلى إعادة التشغيل ، وكذلك العمال الآخرين الذين فشلوا.

رد اتصال ModelCheckpoint

لم يعد رد الاتصال ModelCheckpoint يوفر وظيفة التسامح مع الخطأ ، يرجى استخدام رد الاتصال BackupAndRestore بدلاً من ذلك.

لا يزال من الممكن استخدام رد الاتصال ModelCheckpoint لحفظ نقاط التفتيش. ولكن مع هذا ، إذا توقف التدريب أو انتهى بنجاح ، من أجل مواصلة التدريب من نقطة التفتيش ، يكون المستخدم مسؤولاً عن تحميل النموذج يدويًا.

اختياريا ، يمكن للمستخدم اختيار حفظ واستعادة النموذج / الأوزان خارج رد الاتصال ModelCheckpoint .

نموذج الادخار والتحميل

لحفظ النموذج الخاص بك باستخدام model.save أو tf.saved_model.save ، يجب أن تكون وجهة الحفظ مختلفة لكل عامل. على العمال غير الرئيسيين ، ستحتاج إلى حفظ النموذج في دليل مؤقت ، وعلى الرئيس ، ستحتاج إلى الحفظ في دليل النموذج المقدم. يجب أن تكون الدلائل المؤقتة الخاصة بالعامل فريدة من نوعها لمنع الأخطاء الناتجة عن العديد من العمال الذين يحاولون الكتابة إلى نفس الموقع. النموذج المحفوظ في جميع الدلائل متطابق وعادة ما يجب الإشارة فقط إلى النموذج المحفوظ من قبل الرئيس للاستعادة أو التقديم. يجب أن يكون لديك بعض منطق التنظيف الذي يحذف الأدلة المؤقتة التي أنشأها العمال بمجرد اكتمال تدريبك.

السبب الذي يجعلك تحتاج إلى التوفير على الرئيس والعاملين في نفس الوقت هو أنك قد تقوم بتجميع المتغيرات أثناء نقاط التفتيش التي تتطلب مشاركة كل من الرئيس والعاملين في بروتوكول الاتصال allreduce. من ناحية أخرى ، فإن السماح للرئيس والعاملين بالحفظ في دليل النموذج نفسه سيؤدي إلى حدوث أخطاء بسبب الخلاف.

باستخدام MultiWorkerMirroredStrategy ، يتم تشغيل البرنامج على كل عامل ، ومن أجل معرفة ما إذا كان العامل الحالي هو الرئيس ، فإنه يستفيد من كائن محلل الكتلة الذي يحتوي على سمات task_type و task_id . يخبرك task_type ما هي الوظيفة الحالية (على سبيل المثال ، "عامل") ، ويخبرك task_id بمعرف العامل. تم تعيين العامل برقم 0 كرئيس عمال.

في مقتطف الشفرة أدناه ، يوفر write_filepath مسار الملف المراد كتابته ، والذي يعتمد على معرف العامل. في حالة الرئيس (العامل بالمعرف 0) ، يكتب إلى مسار الملف الأصلي ؛ بالنسبة للآخرين ، يقوم بإنشاء دليل مؤقت (مع معرف في مسار الدليل) لكتابة:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

بذلك ، أنت الآن جاهز للحفظ:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

كما هو موضح أعلاه ، يجب تحميل النموذج لاحقًا فقط من رئيس المسار المحفوظ فيه ، لذلك دعونا نزيل النماذج المؤقتة التي حفظها العمال غير الرئيسيين:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

الآن ، عندما يحين وقت التحميل ، دعنا نستخدم واجهة برمجة تطبيقات tf.keras.models.load_model الملائمة ، ونستمر في العمل الإضافي. هنا ، افترض استخدام عامل واحد فقط لتحميل التدريب tf.keras.models.load_model ، وفي هذه الحالة لا تستدعي tf.keras.models.load_model ضمن tf.keras.models.load_model strategy.scope() . tf.keras.models.load_model strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 14ms/step - loss: 2.2981 - accuracy: 0.1039
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2813 - accuracy: 0.1430

<tensorflow.python.keras.callbacks.History at 0x7fa7f102cbe0>

حفظ واستعادة نقطة التفتيش

من ناحية أخرى ، تتيح لك نقاط التفتيش حفظ أوزان النموذج واستعادتها دون الحاجة إلى حفظ النموذج بأكمله. هنا ، ستنشئ tf.train.Checkpoint واحدًا يتتبع النموذج ، والذي تتم إدارته بواسطة tf.train.CheckpointManager بحيث يتم الاحتفاظ بأحدث نقطة tf.train.CheckpointManager فقط.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

بمجرد إعداد CheckpointManager ، فأنت الآن جاهز للحفظ وإزالة نقاط التفتيش للعمال غير الرئيسيين المحفوظة.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

الآن ، عندما تحتاج إلى الاستعادة ، يمكنك العثور على أحدث نقطة تفتيش محفوظة باستخدام وظيفة tf.train.latest_checkpoint . بعد استعادة نقطة التفتيش ، يمكنك متابعة التدريب.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 14ms/step - loss: 2.3013 - accuracy: 0.1083
Epoch 2/2
20/20 [==============================] - 0s 14ms/step - loss: 2.2850 - accuracy: 0.1703

<tensorflow.python.keras.callbacks.History at 0x7fa7f0fc6f60>

رد الاتصال BackupAndRestore

يوفر رد الاتصال BackupAndRestore وظيفة التسامح مع الخطأ ، عن طريق عمل نسخة احتياطية من النموذج ورقم الحقبة الحالية في ملف نقطة تفتيش مؤقتة ضمن وسيطة backup_dir إلى BackupAndRestore . يتم ذلك في نهاية كل حقبة.

بمجرد مقاطعة الوظائف وإعادة تشغيلها ، يستعيد رد الاتصال آخر نقطة تفتيش ، ويستمر التدريب من بداية حقبة الانقطاع. سيتم التخلص من أي تدريب جزئي تم إجراؤه بالفعل في الحقبة غير المكتملة قبل الانقطاع ، بحيث لا يؤثر على حالة النموذج النهائي.

لاستخدامه ، قم بتوفير مثيل لـ tf.keras.callbacks.experimental.BackupAndRestore في استدعاء tf.keras.Model.fit() .

باستخدام MultiWorkerMirroredStrategy ، في حالة مقاطعة عامل ، تتوقف المجموعة بأكملها مؤقتًا حتى يتم إعادة تشغيل العامل الذي تمت مقاطعته. سيتم إعادة تشغيل العمال الآخرين أيضًا ، وينضم العامل الذي تمت مقاطعته إلى المجموعة. بعد ذلك ، يقرأ كل عامل ملف نقاط التفتيش الذي تم حفظه مسبقًا ويلتقط حالته السابقة ، مما يسمح للمجموعة بالعودة إلى المزامنة. ثم يستمر التدريب.

يستخدم BackupAndRestore رد الاتصال CheckpointManager لحفظ واستعادة حالة التدريب ، والتي تنشئ ملفًا يسمى checkpoint يتتبع نقاط التفتيش الموجودة مع أحدثها. لهذا السبب ، لا ينبغي إعادة استخدام backup_dir لتخزين نقاط التفتيش الأخرى لتجنب تضارب الأسماء.

حاليًا ، يدعم رد الاتصال BackupAndRestore عاملًا واحدًا بدون إستراتيجية ، و MirroredStrategy ، ومتعدد العاملين باستخدام MultiWorkerMirroredStrategy. فيما يلي مثالان لكل من تدريب متعدد العمال وتدريب عامل واحد.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2811 - accuracy: 0.1663
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1709 - accuracy: 0.3821
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.0420 - accuracy: 0.5454

<tensorflow.python.keras.callbacks.History at 0x7fa7f0623fd0>

إذا قمت بفحص دليل backup_dir الذي حددته في BackupAndRestore ، فقد تلاحظ بعض ملفات نقاط التحقق التي تم إنشاؤها مؤقتًا. هذه الملفات مطلوبة لاستعادة المثيلات المفقودة سابقًا ، وستتم إزالتها بواسطة المكتبة في نهاية tf.keras.Model.fit() عند الخروج بنجاح من tf.keras.Model.fit() .

أنظر أيضا

  1. يوفر التدريب الموزع في دليل TensorFlow نظرة عامة على استراتيجيات التوزيع المتاحة.
  2. النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل استراتيجيات توزيع متعددة.
  3. يوفر قسم الأداء في الدليل معلومات حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.