يوم مجتمع ML هو 9 نوفمبر! الانضمام إلينا للحصول على التحديثات من TensorFlow، JAX، وأكثر معرفة المزيد

حلقة تدريب مخصصة مع Keras و MultiWorkerMirroredStrategy

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

يوضح هذا البرنامج التعليمي تدريبًا متعدد العمال باستخدام حلقة تدريب مخصصة API ، موزعة عبر MultiWorkerMirroredStrategy ، لذلك يمكن لنموذج Keras المصمم للتشغيل على عامل واحد أن يعمل بسلاسة على العديد من العمال مع الحد الأدنى من تغيير الرمز.

نحن نستخدم حلقات تدريب مخصصة لتدريب نموذجنا لأنها تمنحنا مرونة وتحكمًا أكبر في التدريب. علاوة على ذلك ، من الأسهل تصحيح أخطاء النموذج وحلقة التدريب. يتوفر المزيد من المعلومات التفصيلية في كتابة حلقة تدريبية من البداية .

إذا كنت تبحث عن كيفية استخدام MultiWorkerMirroredStrategy مع model.fit ، model.fit هذا البرنامج التعليمي بدلاً من ذلك.

يتوفر التدريب الموزع في دليل TensorFlow للحصول على نظرة عامة على استراتيجيات التوزيع التي يدعمها TensorFlow للمهتمين بفهم أعمق لـ tf.distribute.Strategy APIs.

يثبت

أولا ، بعض الواردات الضرورية.

import json
import os
import sys

قبل استيراد TensorFlow ، قم بإجراء بعض التغييرات على البيئة.

قم بتعطيل كافة وحدات معالجة الرسومات. هذا يمنع الأخطاء التي يسببها جميع العمال الذين يحاولون استخدام نفس GPU. للحصول على تطبيق حقيقي ، سيكون كل عامل على جهاز مختلف.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG تعيين متغير البيئة TF_CONFIG ، وسترى المزيد حول هذا لاحقًا.

os.environ.pop('TF_CONFIG', None)

تأكد من أن الدليل الحالي على مسار بايثون. يسمح هذا لدفتر الملاحظات باستيراد الملفات المكتوبة بواسطة %%writefile لاحقًا.

if '.' not in sys.path:
  sys.path.insert(0, '.')

الآن قم باستيراد TensorFlow.

import tensorflow as tf

تعريف مجموعة البيانات والنموذج

بعد ذلك ، قم بإنشاء ملف mnist.py بنموذج بسيط وإعداد مجموعة بيانات. سيتم استخدام ملف python بواسطة العمليات العاملة في هذا البرنامج التعليمي:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

تكوين متعدد العمال

الآن دعنا ندخل عالم التدريب متعدد العمال. في TensorFlow ، TF_CONFIG متغير بيئة TF_CONFIG للتدريب على أجهزة متعددة ، لكل منها دور مختلف. TF_CONFIG المستخدم أدناه ، عبارة عن سلسلة JSON تستخدم لتحديد تكوين الكتلة على كل عامل يمثل جزءًا من الكتلة. هذا هو الأسلوب الافتراضي لتحديد مجموعة، وذلك باستخدام cluster_resolver.TFConfigClusterResolver ، ولكن هناك خيارات أخرى متاحة في distribute.cluster_resolver حدة.

صف مجموعتك

فيما يلي مثال على التكوين:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

إليك نفس TF_CONFIG المتسلسلة كسلسلة JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

هناك نوعان من مكونات TF_CONFIG : cluster task .

  • cluster هي نفسها لجميع العمال وتوفر معلومات حول مجموعة التدريب ، والتي تتكون من أنواع مختلفة من الوظائف مثل worker . في التدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy ، عادة ما يكون هناك worker واحد يتحمل مسؤولية أكبر قليلاً مثل حفظ نقطة التفتيش وكتابة ملف ملخص لـ TensorBoard بالإضافة إلى ما يفعله worker العادي. يُشار إلى هذا العامل على أنه chief العمال ، ومن المعتاد أن يتم تعيين worker ذي index 0 باعتباره worker الرئيسي (في الواقع هذه هي الطريقة التي يتم بها تنفيذ tf.distribute.Strategy ).

  • task توفر معلومات عن المهمة الحالية وتختلف على كل عامل. يحدد type index ذلك العامل.

