Loop pelatihan khusus dengan Keras dan MultiWorkerMirroredStrategy

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Ringkasan

Tutorial ini mendemonstrasikan pelatihan multi-pekerja dengan API loop pelatihan khusus, yang didistribusikan melalui MultiWorkerMirroredStrategy, sehingga model Keras yang dirancang untuk dijalankan pada pekerja tunggal dapat bekerja dengan mulus pada banyak pekerja dengan perubahan kode minimal.

Kami menggunakan loop pelatihan khusus untuk melatih model kami karena mereka memberi kami fleksibilitas dan kontrol yang lebih besar pada pelatihan. Selain itu, lebih mudah untuk men-debug model dan loop pelatihan. Informasi lebih rinci tersedia di Menulis loop pelatihan dari awal .

Jika Anda mencari cara menggunakan MultiWorkerMirroredStrategy dengan keras model.fit , lihat tutorial ini sebagai gantinya.

Pelatihan Terdistribusi dalam panduan TensorFlow tersedia untuk ikhtisar tentang strategi distribusi yang didukung TensorFlow bagi mereka yang tertarik untuk memahami lebih dalam tentang tf.distribute.Strategy API.

Mempersiapkan

Pertama, beberapa impor yang diperlukan.

import json
import os
import sys

Sebelum mengimpor TensorFlow, buat beberapa perubahan pada lingkungan.

Nonaktifkan semua GPU. Ini mencegah kesalahan yang disebabkan oleh semua pekerja yang mencoba menggunakan GPU yang sama. Untuk aplikasi nyata setiap pekerja akan berada di mesin yang berbeda.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Setel ulang variabel lingkungan TF_CONFIG , Anda akan melihat lebih banyak tentang ini nanti.

os.environ.pop('TF_CONFIG', None)

Pastikan direktori saat ini berada di jalur python. Ini memungkinkan notebook untuk mengimpor file yang ditulis oleh %%writefile nanti.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Sekarang impor TensorFlow.

import tensorflow as tf

Definisi set data dan model

Selanjutnya buat file mnist.py dengan model sederhana dan pengaturan dataset. File python ini akan digunakan oleh proses pekerja dalam tutorial ini:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Konfigurasi Multi-Pekerja

Sekarang mari kita memasuki dunia pelatihan multi-pekerja. Di TensorFlow, variabel lingkungan TF_CONFIG diperlukan untuk pelatihan di beberapa mesin, yang masing-masing mungkin memiliki peran yang berbeda. TF_CONFIG digunakan di bawah ini, adalah string JSON yang digunakan untuk menentukan konfigurasi cluster pada setiap pekerja yang merupakan bagian dari cluster. Ini adalah metode default untuk menentukan cluster, menggunakan cluster_resolver.TFConfigClusterResolver , tetapi ada opsi lain yang tersedia di distribute.cluster_resolver .

Deskripsikan cluster Anda

Berikut adalah contoh konfigurasi:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ini adalah serial TF_CONFIG yang sama dengan string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Ada dua komponen TF_CONFIG : cluster dan task .

  • cluster adalah sama untuk semua pekerja dan memberikan informasi tentang cluster pelatihan, yang merupakan dict yang terdiri dari berbagai jenis pekerjaan seperti worker . Dalam pelatihan multi-pekerja dengan MultiWorkerMirroredStrategy , biasanya ada satu worker yang mengambil sedikit tanggung jawab seperti menyimpan pos pemeriksaan dan menulis file ringkasan untuk TensorBoard selain apa yang dilakukan worker biasa. Pekerja seperti itu disebut sebagai pekerja chief , dan merupakan kebiasaan bahwa worker dengan index 0 ditunjuk sebagai worker utama (sebenarnya beginilah cara tf.distribute.Strategy diterapkan).

  • task memberikan informasi tentang tugas saat ini dan berbeda pada setiap pekerja. Ini menentukan type dan index pekerja itu.

Dalam contoh ini, Anda mengatur type tugas ke "worker" dan index tugas ke 0 . Mesin ini adalah pekerja pertama dan akan diangkat sebagai kepala pekerja dan melakukan lebih banyak pekerjaan daripada yang lain. Perhatikan bahwa mesin lain harus memiliki variabel lingkungan TF_CONFIG yang disetel juga, dan itu harus memiliki dict cluster yang sama, tetapi type tugas atau index tugas yang berbeda tergantung pada peran mesin tersebut.

Untuk tujuan ilustrasi, tutorial ini menunjukkan bagaimana seseorang dapat mengatur TF_CONFIG dengan 2 pekerja di localhost . Dalam praktiknya, pengguna akan membuat banyak pekerja pada alamat/port IP eksternal, dan menyetel TF_CONFIG pada setiap pekerja dengan tepat.

Dalam contoh ini Anda akan menggunakan 2 pekerja, TF_CONFIG pekerja pertama ditampilkan di atas. Untuk pekerja kedua Anda akan mengatur tf_config['task']['index']=1

Di atas, tf_config hanyalah variabel lokal di python. Untuk benar-benar menggunakannya untuk mengonfigurasi pelatihan, kamus ini perlu diserialisasikan sebagai JSON, dan ditempatkan di variabel lingkungan TF_CONFIG .

Variabel lingkungan dan subproses di notebook

Subproses mewarisi variabel lingkungan dari induknya. Jadi, jika Anda menetapkan variabel lingkungan dalam proses jupyter notebook ini:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Anda dapat mengakses variabel lingkungan dari subproses:

echo ${GREETINGS}
Hello TensorFlow!

