آموزش چند کارگری با کراس

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش نشان می دهد که چگونه به انجام آموزش توزیع چند کارگر با یک مدل Keras و Model.fit API با استفاده از tf.distribute.Strategy API-به طور خاص tf.distribute.MultiWorkerMirroredStrategy کلاس. با استفاده از این استراتژی ، یک مدل Keras که برای کار روی یک کارگر طراحی شده بود می تواند به طور یکپارچه روی چند کارگر با حداقل تغییرات کد کار کند.

برای کسانی که علاقه مند به یک درک عمیق تر از tf.distribute.Strategy API ها، آموزش توزیع شده در TensorFlow راهنمای برای یک مرور کلی از استراتژی توزیع TensorFlow پشتیبانی در دسترس است.

برای آشنایی با نحوه استفاده از MultiWorkerMirroredStrategy با Keras و یک حلقه آموزش سفارشی، برای اشاره حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy .

توجه داشته باشید که هدف از این آموزش نشان دادن حداقل مثال چند کارگر با دو کارگر است.

برپایی

با واردات ضروری شروع کنید:

import json
import os
import sys

قبل از وارد کردن TensorFlow ، چند تغییر در محیط ایجاد کنید:

  1. همه GPU ها را غیرفعال کنید. این کار از خطاهایی که کارگران سعی می کنند از GPU یکسان استفاده کنند جلوگیری می کند. در یک برنامه کاربردی در دنیای واقعی ، هر کارگر روی یک دستگاه متفاوت قرار می گیرد.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. تنظیم مجدد TF_CONFIG متغیر محیطی (شما بیشتر در مورد این بعد یاد بگیرند):
os.environ.pop('TF_CONFIG', None)
  1. اطمینان حاصل کنید که دایرکتوری جاری در پایتون مسیر این اجازه می دهد تا نوت بوک به واردات فایل های نوشته شده توسط %%writefile بعد:
if '.' not in sys.path:
  sys.path.insert(0, '.')

اکنون TensorFlow را وارد کنید:

import tensorflow as tf

مجموعه داده و تعریف مدل

بعد، ایجاد یک mnist.py فایل را با یک مدل ساده و مجموعه داده راه اندازی. این فایل پایتون توسط پردازنده های کارگر در این آموزش استفاده می شود:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

آموزش مدل در یک کارگر مجرد

سعی کنید آموزش مدل برای تعداد کمی از دوره و مشاهده نتایج یک کارگر تنها به آثار همه چیز مطمئن به درستی. با پیشرفت تمرین ، ضرر باید کاهش یابد و دقت افزایش یابد.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

پیکربندی چند کارگر

حالا بیایید وارد دنیای آموزش چند کارگر شویم.

خوشه ای با مشاغل و وظایف

در TensorFlow، آموزش توزیع شامل: یک 'cluster' با چند شغل، و هر یک از مشاغل ممکن است یک یا چند 'task' بازدید کنندگان.

شما را به نیاز TF_CONFIG پیکربندی متغیر محیطی برای آموزش بر روی چندین ماشین، که هر کدام احتمالا نقش های مختلف است. TF_CONFIG رشته JSON مورد استفاده برای تعیین پیکربندی خوشه برای هر کارگر است که بخشی از خوشه است.

دو جزء از یک وجود دارد TF_CONFIG متغیر: 'cluster' و 'task' .

  • 'cluster' یکسان برای همه کارگران و اطلاعات در مورد خوشه های آموزشی است، که یک بینی متشکل از انواع مختلف کارها، مانند فراهم می کند 'worker' و یا 'chief' .

    • در آموزش چند کارگر با tf.distribute.MultiWorkerMirroredStrategy است، معمولا یک وجود دارد 'worker' که طول می کشد در مسئولیت ها، مانند صرفه جویی در یک ایستگاه بازرسی و نوشتن یک فایل خلاصه برای TensorBoard، علاوه بر آنچه به طور منظم 'worker' می کند. چنین 'worker' است که به عنوان کارگر رئیس (با نام شغلی اشاره 'chief' ).
    • مرسوم است برای 'chief' به 'index' 0 به منصوب (در واقع، این است چگونه می tf.distribute.Strategy پیاده سازی شده است).
  • 'task' اطلاعات از وظیفه فعلی فراهم می کند و برای هر کارگر متفاوت است. این را مشخص می کند 'type' و 'index' که کارگر.

