آموزش چند کارگری با Keras

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش نحوه اجرای آموزش توزیع شده چندکاره با مدل Keras و Model.fit API را با استفاده از tf.distribute.Strategy API - به ویژه کلاس tf.distribute.MultiWorkerMirroredStrategy می دهد. با کمک این استراتژی، یک مدل Keras که برای اجرا بر روی یک کارگر طراحی شده است، می تواند به طور یکپارچه روی چندین کارگر با حداقل تغییرات کد کار کند.

برای کسانی که علاقه مند به درک عمیق تر از API های tf.distribute.Strategy ، آموزش توزیع شده در راهنمای TensorFlow برای مروری بر استراتژی های توزیع که TensorFlow پشتیبانی می کند در دسترس است.

برای یادگیری نحوه استفاده از MultiWorkerMirroredStrategy با Keras و یک حلقه آموزشی سفارشی، به حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy مراجعه کنید.

توجه داشته باشید که هدف از این آموزش نشان دادن یک مثال حداقل چند کارگری با دو کارگر است.

برپایی

با برخی واردات ضروری شروع کنید:

import json
import os
import sys

قبل از وارد کردن TensorFlow، چند تغییر در محیط ایجاد کنید:

  1. تمام پردازنده های گرافیکی را غیر فعال کنید. این از خطاهای ناشی از تلاش کارگران برای استفاده از یک GPU جلوگیری می کند. در یک برنامه دنیای واقعی، هر کارگر روی یک ماشین متفاوت خواهد بود.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. متغیر محیطی TF_CONFIG را بازنشانی کنید (در ادامه بیشتر در مورد آن خواهید آموخت):
os.environ.pop('TF_CONFIG', None)
  1. مطمئن شوید که دایرکتوری فعلی در مسیر پایتون قرار دارد - این به نوت بوک اجازه می دهد تا فایل های نوشته شده توسط %%writefile بعدا وارد کند:
if '.' not in sys.path:
  sys.path.insert(0, '.')

اکنون TensorFlow را وارد کنید:

import tensorflow as tf

مجموعه داده و تعریف مدل

در مرحله بعد، یک فایل mnist_setup.py با یک مدل ساده و تنظیمات مجموعه ایجاد کنید. این فایل پایتون توسط فرآیندهای کارگر در این آموزش استفاده خواهد شد:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

آموزش نمونه روی یک کارگر مجرد

سعی کنید مدل را برای تعداد کمی از دوره ها آموزش دهید و نتایج یک کارگر را مشاهده کنید تا مطمئن شوید همه چیز به درستی کار می کند. با پیشرفت تمرین، ضرر باید کاهش یابد و دقت باید افزایش یابد.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

پیکربندی چند کارگری

حالا بیایید وارد دنیای آموزش چندکاره شویم.

خوشه ای با مشاغل و وظایف

در TensorFlow، آموزش توزیع شده شامل: یک 'cluster' با چندین کار، و هر یک از مشاغل ممکن است یک یا چند 'task' داشته باشد.

شما به متغیر محیطی پیکربندی TF_CONFIG برای آموزش روی چندین ماشین نیاز دارید، که احتمالاً هر کدام نقش متفاوتی دارند. TF_CONFIG یک رشته JSON است که برای تعیین پیکربندی خوشه برای هر کارگری که بخشی از خوشه است استفاده می شود.

دو جزء از یک متغیر TF_CONFIG وجود دارد: 'cluster' و 'task' .

  • یک 'cluster' برای همه کارگران یکسان است و اطلاعاتی در مورد خوشه آموزشی ارائه می دهد، که دستوری است از انواع مختلفی از مشاغل، مانند 'worker' یا 'chief' .

    • در آموزش چندکاره‌ای با tf.distribute.MultiWorkerMirroredStrategy ، معمولاً یک 'worker' وجود دارد که علاوه بر کاری که یک 'worker' معمولی انجام می‌دهد، مسئولیت‌هایی مانند ذخیره یک پست بازرسی و نوشتن یک فایل خلاصه برای TensorBoard را بر عهده می‌گیرد. چنین 'worker' به عنوان کارگر اصلی (با نام شغل 'chief' ) شناخته می شود.
    • مرسوم است که 'chief' 'index' 0 را به آن منصوب می کند (در واقع، tf.distribute.Strategy به این ترتیب پیاده سازی می شود).
  • یک 'task' اطلاعات مربوط به وظیفه فعلی را ارائه می دهد و برای هر کارگر متفاوت است. 'type' و 'index' آن کارگر را مشخص می کند.