في هذا المثال ، يمكنك تعيين type المهمة إلى "worker" index المهام إلى 0 . هذه الآلة هي العامل الأول وسيتم تعيينه كرئيس للعمال ويقوم بعمل أكثر من الآخرين. لاحظ أن الأجهزة الأخرى ستحتاج إلى مجموعة متغير بيئة TF_CONFIG أيضًا ، ويجب أن يكون لها نفس cluster الإملاء ، ولكن type مهمة أو index مهام مختلف اعتمادًا على أدوار تلك الأجهزة.

لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيف يمكن للمرء تعيين TF_CONFIG مع عاملين على localhost TF_CONFIG . من الناحية العملية ، يمكن للمستخدمين إنشاء عدة عمال على عناوين / منافذ IP خارجية ، وتعيين TF_CONFIG على كل عامل بشكل مناسب.

في هذا المثال سوف تستخدم عاملين ، يظهر TF_CONFIG العامل الأول أعلاه. بالنسبة للعامل الثاني ، يمكنك تعيين tf_config['task']['index']=1

أعلاه ، tf_config هو مجرد متغير محلي في بيثون. لاستخدامه بالفعل لتكوين التدريب ، يحتاج هذا القاموس إلى التسلسل كـ JSON ، ووضعه في متغير البيئة TF_CONFIG .

متغيرات البيئة والعمليات الفرعية في أجهزة الكمبيوتر المحمولة

ترث العمليات الفرعية متغيرات البيئة من الوالدين. لذلك إذا قمت بتعيين متغير بيئة في عملية jupyter notebook هذه:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

يمكنك الوصول إلى متغير البيئة من عمليات فرعية:

echo ${GREETINGS}
Hello TensorFlow!

في القسم التالي ، ستستخدم هذا لتمرير TF_CONFIG إلى TF_CONFIG للعمال. لن تقوم مطلقًا بإطلاق وظائفك بهذه الطريقة ، لكنها كافية لأغراض هذا البرنامج التعليمي: لإثبات مثال بسيط متعدد العمال.

MultiWorkerMirroredStrategy

لتدريب النموذج ، استخدم tf.distribute.MultiWorkerMirroredStrategy من tf.distribute.MultiWorkerMirroredStrategy ، والذي يقوم بإنشاء نسخ من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. يحتوي دليل tf.distribute.Strategy على مزيد من التفاصيل حول هذه الإستراتيجية.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

استخدم tf.distribute.Strategy.scope لتحديد وجوب استخدام إستراتيجية عند بناء نموذجك. يضعك هذا في " سياق النسخ المتماثل " لهذه الإستراتيجية ، مما يعني أن الإستراتيجية تتحكم في أشياء مثل الموضع المتغير.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

تقاسم بياناتك تلقائيًا بين العاملين

في التدريب متعدد العاملين ، لا تكون هناك حاجة بالضرورة إلى تجزئة مجموعة البيانات ، ولكنها تمنحك مرة واحدة دلاليًا مما يجعل المزيد من التدريب أكثر قابلية للتكرار ، أي أن التدريب على عدة عمال يجب أن يكون مماثلاً للتدريب على عامل واحد. ملاحظة: قد يتأثر الأداء في بعض الحالات.

انظر: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

تحديد حلقة تدريب مخصصة وتدريب النموذج

حدد مُحسِّنًا

06bdbb8d0

حدد خطوة تدريب tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

حفظ واستعادة نقطة التفتيش