در زیر یک پیکربندی نمونه وجود دارد:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

در اینجا همان است TF_CONFIG مرتب به عنوان یک رشته JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

توجه داشته باشید که tf_config فقط یک متغیر محلی در پایتون است. برای اینکه قادر به استفاده از آن برای یک پیکربندی آموزش، این بینی باید به عنوان یک JSON مرتب و در یک قرار داده TF_CONFIG متغیر محیط زیست است.

در مثال تنظیمات بالا، شما مجموعه ای از کار 'type' به 'worker' و وظیفه 'index' به 0 . بنابراین، این دستگاه کارگر اول است. از آن خواهد شد به عنوان منصوب 'chief' کارگر و انجام کار بیشتر از دیگران است.

به منظور ارائه، این آموزش نشان می دهد که چگونه شما ممکن است راه اندازی یک TF_CONFIG متغیر با دو کارگر در localhost .

در عمل، شما می کارگران خارجی متعدد بر روی IP آدرس / پورت ایجاد و تنظیم یک TF_CONFIG متغیر در هر کارگر درآمده است.

در این آموزش ، شما از دو کارگر استفاده خواهید کرد:

  • اولین ( 'chief' کارگر) را TF_CONFIG بالا نشان داده شده.
  • برای کارگر دوم، شما را راه tf_config['task']['index']=1

متغیرهای محیطی و فرایندهای فرعی در نوت بوک ها

فرایندهای فرعی متغیرهای محیط را از والدین خود به ارث می برند.

به عنوان مثال ، می توانید یک متغیر محیطی را در این فرآیند Jupyter Notebook به شرح زیر تنظیم کنید:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

سپس ، می توانید از یک فرایند فرعی به متغیر محیط دسترسی پیدا کنید:

echo ${GREETINGS}
Hello TensorFlow!

در بخش بعدی، شما یک روش مشابه به تصویب استفاده TF_CONFIG به بازتا کارگر. در یک سناریوی واقعی ، شما مشاغل خود را به این شکل راه اندازی نمی کنید ، اما در این مثال کافی است.

استراتژی مناسب را انتخاب کنید

در TensorFlow ، دو شکل اصلی آموزش توزیع شده وجود دارد:

  • آموزش همزمان، که در آن مراحل آموزش در سراسر کارگران و کپی همگام سازی، و
  • آموزش آسنکرون، که در آن مراحل آموزش به شدت همگام سازی نیست (برای مثال، آموزش سرور پارامتر ).

این آموزش نشان می دهد که چگونه به انجام آموزش چند کارگر سنکرون با استفاده از یک نمونه از tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy ایجاد کپی از تمام متغیرها در لایه های مدل در هر دستگاه در تمام کارگران. آن استفاده می کند CollectiveOps ، یک عملیات TensorFlow برای ارتباط جمعی، به شیب کل و حفظ متغیرهای در هماهنگی. tf.distribute.Strategy راهنمای دارای جزئیات مربوط به این استراتژی است.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy فراهم می کند پیاده سازی های متعدد از طریق CommunicationOptions پارامتر: 1) RING ادوات تعاونی مبتنی بر حلقه با استفاده از gRPC به عنوان لایه ارتباط متقابل میزبان. 2) NCCL با استفاده از NVIDIA کتابخانه جمعی ارتباطات به پیاده سازی تعاونی؛ و 3) AUTO عهده انتخاب را به عهده زمان اجرا. بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPU ها و اتصال شبکه در خوشه بستگی دارد.

مدل را آموزش دهید

