این صفحه به‌وسیله ‏Cloud Translation API‏ ترجمه شده است.
Switch to English

آموزش چند کارگری با کراس

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش آموزش توزیع چند کارگر با مدل Keras را با استفاده از tf.distribute.Strategy API ، به طور خاص tf.distribute.MultiWorkerMirroredStrategy . با کمک این استراتژی ، یک مدل Keras که برای استفاده از تک کارگران طراحی شده است می تواند با کمترین تغییر کد روی چندین کارگر کار کند.

راهنمای آموزش توزیع شده در TensorFlow برای مروری بر استراتژی های توزیع TensorFlow برای کسانی که به درک عمیق tf.distribute.Strategy از API های tf.distribute.Strategy پشتیبانی می کنند ، در tf.distribute.Strategy .

برپایی

اول ، برخی از واردات لازم.

import json
import os
import sys

قبل از وارد کردن TensorFlow ، چند تغییر در محیط ایجاد کنید.

همه GPU ها را غیرفعال کنید. این از خطاهای ناشی از تلاش همه کارگران برای استفاده از GPU یکسان جلوگیری می کند. برای یک کاربرد واقعی ، هر کارگر روی دستگاه دیگری است.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

متغیر محیط TF_CONFIG را بازنشانی کنید ، بعداً اطلاعات بیشتری در این مورد خواهید دید.

os.environ.pop('TF_CONFIG', None)

مطمئن شوید که فهرست فعلی در مسیر پایتون قرار دارد. این به نوت بوک اجازه می دهد تا پرونده های نوشته شده توسط %%writefile بعداً وارد کند.

if '.' not in sys.path:
  sys.path.insert(0, '.')

اکنون TensorFlow را وارد کنید.

import tensorflow as tf

مجموعه داده و تعریف مدل

سپس یک پرونده mnist.py با یک مدل ساده و تنظیم مجموعه داده ایجاد کنید. این فایل پایتون توسط فرآیندهای کارگر در این آموزش استفاده خواهد شد:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

سعی کنید این مدل را برای تعداد کمی دوره آموزش دهید و نتایج یک کارگر را مشاهده کنید تا مطمئن شوید همه چیز به درستی کار می کند. با پیشرفت آموزش ، ضرر باید کاهش یافته و دقت آن افزایش یابد.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 2s 13ms/step - loss: 2.3010 - accuracy: 0.0826
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2501 - accuracy: 0.2605
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1914 - accuracy: 0.4208

<tensorflow.python.keras.callbacks.History at 0x7f100d347780>

پیکربندی چند کارگر

حال بیایید وارد دنیای آموزش چند کارگری شویم. در TF_CONFIG متغیر محیطی TF_CONFIG برای آموزش روی چندین ماشین مورد نیاز است ، هر کدام احتمالاً نقش دیگری دارند. TF_CONFIG یک رشته JSON است که برای تعیین پیکربندی خوشه در هر کارگر که بخشی از خوشه است استفاده می شود.

در اینجا یک مثال پیکربندی وجود دارد:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

در اینجا همان TF_CONFIG به صورت رشته ای به عنوان رشته JSON وجود دارد:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG دو TF_CONFIG : cluster و task .

  • cluster برای همه کارگران یکسان است و اطلاعات مربوط به خوشه آموزش را ارائه می دهد ، که یک دستورالعمل متشکل از انواع مختلف مشاغل مانند worker . در آموزش چند کارگر با MultiWorkerMirroredStrategy ، معمولاً یک worker وجود دارد که علاوه بر کاری که یک worker عادی انجام worker دهد ، مسئولیت کمی بیشتر مانند ذخیره ایست بازرسی و نوشتن پرونده خلاصه برای TensorBoard را بر عهده worker گیرد. از چنین کارگری به عنوان کارگر chief می شود ، و معمول است که worker با index 0 به عنوان worker اصلی منصوب می شود (در واقع این نحوه اجرای tf.distribute.Strategy است).

  • task اطلاعات مربوط به وظیفه فعلی را فراهم می کند و در هر کارگر متفاوت است. type و index آن کارگر را مشخص می کند.