يتطلب تنفيذ Checkpointing في Custom Training Loop أن يتعامل معها المستخدم بدلاً من استخدام رد اتصال keras. يسمح لك بحفظ أوزان النموذج واستعادتها دون الحاجة إلى حفظ النموذج بالكامل.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

هنا ، ستقوم بإنشاء tf.train.Checkpoint واحد الذي يتتبع النموذج ، والذي تتم إدارته بواسطة tf.train.CheckpointManager بحيث يتم الاحتفاظ بأحدث نقطة tf.train.CheckpointManager فقط.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

الآن ، عندما تحتاج إلى الاستعادة ، يمكنك العثور على أحدث نقطة تفتيش محفوظة باستخدام وظيفة tf.train.latest_checkpoint الملائمة.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

بعد استعادة نقطة التفتيش ، يمكنك الاستمرار في تدريب حلقة التدريب المخصصة الخاصة بك.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

إعداد كود كامل على العاملين

للتشغيل فعليًا باستخدام MultiWorkerMirroredStrategy ستحتاج إلى تشغيل عمليات العمال وتمرير TF_CONFIG إليهم.

مثل ملف mnist.py المكتوب سابقًا ، إليك main.py التي تحتوي على نفس الكود الذي مررنا من خلاله خطوة بخطوة مسبقًا في هذا colab ، فنحن نكتبه فقط في ملف حتى يقوم كل عامل بتشغيله:

ملف: main.py

Writing main.py

تدريب وتقييم

يحتوي الدليل الحالي الآن على كلا ملفي Python:

ls *.py
main.py
mnist.py

لذا ، قم بتسلسل TF_CONFIG إلى متغيرات البيئة:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن ، يمكنك تشغيل عملية عاملة تقوم بتشغيل main.py واستخدام TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

هناك بعض الأشياء التي يجب ملاحظتها حول الأمر أعلاه:

  1. يستخدم %%bash وهو عبارة عن دفتر ملاحظات "سحري" لتشغيل بعض أوامر bash.
  2. يستخدم العلامة --bg لتشغيل عملية bash في الخلفية ، لأن هذا العامل لن ينتهي. ينتظر جميع العمال قبل أن يبدأ.

لن تقوم عملية العامل ذات الخلفية بطباعة الإخراج إلى هذا الكمبيوتر الدفتري ، لذا فإن &> يعيد توجيه إخراجها إلى ملف ، حتى تتمكن من رؤية ما حدث.

لذلك ، انتظر بضع ثوان حتى تبدأ العملية:

import time
time.sleep(20)

انظر الآن إلى ما تم إخراج ملف سجل العامل حتى الآن:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

يجب أن يقول السطر الأخير من ملف السجل: تم Started server with target: grpc://localhost:12345 . العامل الأول جاهز الآن ، وينتظر جميع العمال الآخرين ليكونوا مستعدين للمضي قدمًا.

لذا قم بتحديث tf_config لعملية العامل الثاني لالتقاط:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

الآن أطلق العامل الثاني. سيبدأ هذا التدريب نظرًا لأن جميع العمال نشطين (لذلك ليست هناك حاجة إلى خلفية هذه العملية):

python main.py > /dev/null 2>&1

الآن إذا أعدت التحقق من السجلات التي كتبها العامل الأول ، فسترى أنه شارك في تدريب هذا النموذج:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

تدريب متعدد العمال في العمق

أظهر هذا البرنامج التعليمي سير عمل Custom Training Loop للإعداد متعدد العمال. يتوفر وصف تفصيلي لموضوعات أخرى في model.fit's guide متعدد العمال model.fit's guide على model.fit's guide بها (CTL).

أنظر أيضا

  1. يوفر التدريب الموزع في دليل TensorFlow نظرة عامة على استراتيجيات التوزيع المتاحة.
  2. النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل استراتيجيات توزيع متعددة.
  3. يوفر قسم الأداء في الدليل معلومات حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.