با ادغام tf.distribute.Strategy API به tf.keras ، تنها تغییر شما را به توزیع آموزش به چند کارگران است متصل به ساختمان مدل و خواهد ساخت model.compile() در داخل پاسخ strategy.scope() . دیکته دامنه استراتژی توزیع را چگونه و در کجا متغیرهای ایجاد می کند، و در مورد MultiWorkerMirroredStrategy ، متغیرهای ایجاد می MirroredVariable ، و آنها را بر روی هر یک از کارگران تکرار شده است.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

در واقع با اجرای MultiWorkerMirroredStrategy شما نیاز به اجرای فرایندهای کارگر و تصویب یک TF_CONFIG به آنها.

مانند mnist.py فایل زودتر نوشته شده است، در اینجا است main.py که هر یک از کارگران را اجرا خواهد کرد:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

در قطعه کد بالا توجه داشته باشید که global_batch_size ، می شود که به تصویب Dataset.batch ، قرار است به per_worker_batch_size * num_workers . این تضمین می کند که هر کارگر پردازش دسته از per_worker_batch_size نمونه بدون در نظر گرفتن تعداد کارگران.

دایرکتوری فعلی اکنون شامل هر دو فایل پایتون است:

ls *.py
main.py
mnist.py

بنابراین JSON-مرتب TF_CONFIG و آن را به متغیر های محیط زیست را اضافه کنید:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

در حال حاضر، شما می توانید یک پروسه کارگر که اجرا خواهد شد راه اندازی main.py و استفاده از TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

در مورد دستور بالا چند نکته قابل ذکر است:

  1. این با استفاده از %%bash است که نوت بوک "سحر و جادو" برای اجرای برخی از دستورات باش.
  2. آن استفاده می کند --bg پرچم برای اجرای bash روند در پس زمینه، به دلیل این کارگر نمی خاتمه. قبل از شروع کار منتظر همه کارگران است.

پردازش کننده پسزمینه خواهد خروجی را به این نوت بوک را چاپ کنید، به طوری که &> تغییر مسیر خروجی آن به یک فایل به طوری که شما می توانید بازرسی چه بعد از آن در یک فایل ورود به سیستم اتفاق افتاده است.

بنابراین ، چند ثانیه صبر کنید تا روند شروع شود:

import time
time.sleep(10)

حالا آنچه را که تا کنون در فایل log کارگر خروجی داده شده است بررسی کنید:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

آخرین خط فایل ورود به سیستم باید بگوییم: Started server with target: grpc://localhost:12345 . اولین کارگر در حال حاضر آماده است و منتظر است تا همه کارگران دیگر آماده ادامه کار باشند.

بنابراین به روز رسانی tf_config برای فرآیند کارگر دوم را انتخاب کنید تا:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

کارگر دوم را راه اندازی کنید. این کار آموزش را آغاز می کند زیرا همه کارگران فعال هستند (بنابراین نیازی به پیش زمینه این روند نیست):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

اگر گزارش های نوشته شده توسط اولین کارگر را مجدداً بررسی کنید ، می آموزید که در آموزش آن مدل شرکت کرده است:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

جای تعجب نیست، این فرار کندتر از آزمون را در ابتدای این آموزش را اجرا کنید.

اجرای چند کارگر روی یک دستگاه تنها هزینه اضافی را اضافه می کند.

هدف در اینجا بهبود زمان آموزش نبود ، بلکه فقط ارائه نمونه ای از آموزش چند کارگر بود.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

آموزش چند کارگر به صورت عمیق

تا اینجا ، شما نحوه انجام تنظیمات اولیه چند کاره را آموخته اید.

در ادامه آموزش ، با عوامل دیگر ، که ممکن است برای موارد استفاده واقعی مفید یا مهم باشند ، به طور مفصل آشنا شوید.

خرد کردن مجموعه داده ها

در آموزش چند کارگر، مجموعه داده sharding مورد نیاز است برای اطمینان از همگرایی و عملکرد.