در این مثال ، شما type کار را روی "worker" و index کار را روی 0 . این دستگاه اولین کارگر است و به عنوان کارگر اصلی منصوب می شود و کارهای بیشتری نسبت به بقیه انجام می دهد. توجه داشته باشید که سایر ماشین ها نیز باید TF_CONFIG متغیر محیطی TF_CONFIG را داشته باشند و باید دارای cluster یکسان باشد ، اما type کار یا index کار متفاوت با توجه به نقش آن ماشین ها متفاوت است.

برای اهداف تصویرسازی ، این آموزش نشان می دهد چگونه می توان TF_CONFIG با 2 کارگر در localhost . در عمل ، کاربران چندین کارگر در آدرس IP / پورت های خارجی ایجاد می کنند و TF_CONFIG روی هر کارگر به طور مناسب تنظیم می کنند.

در این مثال شما از 2 کارگر استفاده خواهید کرد ، TF_CONFIG کارگر اول در بالا نشان داده شده است. برای کارگر دوم شما tf_config['task']['index']=1

در بالا ، tf_config فقط یک متغیر محلی در پایتون است. برای استفاده واقعی از آن برای پیکربندی آموزش ، این فرهنگ لغت باید به صورت سریال JSON مرتب شود و در متغیر محیط TF_CONFIG قرار گیرد.

متغیرهای محیطی و فرایندهای فرعی در دفترها

زیر پردازش ها متغیرهای محیط را از والدین خود به ارث می برند. بنابراین اگر در این فرآیند jupyter notebook متغیر محیطی تنظیم کنید:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

از زیرفرآیند می توانید به متغیر محیط دسترسی پیدا کنید:

echo ${GREETINGS}
Hello TensorFlow!

در بخش بعدی ، از این برای انتقال TF_CONFIG به TF_CONFIG کارگر استفاده خواهید کرد. شما واقعاً هرگز مشاغل خود را از این طریق راه اندازی نمی کنید ، اما برای اهداف این آموزش کافی است: نمایش یک مثال حداقل چند کارگر.

استراتژی درست را انتخاب کنید

در TensorFlow دو شکل اصلی آموزش توزیع شده وجود دارد:

  • آموزش همزمان ، که در آن مراحل آموزش در بین کارگران و نسخه ها همگام سازی می شود ، و
  • آموزش ناهمزمان ، که در آن مراحل آموزش دقیقاً همگام سازی نشده اند.

MultiWorkerMirroredStrategy ، که استراتژی پیشنهادی برای آموزش همزمان همزمان چند کارگری است ، در این راهنما نشان داده خواهد شد. برای آموزش مدل ، از نمونه ای از tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy کپی از تمام متغیرها را در لایه های مدل در هر دستگاه در همه کارگران ایجاد می کند. برای جمع آوری شیب ها و همگام سازی متغیرها از CollectiveOps ، یک گزینه TensorFlow برای ارتباط جمعی استفاده می کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy چندین پیاده سازی را از طریق پارامتر CommunicationOptions فراهم می کند. RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی بین میزبان پیاده سازی می کند. NCCL از NCCL انویدیا برای اجرای مجموعه ها استفاده می کند. AUTO انتخاب را به زمان اجرا موکول می کند. بهترین انتخاب اجرای جمعی به تعداد و نوع GPU ها و اتصال شبکه در خوشه بستگی دارد.

مدل را آموزش دهید

با ادغام tf.distribute.Strategy API در tf.keras ، تنها تغییری که در توزیع آموزش به چند کارگر ایجاد خواهید کرد ، محصور کردن ساختمان مدل و فراخوانی model.compile() در داخل strategy.scope() . دیکته دامنه استراتژی توزیع را چگونه و در کجا متغیرهای ایجاد می کند، و در مورد MultiWorkerMirroredStrategy ، متغیرهای ایجاد می MirroredVariable ، و آنها را بر روی هر یک از کارگران تکرار شده است.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

برای اجرای واقعی با MultiWorkerMirroredStrategy ، باید فرایندهای کارگر را اجرا کنید و یک TF_CONFIG به آنها منتقل کنید.

مانند فایل mnist.py که قبلاً نوشته شد ، در اینجا main.py که هر یک از کارگران اجرا می کنند:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

در قطعه کد بالا توجه داشته باشید که global_batch_size ، که به Dataset.batch منتقل می شود ، روی per_worker_batch_size * num_workers . این تضمین می کند که هر کارگر بدون در نظر گرفتن تعداد کارگران ، نمونه هایی از per_worker_batch_size پردازش می کند.

