روز جامعه ML 9 نوامبر است! برای به روز رسانی از TensorFlow، JAX به ما بپیوندید، و بیشتر بیشتر بدانید

حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش آموزش چند کارگر با API حلقه آموزش سفارشی را نشان می دهد که از طریق MultiWorkerMirroredStrategy توزیع شده است ، بنابراین یک مدل Keras که برای کار با یک کارگر طراحی شده است می تواند با کمترین تغییر کد روی چندین کارگر کار کند.

ما برای آموزش مدل خود از حلقه های آموزش سفارشی استفاده می کنیم ، زیرا آنها به ما انعطاف پذیری و کنترل بیشتری بر آموزش می دهند. علاوه بر این ، اشکال زدایی از مدل و حلقه آموزش آسان تر است. اطلاعات دقیق تر در نوشتن یک حلقه آموزش از ابتدا موجود است .

اگر به دنبال چگونگی استفاده از MultiWorkerMirroredStrategy با keras model.fit ، در عوض به این آموزش مراجعه کنید.

راهنمای آموزش توزیع شده در TensorFlow برای مروری بر استراتژی های توزیع TensorFlow برای کسانی که به درک عمیق tf.distribute.Strategy از API های tf.distribute.Strategy پشتیبانی می کنند ، در tf.distribute.Strategy .

برپایی

اول ، برخی از واردات لازم.

import json
import os
import sys

قبل از وارد کردن TensorFlow ، چند تغییر در محیط ایجاد کنید.

همه GPU ها را غیرفعال کنید. این از خطاهای ناشی از تلاش همه کارگران برای استفاده از GPU یکسان جلوگیری می کند. برای یک کاربرد واقعی ، هر کارگر روی ماشین دیگری است.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

متغیر محیط TF_CONFIG را بازنشانی کنید ، بعداً اطلاعات بیشتری در این مورد خواهید دید.

os.environ.pop('TF_CONFIG', None)

مطمئن شوید که فهرست فعلی در مسیر پایتون قرار دارد. این به نوت بوک اجازه می دهد تا پرونده های نوشته شده توسط %%writefile بعداً وارد کند.

if '.' not in sys.path:
  sys.path.insert(0, '.')

اکنون TensorFlow را وارد کنید.

import tensorflow as tf

مجموعه داده و تعریف مدل

بعد یک فایل mnist.py با یک مدل ساده و تنظیمات مجموعه داده ایجاد کنید. این فایل پایتون توسط فرآیندهای کارگر در این آموزش استفاده خواهد شد:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

پیکربندی چند کارگر

حال بیایید وارد دنیای آموزش چند کارگری شویم. در TF_CONFIG متغیر محیطی TF_CONFIG برای آموزش روی چندین ماشین مورد نیاز است ، که هر کدام احتمالاً نقش متفاوتی دارند. TF_CONFIG استفاده شده در زیر ، یک رشته JSON است که برای تعیین پیکربندی خوشه در هر کارگر که بخشی از خوشه است استفاده می شود. این روش پیش فرض برای تعیین خوشه ، با استفاده از cluster_resolver.TFConfigClusterResolver ، اما گزینه های دیگری نیز در ماژول distribute.cluster_resolver موجود است.

خوشه خود را توصیف کنید

در اینجا یک مثال پیکربندی وجود دارد:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

در اینجا همان TF_CONFIG سریال رشته JSON است:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

دو جز components TF_CONFIG : cluster و task .

  • cluster برای همه کارگران یکسان است و اطلاعات مربوط به خوشه آموزش را ارائه می دهد ، که یک دستورالعمل متشکل از انواع مختلف شغل مانند worker . در آموزش چند کارگر با MultiWorkerMirroredStrategy ، معمولاً یک worker وجود دارد که علاوه بر کاری که یک worker عادی انجام worker دهد ، مسئولیت کمی بیشتر مانند ذخیره ایست بازرسی و نوشتن پرونده خلاصه برای TensorBoard را بر عهده worker گیرد. از چنین کارگری به عنوان کارگر chief می شود ، و معمول است که worker با index 0 به عنوان worker اصلی منصوب می شود (در واقع این نحوه اجرای tf.distribute.Strategy است).

  • task اطلاعات مربوط به وظیفه فعلی را فراهم می کند و برای هر کارگر متفاوت است. type و index آن کارگر را مشخص می کند.