به عنوان مثال در بخش قبلی در autosharding به طور پیش فرض ارائه شده توسط متکی tf.distribute.Strategy API. شما می توانید sharding با تنظیم کنترل tf.data.experimental.AutoShardPolicy از tf.data.experimental.DistributeOptions .

برای کسب اطلاعات بیشتر در مورد خودکار sharding، به مراجعه راهنمای ورودی توزیع .

در اینجا یک مثال سریع از چگونه به نوبه خود خودکار sharding خاموش، به طوری که هر ماکت پردازش هر عنوان مثال (توصیه نمی شود) باشد:

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

ارزیابی

اگر شما در عبور validation_data به Model.fit ، آن را بین آموزش و ارزیابی برای هر دوره متناوب. ارزیابی در نظر گرفتن validation_data است که در سراسر مجموعه ای از کارگران برای همه کارگران توزیع شده و نتایج ارزیابی جمع شده و در دسترس است.

مشابه آموزش ، مجموعه داده های اعتبار سنجی به طور خودکار در سطح فایل خرد می شود. شما نیاز به تنظیم اندازه دسته ای جهانی در مجموعه داده اعتبار و تنظیم validation_steps .

مجموعه داده های مکرر نیز برای ارزیابی توصیه می شود.

متناوبا ، شما همچنین می توانید یک کار دیگر ایجاد کنید که به صورت دوره ای نقاط بازرسی را خوانده و ارزیابی را اجرا می کند. این کاری است که برآوردگر انجام می دهد. اما این یک روش توصیه شده برای انجام ارزیابی نیست و بنابراین جزئیات آن حذف می شود.

کارایی

شما در حال حاضر مدل Keras است که همه تا اجرا در کارگران متعدد با اند MultiWorkerMirroredStrategy .

برای بهبود عملکرد آموزش چند کارگر ، می توانید موارد زیر را امتحان کنید:

  • tf.distribute.MultiWorkerMirroredStrategy فراهم می کند چند پیاده سازی ارتباطات جمعی :

    • RING تعاونی مبتنی بر حلقه ادوات با استفاده از gRPC به عنوان لایه ارتباط متقابل میزبان.
    • NCCL با استفاده از جمعی کتابخانه ارتباطات NVIDIA برای اجرای تعاونی.
    • AUTO عهده انتخاب را به عهده زمان اجرا.

    بهترین انتخاب اجرای جمعی بستگی به تعداد GPU ها ، نوع GPU ها و اتصال شبکه در خوشه دارد. به نادیده گرفتن انتخاب خودکار، مشخص communication_options پارامتر MultiWorkerMirroredStrategy سازنده است. مثلا:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • بازیگران متغیرهای به tf.float در صورت امکان:

    • مدل رسمی RESNET شامل یک مثال از اینکه چگونه این می تواند انجام شود.

تحمل خطا

در آموزش همزمان ، اگر یکی از کارگران شکست بخورد و مکانیسم بازیابی خرابی وجود نداشته باشد ، خوشه شکست می خورد.

با استفاده از Keras با tf.distribute.Strategy می آید با استفاده از تحمل خطا در مواردی که کارگران می میرند و یا در غیر این صورت بی ثبات است. شما می توانید این کار را با حفظ وضعیت آموزشی در سیستم فایل توزیع شده به دلخواه خود انجام دهید ، به گونه ای که با راه اندازی مجدد نمونه ای که قبلاً شکست خورده یا پیشاپیش شده بود ، وضعیت آموزش بازیابی می شود.

هنگامی که کارگری در دسترس قرار نمی گیرد ، سایر کارگران شکست می خورند (احتمالاً پس از اتمام زمان). در چنین مواردی ، کارگر در دسترس غیر ضروری است ، و همچنین سایر کارگران شکست خورده.

ModelCheckpoint پاسخ به تماس

ModelCheckpoint پاسخ به تماس دیگر قابلیت تحمل خطا را فراهم می کند، لطفا با استفاده BackupAndRestore پاسخ به تماس به جای.