فهرست فعلی اکنون شامل هر دو پرونده Python است:

ls *.py
main.py
mnist.py

بنابراین json TF_CONFIG را سریال کرده و به متغیرهای محیط اضافه کنید:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون می توانید یک فرایند کارگر را اجرا کنید که main.py را اجرا کرده و از TF_CONFIG استفاده کنید:

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

در مورد دستور فوق باید چند نکته را یادداشت کنید:

  1. برای اجرای برخی از دستورات bash از %%bash که یک "جادوی" نوت بوک است .
  2. از پرچم --bg برای اجرای فرآیند bash در پس زمینه استفاده می کند ، زیرا این کارگر خاتمه نمی یابد. قبل از شروع کار منتظر همه کارگران است.

روند کارگر پس زمینه خروجی را به این نوت بوک چاپ نمی کند ، بنابراین &> خروجی خود را به یک فایل هدایت می کند ، بنابراین می توانید ببینید چه اتفاقی افتاده است.

بنابراین ، چند ثانیه صبر کنید تا روند شروع شود:

import time
time.sleep(10)

اکنون ببینید چه چیزی تاکنون در پرونده کارگر خروجی گرفته شده است:

cat job_0.log
2021-01-13 02:21:09.851273: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-13 02:21:11.580815: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:11.581827: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-13 02:21:12.596384: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-13 02:21:12.596457: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:12.596467: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:12.596592: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-13 02:21:12.596630: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-13 02:21:12.596638: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-13 02:21:12.597579: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-13 02:21:12.598070: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:12.598767: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:12.603614: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-13 02:21:12.604141: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

در آخرین خط پرونده ورود به سیستم باید گفته شود: Started server with target: grpc://localhost:12345 . اولین کارگر اکنون آماده است و منتظر آماده شدن سایر کارگران (کارگران) است.

بنابراین tf_config برای فرآیند کارگر دوم به روز کنید تا انتخاب کند:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون کارگر دوم را راه اندازی کنید. این آموزش از آنجا که همه کارگران فعال هستند ، آموزش را شروع می کند (بنابراین نیازی به پیشینه این روند نیست):

python main.py
Epoch 1/3
70/70 [==============================] - 7s 55ms/step - loss: 2.2922 - accuracy: 0.0922
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2271 - accuracy: 0.3081
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1563 - accuracy: 0.4882

2021-01-13 02:21:19.938829: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-13 02:21:21.653385: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:21.654353: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-13 02:21:22.667646: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-13 02:21:22.667740: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:22.667751: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:22.667869: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-13 02:21:22.667929: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-13 02:21:22.667939: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-13 02:21:22.668853: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-13 02:21:22.669312: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:22.669912: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:22.674013: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-13 02:21:22.674475: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-01-13 02:21:23.661281: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-13 02:21:23.905655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-13 02:21:23.906128: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000179999 Hz

اکنون اگر سیاهههای مربوط به کارگر اول را دوباره بررسی کنید خواهید دید که در آموزش آن مدل شرکت کرده است:

cat job_0.log
2021-01-13 02:21:09.851273: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-13 02:21:11.580815: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:11.581827: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-13 02:21:12.596384: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-13 02:21:12.596457: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:12.596467: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1182113050
2021-01-13 02:21:12.596592: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-13 02:21:12.596630: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-13 02:21:12.596638: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-13 02:21:12.597579: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-13 02:21:12.598070: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:12.598767: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-13 02:21:12.603614: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-13 02:21:12.604141: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-01-13 02:21:23.658801: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-13 02:21:23.913850: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-13 02:21:23.914363: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000179999 Hz
Epoch 1/3
70/70 [==============================] - 7s 55ms/step - loss: 2.2922 - accuracy: 0.0922
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2271 - accuracy: 0.3081
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1563 - accuracy: 0.4882

تعجب آور نیست که این سرعت نسبت به آزمون ابتدای این آموزش کندتر است . اجرای چندین کارگر در یک ماشین تنها باعث اضافه شدن هزینه های اضافی می شود. هدف در اینجا بهبود زمان آموزش نبود ، بلکه فقط ذکر مثالی از آموزش چند کارگری بود.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