Di bagian berikutnya, Anda akan menggunakan ini untuk meneruskan TF_CONFIG ke subproses pekerja. Anda tidak akan pernah benar-benar meluncurkan pekerjaan Anda dengan cara ini, tetapi itu sudah cukup untuk tujuan tutorial ini: Untuk mendemonstrasikan contoh multi-pekerja minimal.

MultiWorkerMirroredStrategy

Untuk melatih model, gunakan instance tf.distribute.MultiWorkerMirroredStrategy , yang membuat salinan semua variabel dalam lapisan model pada setiap perangkat di semua pekerja. Panduan tf.distribute.Strategy memiliki rincian lebih lanjut tentang strategi ini.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Gunakan tf.distribute.Strategy.scope untuk menentukan bahwa strategi harus digunakan saat membangun model Anda. Ini menempatkan Anda dalam " konteks replika silang " untuk strategi ini, yang berarti strategi tersebut mengendalikan hal-hal seperti penempatan variabel.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Auto-sharding data Anda ke seluruh pekerja

Dalam pelatihan multi-pekerja, sharding kumpulan data tidak selalu diperlukan, namun ini memberi Anda semantik tepat satu kali yang membuat lebih banyak pelatihan lebih dapat direproduksi, yaitu pelatihan pada banyak pekerja harus sama dengan pelatihan pada satu pekerja. Catatan: kinerja dapat terpengaruh dalam beberapa kasus.

Lihat: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Tentukan Loop Pelatihan Kustom dan Latih modelnya

Tentukan pengoptimal

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Tentukan langkah pelatihan dengan tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Menyimpan dan memulihkan pos pemeriksaan

Implementasi pos pemeriksaan dalam Loop Pelatihan Kustom mengharuskan pengguna untuk menanganinya alih-alih menggunakan panggilan balik yang keras. Ini memungkinkan Anda untuk menyimpan bobot model dan mengembalikannya tanpa harus menyimpan keseluruhan model.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Di sini, Anda akan membuat satu tf.train.Checkpoint yang melacak model, yang dikelola oleh tf.train.CheckpointManager sehingga hanya pos pemeriksaan terbaru yang dipertahankan.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Sekarang, ketika Anda perlu memulihkan, Anda dapat menemukan pos pemeriksaan terbaru yang disimpan menggunakan fungsi tf.train.latest_checkpoint yang nyaman.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Setelah memulihkan pos pemeriksaan, Anda dapat melanjutkan dengan melatih loop pelatihan khusus Anda.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Penyiapan kode lengkap pada pekerja

Untuk benar-benar berjalan dengan MultiWorkerMirroredStrategy Anda harus menjalankan proses pekerja dan meneruskan TF_CONFIG kepada mereka.

Seperti file mnist.py yang ditulis sebelumnya, berikut ini adalah main.py yang berisi kode yang sama dengan yang kita jalani langkah demi langkah sebelumnya di colab ini, kita hanya menulisnya ke file sehingga masing-masing pekerja akan menjalankannya:

File: main.py

Writing main.py

Latih dan Evaluasi

Direktori saat ini sekarang berisi kedua file Python:

ls *.py
main.py
mnist.py

Jadi json-serialize TF_CONFIG dan tambahkan ke variabel lingkungan:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Sekarang, Anda dapat meluncurkan proses pekerja yang akan menjalankan main.py dan menggunakan TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ada beberapa hal yang perlu diperhatikan tentang perintah di atas:

  1. Ia menggunakan %%bash yang merupakan "ajaib" notebook untuk menjalankan beberapa perintah bash.
  2. Ia menggunakan flag --bg untuk menjalankan proses bash di latar belakang, karena pekerja ini tidak akan berhenti. Itu menunggu semua pekerja sebelum dimulai.

Proses pekerja di latar belakang tidak akan mencetak output ke notebook ini, jadi &> mengarahkan outputnya ke file, sehingga Anda dapat melihat apa yang terjadi.

Jadi, tunggu beberapa detik hingga proses dimulai:

import time
time.sleep(20)

Sekarang lihat apa yang telah dihasilkan ke file log pekerja sejauh ini:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Baris terakhir dari file log harus mengatakan: Started server with target: grpc://localhost:12345 . Pekerja pertama sekarang siap, dan sedang menunggu semua pekerja lain siap untuk melanjutkan.

Jadi perbarui tf_config untuk proses pekerja kedua untuk mengambil:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Sekarang luncurkan pekerja kedua. Ini akan memulai pelatihan karena semua pekerja aktif (jadi proses ini tidak perlu dilatar belakangi):

python main.py > /dev/null 2>&1

Sekarang jika Anda memeriksa ulang log yang ditulis oleh pekerja pertama, Anda akan melihat bahwa ia berpartisipasi dalam pelatihan model itu:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Pelatihan multi pekerja secara mendalam

Tutorial ini telah mendemonstrasikan alur kerja Custom Training Loop dari pengaturan multi-pekerja. Penjelasan rinci tentang topik lain tersedia di model.fit's guide pengaturan multi-pekerja dan berlaku untuk CTL.

Lihat juga

  1. Pelatihan Terdistribusi dalam panduan TensorFlow memberikan gambaran umum tentang strategi distribusi yang tersedia.
  2. Model resmi , banyak di antaranya dapat dikonfigurasi untuk menjalankan beberapa strategi distribusi.
  3. Bagian Kinerja dalam panduan memberikan informasi tentang strategi dan alat lain yang dapat Anda gunakan untuk mengoptimalkan kinerja model TensorFlow Anda.