ModelCheckpoint پاسخ به تماس هنوز هم می تواند استفاده شود برای صرفه جویی در ایستگاه های بازرسی. اما با این کار ، اگر آموزش قطع شد یا با موفقیت به پایان رسید ، برای ادامه آموزش از ایست بازرسی ، کاربر مسئول بارگیری مدل به صورت دستی است.

در صورت تمایل کاربر می تواند انتخاب برای ذخیره و بازیابی مدل / وزن خارج ModelCheckpoint پاسخ به تماس.

ذخیره و بارگذاری مدل

برای صرفه جویی در مدل خود را با استفاده از model.save یا tf.saved_model.save ، نیازهای صرفه جویی در مقصد که برای هر کارگر متفاوت است.

  • برای کارگران غیر اصلی ، باید مدل را در یک فهرست موقت ذخیره کنید.
  • برای رئیس ، باید در فهرست مدل ارائه شده ذخیره کنید.

فهرستهای موقت روی کارگر باید منحصر به فرد باشند تا از خطاهای ناشی از تلاش چندین کارگر برای نوشتن در یک مکان جلوگیری شود.

مدل ذخیره شده در تمام دایرکتوری ها یکسان است و معمولاً فقط مدل ذخیره شده توسط رئیس باید برای بازگردانی یا سرویس دهی ارجاع داده شود.

شما باید منطق پاکسازی داشته باشید که فهرستهای موقت ایجاد شده توسط کارگران را پس از اتمام آموزش حذف کند.

دلیل صرفه جویی در رئیس و کارگران به طور همزمان این است که ممکن است در هنگام بازرسی متغیرها را جمع آوری کنید که مستلزم آن است که رئیس و کارگران در پروتکل ارتباطی allreduce شرکت کنند. از سوی دیگر ، اجازه دادن به رئیس و کارگران در یک دایرکتوری مدل مشابه ، منجر به خطا در نتیجه اختلاف می شود.

با استفاده از MultiWorkerMirroredStrategy ، این برنامه در هر کارگر اجرا، و به منظور بدانند که آیا کارگر فعلی رئیس است، آن را طول می کشد استفاده از خوشه شی برطرف است که ویژگی های task_type و task_id :

  • task_type شما می گوید چه کار فعلی (به عنوان مثال است 'worker' ).
  • task_id به شما می گوید شناسه کارگر.
  • کارگر با task_id == 0 به عنوان کارگر رئیس تعیین شده است.

در تکه کد زیر، write_filepath عملکرد فراهم می کند مسیر فایل به نوشتن، که بستگی به کارگر task_id :

  • برای کارگر رئیس (با task_id == 0 )، آن را به مسیر فایل اصلی می نویسد.
  • برای سایر کارگران، آن را ایجاد directory- موقت temp_dir آرا task_id در مسیر دایرکتوری به نوشتن در:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

با این کار ، اکنون آماده ذخیره هستید:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

همانطور که در بالا توضیح داده شد ، بعداً مدل فقط باید از مسیری که رئیس ذخیره کرده است بارگیری شود ، بنابراین اجازه دهید موارد موقت را که کارگران غیر اصلی ذخیره کرده اند حذف کنیم:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

حالا، وقتی از آن زمان به بار، اجازه دهید با استفاده راحت tf.keras.models.load_model API، و با کار بیشتر ادامه خواهد داد.