آموزش چند کارگر بصورت عمیق

تاکنون این آموزش یک تنظیم اساسی چند کارگری را نشان داده است. بقیه این سند با جزئیات عوامل دیگری را که ممکن است برای موارد استفاده واقعی مفید یا مهم باشند ، بررسی می کند.

اشتراک داده

در آموزش چند کارگر ، برای اطمینان از همگرایی و عملکرد ، به داده بندی داده نیاز است.

مثال در بخش قبلی به سخت افزار خودکار پیش فرض ارائه شده توسط tf.distribute.Strategy API tf.distribute.Strategy . شما می توانید sharding با تنظیم کنترل tf.data.experimental.AutoShardPolicy از tf.data.experimental.DistributeOptions . برای کسب اطلاعات بیشتر در مورد sharding خودکار به راهنمای توزیع شده مراجعه کنید.

در اینجا یک مثال سریع از نحوه خاموش کردن shard کردن خودکار آورده شده است ، بنابراین هر نسخه هر نمونه را پردازش می کند (توصیه نمی شود):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

ارزیابی

اگر validation_data را به model.fit ، بین آموزش و ارزیابی هر دوره جایگزین می شود. داده های ارزیابی با استفاده از validation_data در بین مجموعه ای مشابه از کارگران توزیع شده و نتایج ارزیابی برای همه کارگران جمع شده و در دسترس است. مشابه آموزش ، مجموعه داده های اعتبار سنجی به طور خودکار در سطح پرونده خرد می شوند. شما باید اندازه دسته جهانی را در مجموعه داده validation_steps تنظیم کنید و مراحل اعتبارسنجی را تعیین کنید. همچنین یک مجموعه داده مکرر برای ارزیابی توصیه می شود.

همچنین می توانید یک کار دیگر ایجاد کنید که به طور دوره ای ایستگاه های بازرسی را بخواند و ارزیابی را اجرا کند. این همان کاری است که ارزیابی کننده انجام می دهد. اما این یک روش توصیه شده برای انجام ارزیابی نیست و بنابراین جزئیات آن حذف می شود.

پیش بینی

در حال حاضر model.predict با MultiWorkerMirroredStrategy. کار نمی کند MultiWorkerMirroredStrategy.

کارایی

شما اکنون یک مدل Keras دارید که همه آن برای تنظیم در چندین کارگر با MultiWorkerMirroredStrategy . برای تغییر عملکرد آموزش چند کارگر با MultiWorkerMirroredStrategy می توانید تکنیک های زیر را امتحان کنید.

  • MultiWorkerMirroredStrategy چندین پیاده سازی ارتباط جمعی را فراهم می کند. RING مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی بین میزبان پیاده سازی می کند. NCCL از NCCL انویدیا برای اجرای مجموعه ها استفاده می کند. AUTO انتخاب را به زمان اجرا موکول می کند. بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPU ها و اتصال شبکه در خوشه بستگی دارد. برای لغو انتخاب خودکار ، پارامتر communication_options MultiWorkerMirroredStrategy 's سازنده را تعیین کنید ، به عنوان مثال communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • در صورت امکان متغیرها را به tf.float کنید. مدل رسمی ResNet مثالی از چگونگی انجام این کار را شامل می شود.

تحمل خطا

در آموزش همزمان ، اگر یکی از کارگران از کار بیفتد و مکانیسم بازیابی شکست وجود نداشته باشد ، خوشه از کار می افتد. استفاده از Keras با tf.distribute.Strategy در مواردی که کارگران می میرند یا در غیر اینصورت ناپایدار هستند با مزیت تحمل خطا همراه است. این کار را با حفظ حالت آموزش در سیستم پرونده توزیع شده به انتخاب خود انجام می دهید ، به این ترتیب که با شروع مجدد نمونه ای که قبلاً خراب یا پیش ساخته شده بود ، حالت آموزش بازیابی می شود.

وقتی کارگری از دسترس خارج شود ، سایر کارگران نیز شکست خواهند خورد (احتمالاً پس از وقفه زمانی). در چنین مواردی ، کارگر غیرقابل دسترس و همچنین کارگران دیگری که شکست خورده اند نیاز به راه اندازی مجدد دارند.

پاسخ ModelCheckpoint