در زیر یک نمونه پیکربندی است:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

در اینجا همان TF_CONFIG است که به عنوان یک رشته JSON سریال شده است:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

توجه داشته باشید که tf_config فقط یک متغیر محلی در پایتون است. برای اینکه بتوان از آن برای پیکربندی آموزشی استفاده کرد، این دیکت باید به صورت JSON سریال شود و در یک متغیر محیطی TF_CONFIG قرار گیرد.

در پیکربندی مثال بالا، وظیفه 'type' را روی 'worker' و وظیفه 'index' را روی 0 تنظیم می کنید. بنابراین این دستگاه اولین کارگر است. به عنوان کارگر 'chief' منصوب می شود و بیشتر از بقیه کار می کند.

برای اهداف تصویری، این آموزش نشان می‌دهد که چگونه می‌توانید یک متغیر TF_CONFIG را با دو کارگر در یک میزبان localhost تنظیم کنید.

در عمل، چندین کارگر روی آدرس‌ها/پورت‌های IP خارجی ایجاد می‌کنید و بر اساس آن یک متغیر TF_CONFIG روی هر کارگر تنظیم می‌کنید.

در این آموزش شما از دو کارگر استفاده خواهید کرد:

  • اولین ( 'chief' ) TF_CONFIG کارگر در بالا نشان داده شده است.
  • برای کارگر دوم، tf_config['task']['index']=1 را تنظیم خواهید کرد

متغیرهای محیطی و فرآیندهای فرعی در نوت بوک ها

فرآیندهای فرعی متغیرهای محیطی را از والد خود به ارث می برند.

به عنوان مثال، می توانید یک متغیر محیطی را در این فرآیند نوت بوک Jupyter به صورت زیر تنظیم کنید:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

سپس، می توانید از یک زیر فرآیند به متغیر محیطی دسترسی پیدا کنید:

echo ${GREETINGS}
Hello TensorFlow!

در بخش بعدی، از روش مشابهی برای ارسال TF_CONFIG به زیرفرایندهای کارگر استفاده خواهید کرد. در یک سناریوی دنیای واقعی، شما مشاغل خود را به این روش راه اندازی نمی کنید، اما در این مثال کافی است.

استراتژی مناسب را انتخاب کنید

در TensorFlow، دو شکل اصلی آموزش توزیع شده وجود دارد:

  • آموزش همزمان ، که در آن مراحل آموزش در بین کارگران و کپی ها همگام سازی می شود و
  • آموزش ناهمزمان ، که در آن مراحل آموزش به طور دقیق همگام سازی نمی شوند (به عنوان مثال، آموزش سرور پارامتر ).

این آموزش نشان می دهد که چگونه می توان آموزش چند کارگری همزمان را با استفاده از نمونه tf.distribute.MultiWorkerMirroredStrategy انجام داد.

MultiWorkerMirroredStrategy کپی‌هایی از همه متغیرها در لایه‌های مدل در هر دستگاه در همه کارگران ایجاد می‌کند. از CollectiveOps ، یک عملیات TensorFlow برای ارتباطات جمعی، برای جمع‌آوری گرادیان‌ها و همگام نگه داشتن متغیرها استفاده می‌کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy چندین پیاده سازی را از طریق پارامتر tf.distribute.experimental.CommunicationOptions فراهم می کند: 1) RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند. 2) NCCL از NVIDIA Collective Communication Library برای پیاده سازی جمع ها استفاده می کند. و 3) AUTO انتخاب را به زمان اجرا موکول می کند. بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPUها و اتصال شبکه در خوشه بستگی دارد.