در این مثال ، شما type کار را روی "worker" و index کار را روی 0 . این دستگاه اولین کارگر است و به عنوان کارگر اصلی منصوب می شود و کارهای بیشتری نسبت به بقیه انجام می دهد. توجه داشته باشید که سایر ماشین ها نیز باید TF_CONFIG متغیر محیطی TF_CONFIG را داشته باشند و باید دارای cluster یکسان باشد ، اما type کار یا index وظیفه متفاوت بستگی به نقش آن ماشین ها دارد.

برای اهداف تصویر سازی ، این آموزش نشان می دهد چگونه می توان TF_CONFIG با 2 کارگر در localhost . در عمل ، کاربران چندین کارگر در آدرس IP / پورت های خارجی ایجاد می کنند و TF_CONFIG روی هر کارگر به طور مناسب تنظیم می کنند.

در این مثال شما از 2 کارگر استفاده خواهید کرد ، TF_CONFIG کارگر اول در بالا نشان داده شده است. برای کارگر دوم شما tf_config['task']['index']=1

در بالا ، tf_config فقط یک متغیر محلی در پایتون است. برای استفاده واقعی از آن برای پیکربندی آموزش ، این فرهنگ لغت باید به صورت سریال JSON مرتب شود و در متغیر محیط TF_CONFIG قرار گیرد.

متغیرهای محیطی و فرایندهای فرعی در دفترها

زیر پردازش ها متغیرهای محیط را از والدین خود به ارث می برند. بنابراین اگر در این فرآیند jupyter notebook متغیر محیطی تنظیم کنید:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

از زیر فرایندها می توانید به متغیر محیط دسترسی پیدا کنید:

echo ${GREETINGS}
Hello TensorFlow!

در بخش بعدی ، از این برای انتقال TF_CONFIG به TF_CONFIG کارگر استفاده خواهید کرد. شما واقعاً هرگز شغل خود را از این طریق راه اندازی نمی کنید ، اما برای اهداف این آموزش کافی است: نشان دادن یک مثال حداقل چند کارگر.

MultiWorkerMirroredStrategy

برای آموزش مدل ، از نمونه ای از tf.distribute.MultiWorkerMirroredStrategy که کپی همه متغیرها را در لایه های مدل در هر دستگاه در همه کارگران ایجاد می کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

از tf.distribute.Strategy.scope استفاده کنید تا مشخص کنید هنگام ساخت مدل شما باید از یک استراتژی استفاده شود. این شما را در " متن کپی متقابل " برای این استراتژی قرار می دهد ، این بدان معنی است که استراتژی تحت کنترل مواردی مانند جایگذاری متغیر قرار می گیرد.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

داده های خود را بین کارگران خرد کنید

در آموزش های چند کارگر ، تقسیم داده ها لزوماً لازم نیست ، اما دقیقاً یک بار معنایی به شما می دهد که باعث می شود آموزش بیشتر تکرار شود ، یعنی آموزش روی چندین کارگر باید همان آموزش یک کارگر باشد. توجه: در برخی موارد عملکرد می تواند تحت تأثیر قرار گیرد.

مشاهده کنید: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

حلقه آموزش سفارشی را تعریف کنید و مدل را آموزش دهید

یک بهینه ساز را مشخص کنید

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

یک مرحله آموزش با tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

ذخیره و بازیابی ایستگاه بازرسی

پیاده سازی Checkpointing در یک حلقه آموزش سفارشی به کاربر نیاز دارد تا به جای استفاده از پاسخ keras ، آن را کنترل کند. این به شما امکان می دهد وزن های مدل را ذخیره کرده و بدون نیاز به ذخیره کل مدل ، آنها را بازیابی کنید.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