پاسخ ModelCheckpoint دیگر قابلیت تحمل خطا را فراهم نمی کند ، لطفاً از BackupAndRestore به جای آن استفاده کنید.

از تماس ModelCheckpoint می توان برای ذخیره ایست های بازرسی استفاده کرد. اما با این کار ، اگر آموزش قطع شد یا با موفقیت به پایان رسید ، برای ادامه آموزش از ایست بازرسی ، کاربر مسئول بارگیری دستی مدل است.

به صورت اختیاری کاربر می تواند مدل ها / وزنه های خارج از پاسخ ModelCheckpoint را ذخیره و بازیابی کند.

ذخیره و بارگذاری مدل

برای ذخیره مدل خود با استفاده از model.save یا tf.saved_model.save ، مقصد پس انداز باید برای هر کارگر متفاوت باشد. در کارگران غیر ارشد ، شما باید مدل را در یک فهرست موقت ذخیره کنید ، و در بخش اصلی ، باید در فهرست مدل ارائه شده ذخیره کنید. فهرستهای موقت کارگر باید منحصر به فرد باشد تا از خطاهای ناشی از تلاش چندین کارگر برای نوشتن در همان مکان جلوگیری کند. مدل ذخیره شده در تمام دایرکتوری ها یکسان است و معمولاً فقط مدلی که رئیس ذخیره کرده است باید برای بازیابی یا سرویس دادن به آن ارجاع شود. شما باید منطقی برای پاکسازی داشته باشید که فهرست های موقتی ایجاد شده توسط کارگران را پس از اتمام آموزش حذف می کند.

دلیل اینکه شما به طور همزمان در رئیس و کارگران صرفه جویی می کنید این است که ممکن است در هنگام بازرسی متغیرهایی را جمع کنید که به رئیس و کارگران نیاز دارد تا در پروتکل ارتباطی کاهش هزینه شرکت کنند. از طرف دیگر ، اجازه دادن به مدیر و کارگران برای ذخیره در همان دایرکتوری مدل منجر به خطاهای ناشی از مشاجره خواهد شد.

با MultiWorkerMirroredStrategy ، این برنامه بر روی هر کارگر اجرا می شود ، و برای اینکه بدانید کارگر فعلی رئیس است یا خیر ، از شی of حل کننده خوشه که دارای ویژگی های task_type و task_id است ، task_id . task_type به شما می گوید شغل فعلی چیست (به عنوان مثال "کارگر") و task_id شناسه کارگر را به شما می گوید. کارگر با شناسه 0 به عنوان کارگر اصلی تعیین می شود.

در قطعه کد زیر ، write_filepath مسیر پرونده را برای نوشتن فراهم می کند که به شناسه کارگر بستگی دارد. در مورد رئیس (کارگر با شناسه 0) ، آن را در مسیر پرونده اصلی می نویسد. برای دیگران ، یک فهرست موقتی ایجاد می کند (با شناسه در مسیر فهرست) برای نوشتن در:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

با استفاده از این ، اکنون آماده ذخیره هستید:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

همانطور که در بالا توضیح داده شد ، بعداً مدل فقط باید از مسیر ذخیره شده در آن بارگیری شود ، بنابراین بیایید موارد موقت را که کارگران غیر اصلی ذخیره کرده اند حذف کنیم:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

حال ، وقت آنکه بارگذاری شود ، بیایید از API مناسب tf.keras.models.load_model استفاده کنیم و به کار بیشتر ادامه دهیم. در اینجا ، فرض کنید که فقط از تک کارگر برای بارگیری و ادامه آموزش استفاده کنید ، در این صورت شما با tf.keras.models.load_model در یک strategy.scope() دیگر. tf.keras.models.load_model strategy.scope() تماس نمی tf.keras.models.load_model .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 14ms/step - loss: 2.3154 - accuracy: 0.0031
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2786 - accuracy: 0.0180

<tensorflow.python.keras.callbacks.History at 0x7f100a1c2048>

ذخیره و بازیابی ایستگاه بازرسی

از طرف دیگر ، checkpointing به شما امکان می دهد وزنهای مدل را ذخیره کرده و آنها را بدون نیاز به ذخیره کل مدل بازیابی کنید. در اینجا ، شما یک tf.train.Checkpoint ایجاد می کنید که مدل را ردیابی می کند ، و توسط tf.train.CheckpointManager مدیریت می شود تا فقط آخرین ایست بازرسی حفظ شود.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