مدل را آموزش دهید

با ادغام tf.distribute.Strategy API در tf.keras ، تنها تغییری که برای توزیع آموزش بین چند کارگر ایجاد خواهید کرد، احاطه ساختن مدل و فراخوانی model.compile model.compile() در داخل strategi.scope strategy.scope() است. دامنه استراتژی توزیع تعیین می کند که چگونه و کجا متغیرها ایجاد شوند، و در مورد MultiWorkerMirroredStrategy ، متغیرهای ایجاد شده MirroredVariable s هستند و بر روی هر یک از کارگران تکرار می شوند.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

برای اجرای واقعی با MultiWorkerMirroredStrategy ، باید فرآیندهای کارگر را اجرا کنید و یک TF_CONFIG به آنها ارسال کنید.

مانند فایل mnist_setup.py که قبلا نوشته شده است، در اینجا main.py است که هر یک از کارگران اجرا خواهند کرد:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

در قطعه کد بالا توجه داشته باشید که global_batch_size که به Dataset.batch می شود، روی per_worker_batch_size * num_workers شده است. این تضمین می کند که هر کارگر دسته هایی از نمونه های per_worker_batch_size بدون توجه به تعداد کارگران پردازش می کند.

فهرست فعلی شامل هر دو فایل پایتون است:

ls *.py
main.py
mnist_setup.py

بنابراین TF_CONFIG را json سریال کنید و آن را به متغیرهای محیط اضافه کنید:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون، می توانید یک فرآیند worker را راه اندازی کنید که main.py را اجرا می کند و از main.py استفاده می TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

در مورد دستور بالا باید به چند نکته توجه کرد:

  1. از %%bash که یک نوت بوک "جادویی" است برای اجرای برخی از دستورات bash استفاده می کند.
  2. از پرچم --bg برای اجرای فرآیند bash در پس‌زمینه استفاده می‌کند، زیرا این کارگر خاتمه نمی‌یابد. قبل از شروع کار منتظر همه کارگران است.

فرآیند کارگر پس‌زمینه خروجی را در این نوت‌بوک چاپ نمی‌کند، بنابراین &> خروجی خود را به یک فایل هدایت می‌کند تا بتوانید بعداً آنچه را که در یک فایل گزارش اتفاق افتاده است بررسی کنید.

بنابراین، چند ثانیه صبر کنید تا فرآیند شروع شود:

import time
time.sleep(10)

اکنون، آنچه را که تاکنون در فایل گزارش کارگر خروجی داده شده است، بررسی کنید:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

خط آخر فایل لاگ باید بگوید: Started server with target: grpc://localhost:12345 . اولین کارگر اکنون آماده است و منتظر است تا همه کارگران دیگر آماده ادامه کار باشند.

بنابراین tf_config را برای پردازش دومین کارگر به‌روزرسانی کنید:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

دومین کارگر را راه اندازی کنید. این آموزش را آغاز می کند زیرا همه کارگران فعال هستند (بنابراین نیازی به پیشینه این فرآیند نیست):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

اگر گزارش‌های نوشته شده توسط اولین کارگر را دوباره بررسی کنید، متوجه خواهید شد که در آموزش آن مدل شرکت کرده است:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

جای تعجب نیست که این کار کندتر از اجرای آزمایشی در ابتدای این آموزش بود.

راه اندازی چندین کارگر روی یک ماشین تنها باعث افزایش هزینه می شود.

هدف در اینجا بهبود زمان آموزش نبود، بلکه تنها ارائه مثالی از آموزش چند کارگری بود.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

آموزش عمیق چند کارگری

تا کنون، نحوه اجرای یک راه اندازی اولیه چندکاره را یاد گرفته اید.

در ادامه آموزش، با سایر عوامل، که ممکن است برای موارد استفاده واقعی مفید یا مهم باشند، به تفصیل آشنا خواهید شد.

اشتراک گذاری مجموعه داده