در اینجا ، شما یک tf.train.Checkpoint ایجاد می کنید که مدل را ردیابی می کند و توسط tf.train.CheckpointManager مدیریت می شود تا فقط آخرین ایست بازرسی حفظ شود.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

اکنون ، هنگامی که نیاز به بازیابی دارید ، می توانید جدیدترین ایست بازرسی ذخیره شده را با استفاده از عملکرد مناسب tf.train.latest_checkpoint کنید.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

پس از بازیابی ایست بازرسی ، می توانید با آموزش حلقه آموزش سفارشی خود ادامه دهید.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

تنظیم کامل کد برای کارگران

برای اجرای واقعی با MultiWorkerMirroredStrategy ، باید فرایندهای کارگر را اجرا کنید و یک TF_CONFIG به آنها منتقل کنید.

مانند فایل mnist.py که قبلاً نوشته شد ، در اینجا main.py که حاوی همان کدی است که گام به گام در این کولاب گام به گام طی کردیم ، ما فقط آن را در یک فایل می نویسیم تا هر یک از کارگران آن را اجرا کنند:

پرونده: main.py

Writing main.py

آموزش دهید و ارزیابی کنید

فهرست فعلی اکنون شامل هر دو پرونده Python است:

ls *.py
main.py
mnist.py

بنابراین json TF_CONFIG را سریال کرده و به متغیرهای محیط اضافه کنید:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون می توانید یک فرایند کارگر را اجرا کنید که main.py را اجرا کرده و از TF_CONFIG استفاده کنید:

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

چند نکته در مورد دستور بالا وجود دارد که باید توجه داشته باشید:

  1. برای اجرای برخی از دستورات bash از %%bash که یک "جادوی" نوت بوک است استفاده می کند.
  2. از پرچم --bg برای اجرای فرآیند bash در پس زمینه استفاده می کند ، زیرا این کارگر پایان نمی یابد. قبل از شروع کار منتظر همه کارگران است.

روند کارگر پس زمینه خروجی را به این نوت بوک چاپ نمی کند ، بنابراین &> خروجی خود را به یک فایل هدایت می کند ، بنابراین می توانید ببینید چه اتفاقی افتاده است.

بنابراین ، چند ثانیه صبر کنید تا روند شروع شود:

import time
time.sleep(20)

اکنون ببینید چه چیزی تاکنون در پرونده کارگر خروجی گرفته شده است:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

در آخرین خط پرونده ورود به سیستم باید گفته شود: Started server with target: grpc://localhost:12345 . اولین کارگر اکنون آماده است و منتظر آماده شدن سایر کارگران (کارگران) است.

بنابراین tf_config برای فرآیند کارگر دوم به روز کنید تا انتخاب کند:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون کارگر دوم را راه اندازی کنید. این کار از آنجا که همه کارگران فعال هستند ، آموزش را شروع می کنیم (بنابراین نیازی به پیشینه این روند نیست):

python main.py > /dev/null 2>&1

اکنون اگر سیاهههای مربوط به کارگر اول را دوباره بررسی کنید می بینید که در آموزش آن مدل شرکت کرده است:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

آموزش چند کارگر بصورت عمیق

این آموزش گردش کار Custom Training Loop چند کارگر را نشان داده است. شرح مفصلی از سایر موضوعات در model.fit's guide تنظیمات چند کاره و قابل استفاده در CTL موجود است.

همچنین ببینید

  1. راهنمای آموزش توزیع شده در TensorFlow ، نمای کلی از استراتژی های توزیع موجود را ارائه می دهد.
  2. مدل های رسمی ، که بسیاری از آنها را می توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.
  3. بخش عملکرد در راهنما ، اطلاعاتی در مورد سایر استراتژی ها و ابزارهایی ارائه می دهد که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود استفاده کنید.