پس از راه اندازی CheckpointManager ، اکنون آماده ذخیره سازی و حذف ایستگاه های بازرسی کارگران غیر اصلی ذخیره شده هستید.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

اکنون ، هنگامی که نیاز به بازیابی دارید ، می توانید جدیدترین ایست بازرسی ذخیره شده را با استفاده از عملکرد مناسب tf.train.latest_checkpoint کنید. پس از بازیابی ایست بازرسی ، می توانید به آموزش ادامه دهید.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 14ms/step - loss: 2.3230 - accuracy: 0.0978
Epoch 2/2
20/20 [==============================] - 0s 14ms/step - loss: 2.2971 - accuracy: 0.1291

<tensorflow.python.keras.callbacks.History at 0x7f100a0fdb00>

BackupAndRestore پاسخ به تماس

BackupAndRestore پاسخ به تماس فراهم می کند قابلیت تحمل خطا، توسط پشتیبان گیری از مدل و شماره دوره کنونی در یک فایل ایست بازرسی موقت تحت backup_dir آرگومان به BackupAndRestore . این کار در پایان هر دوره انجام می شود.

پس از قطع مشاغل و راه اندازی مجدد آن ، پاسخ مجدد آخرین ایست بازرسی را بازیابی می کند و آموزش از ابتدای دوره قطع شده ادامه می یابد. هر آموزش مقداری که قبلاً در دوره ناتمام قبل از قطع انجام شده باشد ، دور ریخته می شود ، بنابراین بر وضعیت مدل نهایی تأثیر نمی گذارد.

برای استفاده از آن ، نمونه ای از تماس tf.keras.callbacks.experimental.BackupAndRestore در تماس tf.keras.Model.fit() .

با MultiWorkerMirroredStrategy ، اگر کارگر دچار وقفه شود ، کل خوشه مکث می کند تا زمان شروع مجدد کارگر قطع شده. سایر کارگران نیز مجدداً شروع به کار می کنند و کارگر قطع شده دوباره به خوشه می پیوندد. سپس ، هر کارگر پرونده ایست بازرسی را که قبلاً ذخیره شده بود می خواند و حالت سابق خود را برمی دارد ، در نتیجه به خوشه اجازه می دهد که همگام شود. سپس آموزش ادامه می یابد.

تماس با BackupAndRestore از CheckpointManager برای ذخیره و بازیابی حالت آموزش استفاده می کند که فایلی به نام checkpoint ایجاد می کند که ایستگاه های بازرسی موجود را همراه با آخرین مورد ردیابی می کند. به همین دلیل ، برای جلوگیری از برخورد نام ، نباید از backup_dir برای ذخیره سایر ایست های بازرسی استفاده شود.

در حال حاضر ، تماس با BackupAndRestore از یک کارگر بدون استراتژی ، MirroredStrategy و چند کارگر با MultiWorkerMirroredStrategy پشتیبانی می کند. در زیر دو مثال برای آموزش چند کارگری و آموزش تک کارگری آورده شده است.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2964 - accuracy: 0.1302
Epoch 2/3
70/70 [==============================] - 1s 14ms/step - loss: 2.2484 - accuracy: 0.3245
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1932 - accuracy: 0.4485

<tensorflow.python.keras.callbacks.History at 0x7f1009fceda0>

اگر فهرست backup_dir که در BackupAndRestore مشخص کرده BackupAndRestore ، ممکن است متوجه برخی از پرونده های بازرسی موقت شوید. این پرونده ها برای بازیابی موارد از دست رفته قبلی مورد نیاز است و با پایان موفقیت آمیز آموزش ، آنها در پایان tf.keras.Model.fit() توسط کتابخانه حذف می شوند.

همچنین ببینید

  1. راهنمای آموزش توزیع شده در TensorFlow ، نمای کلی از استراتژی های توزیع موجود را ارائه می دهد.
  2. مدل های رسمی ، که بسیاری از آنها را می توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.
  3. بخش عملکرد در راهنما اطلاعاتی در مورد سایر استراتژی ها و ابزارهایی ارائه می دهد که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود استفاده کنید.