در آموزش چند کارگری، به اشتراک گذاری داده ها برای اطمینان از همگرایی و عملکرد مورد نیاز است.

مثال در بخش قبل متکی به autosharding پیش فرض ارائه شده توسط tf.distribute.Strategy API است. می‌توانید با تنظیم tf.data.experimental.AutoShardPolicy tf.data.experimental.DistributeOptions را کنترل کنید.

برای کسب اطلاعات بیشتر در مورد اشتراک گذاری خودکار ، به راهنمای ورودی توزیع شده مراجعه کنید.

در اینجا یک مثال سریع از نحوه غیرفعال کردن اشتراک گذاری خودکار آورده شده است، به طوری که هر کپی هر نمونه را پردازش می کند ( توصیه نمی شود ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

ارزیابی

اگر validation_data را به Model.fit کنید، برای هر دوره بین آموزش و ارزیابی متناوب خواهد شد. validation_data که از داده های

مشابه آموزش، مجموعه داده اعتبارسنجی به طور خودکار در سطح فایل به اشتراک گذاشته می شود. شما باید یک اندازه دسته کلی را در مجموعه داده اعتبار سنجی تنظیم کنید و validation_steps را تنظیم کنید.

یک مجموعه داده مکرر نیز برای ارزیابی توصیه می شود.

همچنین، می‌توانید کار دیگری ایجاد کنید که به صورت دوره‌ای نقاط بازرسی را می‌خواند و ارزیابی را اجرا می‌کند. این کاری است که استیماتور انجام می دهد. اما این روش توصیه ای برای انجام ارزیابی نیست و بنابراین جزئیات آن حذف می شود.

کارایی

اکنون یک مدل Keras دارید که برای اجرا در چندین کارگر با MultiWorkerMirroredStrategy شده است.

برای تغییر عملکرد آموزش چند کارگری، می توانید موارد زیر را امتحان کنید:

  • tf.distribute.MultiWorkerMirroredStrategy چندین پیاده سازی ارتباط جمعی را ارائه می دهد:

    • RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند.
    • NCCL از کتابخانه ارتباطات جمعی NVIDIA برای پیاده سازی مجموعه ها استفاده می کند.
    • AUTO انتخاب را به زمان اجرا موکول می کند.

    بهترین انتخاب پیاده سازی جمعی به تعداد GPU، نوع GPU و اتصال شبکه در خوشه بستگی دارد. برای لغو انتخاب خودکار، پارامتر communication_options MultiWorkerMirroredStrategy را مشخص کنید. مثلا:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • در صورت امکان متغیرها را به tf.float کنید:

    • مدل رسمی ResNet شامل مثالی از نحوه انجام این کار است.

تحمل خطا

در آموزش همزمان، اگر یکی از کارگران شکست بخورد و مکانیزم بازیابی شکست وجود نداشته باشد، خوشه شکست می‌خورد.

استفاده از Keras با tf.distribute.Strategy دارای مزیت تحمل خطا در مواردی است که کارگران می میرند یا ناپایدار هستند. شما می توانید این کار را با حفظ وضعیت آموزشی در سیستم فایل توزیع شده انتخابی خود انجام دهید، به طوری که پس از راه اندازی مجدد نمونه ای که قبلا شکست خورده یا از قبل استفاده شده بود، وضعیت آموزشی بازیابی می شود.

وقتی یک کارگر در دسترس نباشد، سایر کارگران شکست خواهند خورد (احتمالاً پس از یک تایم اوت). در چنین مواردی، کارگر غیرقابل دسترسی و همچنین سایر کارگرانی که شکست خورده اند، نیاز به راه اندازی مجدد دارند.

پاسخ به تماس مدل Checkpoint

پاسخ به تماس ModelCheckpoint دیگر قابلیت تحمل خطا را ارائه نمی دهد، لطفاً به جای آن از BackupAndRestore استفاده کنید.

پاسخ تماس ModelCheckpoint همچنان می تواند برای ذخیره نقاط بازرسی استفاده شود. اما با این کار، اگر آموزش قطع شد یا با موفقیت به پایان رسید، برای ادامه آموزش از نقطه بازرسی، کاربر موظف است مدل را به صورت دستی بارگذاری کند.

به صورت اختیاری، کاربر می تواند ذخیره و بازیابی مدل/وزن ها را خارج از پاسخ به تماس ModelCheckpoint کند.

ذخیره و بارگذاری مدل

برای ذخیره مدل خود با استفاده از model.save یا tf.saved_model.save ، مقصد ذخیره باید برای هر کارگر متفاوت باشد.

  • برای کارگران غیر ارشد، باید مدل را در یک فهرست موقت ذخیره کنید.
  • برای رئیس، باید در پوشه مدل ارائه شده ذخیره کنید.

دایرکتوری های موقت روی کارگر باید منحصر به فرد باشند تا از خطاهای ناشی از تلاش چندین کارگر برای نوشتن در یک مکان جلوگیری شود.

مدل ذخیره شده در همه دایرکتوری ها یکسان است و معمولاً فقط مدل ذخیره شده توسط chief باید برای بازیابی یا ارائه ارجاع داده شود.

شما باید یک منطق پاکسازی داشته باشید که پس از اتمام آموزش، دایرکتوری های موقت ایجاد شده توسط کارگران را حذف می کند.

دلیل صرفه جویی همزمان روی رئیس و کارگران این است که ممکن است متغیرهایی را در حین بازرسی جمع آوری کنید که هم رئیس و هم کارگران باید در پروتکل ارتباطی allreduce شرکت کنند. از سوی دیگر، اجازه دادن به مدیر و کارگران در یک فهرست مدل یکسان باعث بروز خطاهای ناشی از مشاجره می شود.

با استفاده از MultiWorkerMirroredStrategy ، برنامه بر روی هر کارگر اجرا می‌شود و برای اینکه بداند کارگر فعلی رئیس است یا خیر، از شیء حل‌کننده کلاستر که دارای ویژگی‌های task_type و task_id است بهره می‌برد:

  • task_type به شما می گوید که شغل فعلی چیست (به عنوان مثال 'worker' ).
  • task_id شناسه کارگر را به شما می گوید.
  • کارگر با task_id == 0 به عنوان کارگر اصلی تعیین می شود.

در قطعه کد زیر، تابع write_filepath مسیر فایل را برای نوشتن ارائه می‌کند که به task_id کارگر بستگی دارد:

  • برای chief worker (با task_id == 0 )، در مسیر فایل اصلی می نویسد.
  • برای سایر کارگران، یک دایرکتوری موقت – temp_dir – با task_id در مسیر دایرکتوری ایجاد می‌کند تا در آن بنویسد:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

با آن، اکنون آماده ذخیره کردن هستید:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

همانطور که در بالا توضیح داده شد، بعداً مدل باید فقط از مسیری که در آن ذخیره شده است بارگذاری شود، بنابراین بیایید موارد موقتی را که کارگران غیر ارشد ذخیره کرده اند حذف کنیم:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

اکنون، زمانی که زمان بارگیری فرا می رسد، بیایید از tf.keras.models.load_model API مناسب استفاده کنیم و به کار بیشتر ادامه دهیم.

در اینجا، فرض کنید که فقط از یک کارگر برای بارگیری و ادامه آموزش استفاده می‌کنید، در این صورت، tf.keras.models.load_model در یک strate.scope strategy.scope() دیگر فراخوانی نمی‌کنید (توجه داشته باشید که strategy = tf.distribute.MultiWorkerMirroredStrategy() ، همانطور که قبلاً تعریف شد. ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

ذخیره و بازیابی ایست بازرسی

از طرف دیگر، چک پوینت به شما این امکان را می دهد که وزن های مدل خود را ذخیره کرده و بدون نیاز به ذخیره کل مدل، آنها را بازیابی کنید.

در اینجا، یک tf.train.Checkpoint ایجاد می‌کنید که مدل را ردیابی می‌کند، که توسط tf.train.CheckpointManager مدیریت می‌شود، به طوری که فقط آخرین نقطه بازرسی حفظ می‌شود:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

پس از راه‌اندازی CheckpointManager ، اکنون آماده ذخیره و حذف پست‌های بازرسی هستید که کارگران غیر ارشد ذخیره کرده‌اند:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

اکنون، زمانی که نیاز به بازیابی مدل دارید، می‌توانید با استفاده از عملکرد راحت tf.train.latest_checkpoint ، آخرین پست بازرسی ذخیره شده را پیدا کنید. پس از بازگرداندن پست بازرسی، می توانید به آموزش ادامه دهید.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

BackupAndRestore callback

پاسخ تماس tf.keras.callbacks.BackupAndRestore عملکرد تحمل خطا را با پشتیبان گیری از مدل و شماره دوره فعلی در فایل ایست بازرسی موقت تحت آرگومان backup_dir در BackupAndRestore می کند. این کار در پایان هر دوره انجام می شود.

هنگامی که کارها قطع می شوند و مجدداً راه اندازی می شوند، فراخوان آخرین نقطه بازرسی را بازیابی می کند و آموزش از ابتدای دوره قطع شده ادامه می یابد. هر گونه آموزش جزئی که قبلاً در دوره ناتمام قبل از وقفه انجام شده است دور ریخته می شود، به طوری که بر وضعیت مدل نهایی تأثیر نمی گذارد.

برای استفاده از آن، نمونه ای از tf.keras.callbacks.BackupAndRestore را در تماس Model.fit .

با MultiWorkerMirroredStrategy ، اگر کارگری دچار وقفه شود، کل خوشه متوقف می شود تا زمانی که کارگر قطع شده مجدداً راه اندازی شود. سایر کارگران نیز راه اندازی مجدد خواهند شد و کارگر قطع شده دوباره به خوشه می پیوندد. سپس، هر کارگر فایل ایست بازرسی را که قبلاً ذخیره شده بود می‌خواند و حالت قبلی خود را می‌گیرد، در نتیجه به خوشه اجازه می‌دهد دوباره همگام شود. سپس، آموزش ادامه می یابد.

BackupAndRestore callback از CheckpointManager برای ذخیره و بازیابی وضعیت آموزشی استفاده می‌کند، که فایلی به نام checkpoint تولید می‌کند که پست‌های بازرسی موجود را به همراه آخرین مورد ردیابی می‌کند. به همین دلیل، backup_dir نباید دوباره برای ذخیره سایر نقاط بازرسی استفاده شود تا از تصادم نام جلوگیری شود.

در حال حاضر، BackupAndRestore از آموزش تک کارگری بدون استراتژی – MirroredStrategy – و آموزش چندکاره با MultiWorkerMirroredStrategy می کند.

در زیر دو مثال برای آموزش چند کارگری و آموزش تک کارگری آورده شده است:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

اگر دایرکتوری backup_dir را که در BackupAndRestore مشخص کرده‌اید بررسی کنید، ممکن است متوجه برخی از فایل‌های ایست بازرسی موقت شوید. این فایل‌ها برای بازیابی نمونه‌های از دست رفته قبلی مورد نیاز هستند، و پس از خروج موفقیت‌آمیز از آموزش، در انتهای Model.fit توسط کتابخانه حذف خواهند شد.

منابع اضافی

  1. راهنمای آموزشی Distributed in TensorFlow یک نمای کلی از استراتژی های توزیع موجود را ارائه می دهد.
  2. حلقه آموزشی Custom with Keras و MultiWorkerMirroredStrategy آموزش نحوه استفاده از MultiWorkerMirroredStrategy با Keras و یک حلقه آموزشی سفارشی را نشان می دهد.
  3. مدل‌های رسمی را بررسی کنید، که بسیاری از آنها را می‌توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.
  4. راهنمای عملکرد بهتر با tf.function اطلاعاتی درباره استراتژی‌ها و ابزارهای دیگر، مانند TensorFlow Profiler که می‌توانید برای بهینه‌سازی عملکرد مدل‌های TensorFlow خود استفاده کنید، ارائه می‌دهد.