در اینجا، فرض تنها با استفاده از تنها کارگران را به بار و ادامه آموزش، که در این صورت شما تماس نمی tf.keras.models.load_model در یکی دیگر از strategy.scope() (توجه داشته باشید که strategy = tf.distribute.MultiWorkerMirroredStrategy() ، به عنوان تعریف زودتر ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

ذخیره و بازیابی ایست بازرسی

از سوی دیگر ، نقطه بازرسی به شما امکان می دهد وزن مدل خود را ذخیره کرده و بدون نیاز به ذخیره کل مدل ، آنها را بازیابی کنید.

در اینجا، شما آن را ایجاد کنید tf.train.Checkpoint که آهنگ مدل، است که توسط مدیریت tf.train.CheckpointManager ، به طوری که تنها شدن از ایست بازرسی حفظ شده است:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

هنگامی که CheckpointManager است تا، شما در حال حاضر آماده برای ذخیره و حذف پست های بازرسی کارگران غیر رئیس ذخیره کرده بود هستید:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

حالا، وقتی شما نیاز به بازگرداندن مدل، شما می توانید آخرین ایست بازرسی را نجات داد با استفاده از مناسب را پیدا کنید tf.train.latest_checkpoint تابع. پس از بازگرداندن بازرسی ، می توانید آموزش را ادامه دهید.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

پشتیبان گیری BackupAndRestore

tf.keras.callbacks.experimental.BackupAndRestore پاسخ به تماس قابلیت تحمل خطا توسط پشتیبان گیری از مدل و شماره دوره کنونی در یک فایل ایست بازرسی موقت تحت فراهم می کند backup_dir آرگومان به BackupAndRestore . این کار در پایان هر دوره انجام می شود.

به محض قطع شدن کارها و راه اندازی مجدد ، تماس مجدد آخرین بازرسی را بازیابی می کند و آموزش از ابتدای دوره وقفه ادامه می یابد. هرگونه آموزش جزئی که قبلاً در دوره ناتمام قبل از وقفه انجام شده است ، دور ریخته می شود ، به طوری که بر حالت مدل نهایی تأثیر نمی گذارد.

برای استفاده از آن، ارائه یک نمونه از tf.keras.callbacks.experimental.BackupAndRestore در Model.fit پاسخ.

با MultiWorkerMirroredStrategy ، اگر یک کارگر قطع می شود، تمام خوشه مکث تا کارگر قطع دوباره شروع شده است. سایر کارگران نیز دوباره راه اندازی می شوند و کارگر قطع شده دوباره به خوشه می پیوندد. سپس ، هر کارگر پرونده ایست بازرسی را که قبلاً ذخیره شده بود می خواند و حالت قبلی خود را برمی دارد ، در نتیجه به خوشه اجازه می دهد تا مجدداً همگام شود. سپس ، آموزش ادامه می یابد.

BackupAndRestore پاسخ به تماس با استفاده از CheckpointManager برای ذخیره و بازیابی وضعیت آموزش، که تولید یک فایل به نام پاسگاه ایست و بازرسی که آهنگ ایست های بازرسی موجود با هم با آخرین است. به همین دلیل، backup_dir نباید دوباره مورد استفاده برای ذخیره ایست های بازرسی دیگر به منظور جلوگیری از برخورد نام.

در حال حاضر، BackupAndRestore پاسخ به تماس از کارگر تنها با استراتژی، MirroredStrategy، و چند کارگر با MultiWorkerMirroredStrategy. در زیر دو مثال برای آموزش چند کارگر و آموزش کارگر مجرد آورده شده است.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

اگر شما بازرسی دایرکتوری از backup_dir شما در مشخص BackupAndRestore ، شما ممکن است برخی از فایل های ایست بازرسی به طور موقت تولید اطلاع می دهد. این فایل ها برای بازیابی موارد قبلا از دست داده مورد نیاز، و آنها را با کتابخانه در پایان حذف Model.fit پس از خروج موفق از آموزش های خود.

منابع اضافی

  1. آموزش توزیع شده در TensorFlow راهنمای یک مرور کلی از استراتژی توزیع موجود فراهم می کند.
  2. حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy آموزش نشان می دهد که چگونه به استفاده از MultiWorkerMirroredStrategy با Keras و یک حلقه آموزش سفارشی.
  3. اتمام مدل رسمی ، بسیاری از که می تواند پیکربندی شده برای اجرای استراتژی توزیع های متعدد.
  4. عملکرد بهتر با tf.function راهنمای اطلاعات در مورد استراتژی های دیگر و ابزار، مانند فراهم می کند TensorFlow نیمرخ شما می توانید به بهینه سازی عملکرد مدل TensorFlow خود استفاده کنید.