Punya pertanyaan? Terhubung dengan komunitas di Forum Kunjungan TensorFlow Forum

Pelatihan Server Parameter

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Gambaran

Pelatihan server parameter adalah metode paralel data umum untuk meningkatkan pelatihan model di beberapa mesin. Kluster pelatihan server parameter terdiri dari pekerja dan server parameter. Variabel dibuat di server parameter dan dibaca serta diperbarui oleh pekerja di setiap langkah. Secara default, pekerja membaca dan memperbarui variabel ini secara independen tanpa melakukan sinkronisasi satu sama lain. Inilah sebabnya mengapa terkadang pelatihan gaya server parameter disebut pelatihan asinkron.

Di TF2, pelatihan server parameter diberdayakan oleh kelas tf.distribute.experimental.ParameterServerStrategy , yang mendistribusikan langkah-langkah pelatihan ke kluster yang menskalakan hingga ribuan pekerja (disertai dengan server parameter). Ada dua API pelatihan utama yang didukung: API Keras Training, juga dikenal sebagai Model.fit , dan Custom Training Loop (CTL). Model.fit direkomendasikan jika pengguna lebih memilih abstraksi tingkat tinggi dan penanganan pelatihan, sedangkan CTL direkomendasikan saat pengguna lebih memilih untuk menentukan detail loop pelatihan mereka.

Terlepas dari API pilihan, pelatihan terdistribusi di TF2 melibatkan "cluster" dengan beberapa "pekerjaan", dan setiap pekerjaan mungkin memiliki satu atau lebih "tugas". Saat menggunakan pelatihan server parameter, disarankan untuk memiliki satu pekerjaan koordinator (yang memiliki chief nama pekerjaan), beberapa pekerjaan pekerja ( worker nama pekerjaan), dan beberapa pekerjaan server parameter (nama pekerjaan ps ).

Saat koordinator membuat sumber daya, mengirimkan tugas pelatihan, menulis checkpoint, dan menangani kegagalan tugas, pekerja dan server parameter menjalankan tf.distribute.Server yang mendengarkan permintaan dari koordinator.

Pelatihan server parameter dengan Model.fit API

Pelatihan server parameter dengan Model.fit API memerlukan koordinator untuk menggunakan objek tf.distribute.experimental.ParameterServerStrategy , dan tf.keras.utils.experimental.DatasetCreator sebagai masukan. Mirip dengan penggunaan Model.fit tanpa strategi, atau dengan strategi lain, alur kerja melibatkan pembuatan dan penyusunan model, menyiapkan callback, diikuti dengan panggilan Model.fit .

Pelatihan server parameter dengan API loop pelatihan khusus (CTL)

Dengan CTLs, kelas tf.distribute.experimental.coordinator.ClusterCoordinator adalah komponen kunci yang digunakan untuk koordinator. Kelas ClusterCoordinator perlu bekerja bersama dengan objek tf.distribute.Strategy . Objek tf.distribute.Strategy ini diperlukan untuk memberikan informasi cluster dan digunakan untuk menentukan langkah pelatihan seperti yang telah kita lihat dalam pelatihan kustom dengan MirroredStrategy . Objek ClusterCoordinator kemudian mengirimkan pelaksanaan langkah-langkah pelatihan ini ke pekerja jarak jauh. Untuk pelatihan server parameter, ClusterCoordinator perlu bekerja dengan tf.distribute.experimental.ParameterServerStrategy .

API terpenting yang disediakan oleh objek ClusterCoordinator adalah schedule . API schedule mengantrekan fungsi tf.function dan mengembalikan RemoteValue seperti masa depan dengan segera. Fungsi antrean akan dikirim ke pekerja jarak jauh di utas latar belakang dan RemoteValue mereka akan diisi secara asinkron. Karena schedule tidak memerlukan penugasan pekerja, tf.function diteruskan dapat dijalankan pada pekerja mana pun yang tersedia. Jika pekerja yang menjalankannya menjadi tidak tersedia sebelum selesai, fungsi tersebut akan dicoba ulang pada pekerja lain yang tersedia. Karena fakta ini dan fakta bahwa eksekusi fungsi tidak bersifat atomik, suatu fungsi dapat dijalankan lebih dari sekali.

Selain mengirimkan fungsi jarak jauh, ClusterCoordinator juga membantu membuat kumpulan data pada semua pekerja dan membangun kembali kumpulan data ini ketika seorang pekerja pulih dari kegagalan.

Pengaturan Tutorial

Tutorial akan bercabang menjadi jalur CTL atau Model.fit , dan Anda dapat memilih salah satu yang sesuai dengan kebutuhan Anda. Bagian selain "Pelatihan dengan X" dapat diterapkan ke kedua jalur.

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Penyiapan Cluster

Seperti disebutkan di atas, cluster pelatihan server parameter memerlukan tugas koordinator yang menjalankan program pelatihan Anda, satu atau beberapa pekerja dan tugas server parameter yang menjalankan server TensorFlow, yaitu tf.distribute.Server , dan mungkin tugas evaluasi tambahan yang menjalankan side-car evaluasi (lihat bagian evaluasi mobil samping di bawah). Persyaratan untuk menyiapkannya adalah:

  • Tugas koordinator perlu mengetahui alamat dan port dari semua server TensorFlow lainnya kecuali evaluator.
  • Pekerja dan server parameter perlu mengetahui port mana yang perlu mereka dengarkan. Demi kesederhanaan, kami biasanya meneruskan informasi cluster lengkap saat membuat server TensorFlow untuk tugas-tugas ini.
  • Tugas evaluator tidak harus mengetahui penyiapan cluster pelatihan. Jika ya, itu seharusnya tidak mencoba untuk terhubung ke cluster pelatihan.
  • Pekerja dan server parameter harus memiliki jenis tugas masing-masing sebagai "pekerja" dan "ps". Koordinator harus menggunakan "kepala" sebagai jenis tugas karena alasan warisan.

Dalam tutorial ini, kita akan membuat cluster dalam proses sehingga seluruh pelatihan server parameter dapat dijalankan di colab. Kami akan memperkenalkan cara menyiapkan cluster nyata di bagian selanjutnya.

Kluster dalam proses

Dalam tutorial ini, kita akan memulai banyak server TensorFlow terlebih dahulu dan menghubungkannya nanti. Perhatikan bahwa ini hanya untuk tujuan demonstrasi tutorial ini, dan dalam pelatihan nyata server akan dimulai pada mesin pekerja dan ps.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Pengaturan cluster dalam proses sering digunakan dalam pengujian unit kami. Ini salah satu contohnya .

Buat instance ParameterServerStrategy

Sebelum kita menyelami kode pelatihan, mari kita buat contoh objek ParameterServerStrategy . Perhatikan bahwa ini diperlukan terlepas dari apakah Anda melanjutkan dengan loop pelatihan khusus atau Model.fit . argumen variable_partitioner akan dijelaskan di bagian selanjutnya .

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:24366', 'localhost:17071'], 'worker': ['localhost:17839', 'localhost:24811', 'localhost:19665']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3', '/job:chief/replica:0/task:0/device:GPU:4', '/job:chief/replica:0/task:0/device:GPU:5'], variable_device = '/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Untuk menggunakan GPU untuk pelatihan, alokasikan GPU yang terlihat oleh setiap pekerja. ParameterServerStrategy akan menggunakan semua GPU yang tersedia di setiap pekerja, dengan batasan bahwa semua pekerja harus memiliki jumlah GPU yang sama yang tersedia.

Sharding variabel

Variable sharding mengacu pada pemisahan variabel menjadi beberapa variabel yang lebih kecil. Kami menyebut variabel yang lebih kecil ini sebagai pecahan . Sharding variabel mungkin berguna untuk mendistribusikan beban jaringan saat mengakses shard ini. Ini juga berguna untuk mendistribusikan komputasi dan penyimpanan variabel normal ke beberapa server parameter.

Untuk mengaktifkan sharding variabel, Anda dapat meneruskan variable_partitioner saat membuat objek ParameterServerStrategy . variable_partitioner akan dipanggil setiap kali variabel dibuat dan diharapkan mengembalikan jumlah pecahan di sepanjang setiap dimensi variabel. Beberapa variable_partitioner out-of-box disediakan seperti tf.distribute.experimental.partitioners.FixedShardsPartitioner .

Saat variable_partitioner diteruskan dan jika Anda membuat variabel langsung di bawah strategy.scope() , ini akan menjadi jenis penampung dengan properti variables yang memberikan akses ke daftar pecahan. Biasanya, container ini akan otomatis diubah menjadi Tensor dengan menggabungkan semua shard. Hasilnya, ini dapat digunakan sebagai variabel normal. Di sisi lain, beberapa metode TensorFlow seperti tf.nn.embedding_lookup menyediakan implementasi yang efisien untuk jenis penampung ini dan dalam metode ini penggabungan otomatis akan dihindari.

Silakan lihat dokumen API dari ParameterServerStrategy untuk lebih jelasnya.

Pelatihan dengan Model.fit

Keras menyediakan API pelatihan yang mudah digunakan melalui Model.fit yang menangani loop pelatihan di balik train_step , dengan train_step , dan callback yang menyediakan fungsionalitas seperti penyimpanan pos pemeriksaan, atau penyimpanan ringkasan untuk TensorBoard. Dengan Model.fit , kode pelatihan yang sama dapat digunakan untuk strategi lain dengan pertukaran sederhana dari objek strategi.

Memasukan data

Model.fit dengan pelatihan server parameter mengharuskan data input disediakan dalam callable yang menggunakan argumen tunggal berjenis tf.distribute.InputContext , dan menampilkantf.data.Dataset . Kemudian, buat objek tf.keras.utils.experimental.DatasetCreator yang mengambil callable , dan objek opsional tf.distribute.InputOptions melalui argumen input_options . Perhatikan bahwa disarankan untuk mengacak dan mengulangi data dengan pelatihan server parameter, dan menentukan steps_per_epoch dalam panggilan yang fit sehingga pustaka mengetahui batas waktu.

Silakan lihat panduan Input Terdistribusi untuk informasi lebih lanjut tentang argumen InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kode dalam dataset_fn akan dipanggil pada perangkat input, yang biasanya adalah CPU, pada setiap mesin pekerja.

Konstruksi dan kompilasi model

Sekarang, Anda akan membuat tf.keras.Model dengan API pilihan (model sepele tf.keras.models.Sequential digunakan sebagai demonstrasi di sini), diikuti dengan panggilan Model.compile untuk memasukkan komponen seperti pengoptimal, metrik, atau parameter seperti steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Panggilan balik dan pelatihan

Sebelum Anda memanggil model.fit untuk pelatihan sebenarnya, mari persiapkan callback yang diperlukan untuk tugas-tugas umum seperti:

  • ModelCheckpoint - untuk menyimpan bobot model.

  • BackupAndRestore - untuk memastikan kemajuan pelatihan secara otomatis dicadangkan, dan dipulihkan jika cluster mengalami ketidaktersediaan (seperti pembatalan atau preemption), atau

  • TensorBoard - untuk menyimpan laporan kemajuan ke dalam file ringkasan yang divisualisasikan dalam alat TensorBoard.

Perhatikan bahwa karena pertimbangan performa, callback kustom tidak boleh mengganti callback tingkat batch saat digunakan dengan ParameterServerStrategy . Harap ubah panggilan balik khusus Anda untuk menjadikannya panggilan tingkat waktu, dan sesuaikan steps_per_epoch ke nilai yang sesuai. Selain itu, steps_per_epoch adalah argumen yang diperlukan untuk Model.fit saat digunakan dengan ParameterServerStrategy .

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 6s - loss: 0.9476
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.8812
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.5994
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e973a5d40> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f1e8c19eb90> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.4205
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.3881
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7f236401be50>

Penggunaan langsung dengan ClusterCoordinator (opsional)

Bahkan jika Anda memilih jalur pelatihan Model.fit , Anda dapat secara opsional membuat ClusterCoordinator objek ClusterCoordinator untuk menjadwalkan fungsi lain yang ingin Anda jalankan pada pekerja. Lihat di bawah Pelatihan dengan bagian Loop Pelatihan Kustom untuk detail dan contoh selengkapnya.

Pelatihan dengan Loop Pelatihan Kustom

Loop pelatihan kustom dengan tf.distribute.Strategy memberikan fleksibilitas yang tinggi untuk menentukan loop pelatihan. Dengan ParameterServerStrategy ditentukan di atas, Anda akan menggunakan ClusterCoordinator untuk mengirimkan pelaksanaan langkah-langkah pelatihan ke pekerja jarak jauh.

Kemudian, Anda akan membuat model, menentukan kumpulan data dan fungsi langkah seperti yang telah kita lihat di loop pelatihan dengan tf.distribute.Strategy lainnya. Anda dapat menemukan detail lebih lanjut di tutorial ini.

Untuk memastikan pengambilan set data yang efisien, gunakan API pembuatan set data terdistribusi yang direkomendasikan yang disebutkan di bagian Langkah Pelatihan Pengiriman untuk pekerja jarak jauh di bawah. Selain itu, pastikan untuk memanggil strategy.run di dalam worker_fn untuk memanfaatkan sepenuhnya GPU yang dialokasikan pada pekerja. Langkah-langkah lainnya sama untuk pelatihan dengan atau tanpa GPU.

Mari buat komponen ini dengan langkah-langkah berikut:

Siapkan datanya

Pertama, tulis fungsi yang membuat kumpulan data yang menyertakan logika preprocessing yang diimplementasikan oleh lapisan preprocessing Keras. Kami akan membuat lapisan ini di luar dataset_fn tetapi menerapkan transformasi di dalam dataset_fn karena Anda akan membungkus dataset_fn ke dalam fungsi tf.function yang tidak memungkinkan variabel dibuat di dalamnya.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab,
                                          mask_token=None)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Hasilkan contoh mainan dalam kumpulan data:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Kemudian kami membuat set data pelatihan yang dibungkus dengan dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Bangun modelnya

Kedua, kami membuat model dan objek lainnya. Pastikan untuk membuat semua variabel di bawah strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Mari kita konfirmasikan bahwa penggunaan FixedShardsPartitioner membagi semua variabel menjadi dua pecahan dan setiap pecahan ditetapkan ke server parameter yang berbeda:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Tentukan langkah pelatihan

Ketiga, buat langkah pelatihan yang dibungkus dengan fungsi tf.function .:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Dalam fungsi langkah di atas, memanggil strategy.run dan strategy.reduce di step_fn dapat mendukung beberapa GPU per pekerja. Jika pekerja memiliki alokasi GPU, strategy.run akan mendistribusikan set data pada beberapa replika.

Mengirim langkah-langkah pelatihan ke pekerja jarak jauh

Setelah semua perhitungan ditentukan oleh ParameterServerStrategy , kita akan menggunakan kelas ClusterCoordinator untuk membuat sumber daya dan mendistribusikan langkah-langkah pelatihan ke pekerja jarak jauh.

Mari pertama-tama buat objek ClusterCoordinator dan teruskan objek strategi:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Kemudian kami membuat set data per pekerja dan iterator. Di per_worker_dataset_fn bawah ini, dataset_fn untuk dataset_fn ke dalam strategy.distribute_datasets_from_function untuk memungkinkan dataset_fn efisien ke GPU dengan lancar.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Langkah terakhir adalah mendistribusikan komputasi ke pekerja jarak jauh menggunakan schedule . Metode schedule mengantrekan fungsi tf.function dan mengembalikan RemoteValue seperti masa depan dengan segera. Fungsi antrian akan dikirim ke pekerja jarak jauh di thread latar belakang dan RemoteValue akan diisi secara asinkron. Metode join dapat digunakan untuk menunggu hingga semua fungsi yang dijadwalkan dieksekusi.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.743750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Berikut adalah cara mengambil hasil dari RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.005490

Atau, Anda dapat meluncurkan semua langkah dan melakukan sesuatu sambil menunggu penyelesaian:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Untuk pelatihan lengkap dan alur kerja penayangan untuk contoh khusus ini, lihat pengujian ini.

Lebih lanjut tentang pembuatan set data

Dataset pada kode di atas dibuat menggunakan API create_per_worker_dataset . Ini membuat satu set data per pekerja dan mengembalikan objek kontainer. Anda dapat memanggil metode iter untuk membuat iterator per pekerja. Iterator per pekerja berisi satu iterator per pekerja dan potongan pekerja yang sesuai akan diganti dalam argumen input dari fungsi yang diteruskan ke metode schedule sebelum fungsi dijalankan pada pekerja tertentu.

Saat ini, metode schedule mengasumsikan pekerja setara dan dengan demikian mengasumsikan kumpulan data pada pekerja yang berbeda adalah sama kecuali mereka dapat diacak secara berbeda jika mereka berisi operasi kumpulan data . acak . Karena itu, kami juga merekomendasikan kumpulan data untuk diulang tanpa batas waktu dan menjadwalkan sejumlah langkah terbatas daripada mengandalkan OutOfRangeError dari kumpulan data.

Catatan penting lainnya adalah bahwa tf.data data tf.data tidak mendukung serialisasi implisit dan deserialisasi melintasi batas tugas. Jadi, penting untuk membuat seluruh dataset di dalam fungsi yang diteruskan ke create_per_worker_dataset .

Evaluasi

Ada lebih dari satu cara untuk mendefinisikan dan menjalankan loop evaluasi dalam pelatihan terdistribusi. Masing-masing memiliki pro dan kontra seperti yang dijelaskan di bawah ini. Metode evaluasi sebaris disarankan jika Anda tidak memiliki preferensi.

Evaluasi inline

Dalam metode ini koordinator berganti-ganti antara pelatihan dan evaluasi dan karenanya kami menyebutnya evaluasi inline. Ada beberapa manfaat evaluasi inline. Misalnya, ini dapat mendukung model evaluasi besar dan set data evaluasi yang tidak dapat ditampung oleh satu tugas. Contoh lain, hasil evaluasi dapat digunakan untuk membuat keputusan untuk pelatihan epoch berikutnya.

Ada dua cara untuk menerapkan evaluasi inline:

  • Evaluasi langsung - Untuk model kecil dan dataset evaluasi, koordinator dapat menjalankan evaluasi langsung pada model terdistribusi dengan dataset evaluasi pada koordinator:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Evaluasi terdistribusi - Untuk model besar atau kumpulan data yang tidak dapat dijalankan langsung di koordinator, tugas koordinator dapat mendistribusikan tugas evaluasi kepada pekerja melalui metode schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Evaluasi mobil samping

Metode lain disebut evaluasi mobil samping yang membuat tugas evaluator khusus yang berulang kali membaca pos pemeriksaan dan menjalankan evaluasi pada pos pemeriksaan terbaru. Ini memungkinkan program pelatihan Anda selesai lebih awal jika Anda tidak perlu mengubah putaran pelatihan berdasarkan hasil evaluasi. Namun, ini membutuhkan tugas evaluator tambahan dan pemeriksaan berkala untuk memicu evaluasi. Berikut ini adalah kemungkinan putaran evaluasi mobil samping:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Cluster di Dunia Nyata

Dalam lingkungan produksi nyata, Anda akan menjalankan semua tugas dalam proses yang berbeda pada mesin yang berbeda. Cara termudah untuk mengkonfigurasi informasi cluster pada setiap tugas adalah dengan mengatur variabel lingkungan "TF_CONFIG" dan menggunakan tf.distribute.cluster_resolver.TFConfigClusterResolver untuk mengurai "TF_CONFIG". Untuk penjelasan umum tentang variabel lingkungan "TF_CONFIG", silakan lihat panduan pelatihan terdistribusi .

Jika Anda memulai tugas pelatihan menggunakan Kubernetes atau template konfigurasi lainnya, kemungkinan besar template ini telah menyetel "TF_CONFIG" untuk Anda.

Setel variabel lingkungan "TF_CONFIG"

Misalkan Anda memiliki 3 pekerja dan 2 server parameter, “TF_CONFIG” dari pekerja 1 dapat berupa:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

“TF_CONFIG” dari penilai dapat berupa:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Bagian "cluster" dalam string "TF_CONFIG" di atas untuk evaluator adalah opsional.

Jika Anda menggunakan biner yang sama untuk semua tugas

Jika Anda lebih suka menjalankan semua tugas ini menggunakan satu biner, Anda harus membiarkan program Anda bercabang ke dalam peran yang berbeda di awal:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Kode berikut memulai server TensorFlow dan menunggu:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Menangani Kegagalan Tugas

Kegagalan pekerja

ClusterCoordinator atau Model.fit menyediakan toleransi kesalahan Model.fit untuk kegagalan pekerja. Setelah pemulihan pekerja, fungsi DatasetCreator data yang disediakan sebelumnya (baik untuk create_per_worker_dataset untuk CTL, atau DatasetCreator untuk Model.fit ) akan dipanggil pada pekerja untuk membuat ulang Model.fit data.

Server parameter atau koordinator gagal

Namun, ketika koordinator melihat kesalahan server parameter, ia akan segera memunculkan UnavailableError atau AbortedError . Anda dapat memulai kembali koordinator dalam kasus ini. Koordinator itu sendiri juga bisa menjadi tidak tersedia. Oleh karena itu, perkakas tertentu disarankan agar tidak kehilangan kemajuan pelatihan:

  • Untuk Model.fit , Anda harus menggunakan callback BackupAndRestore , yang menangani penyimpanan dan pemulihan progres secara otomatis. Lihat bagian Callback dan pelatihan di atas untuk mengetahui contoh.

  • Untuk CTL, Anda harus memeriksa variabel model secara berkala dan memuat variabel model dari checkpoint, jika ada, sebelum pelatihan dimulai. Kemajuan pelatihan dapat disimpulkan kira-kira dari optimizer.iterations jika pengoptimal diberi tanda centang:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Mengambil RemoteValue

Pengambilan RemoteValue dijamin berhasil jika fungsi berhasil dijalankan. Ini karena saat ini nilai yang dikembalikan segera disalin ke koordinator setelah fungsi dijalankan. Jika ada pekerja yang gagal selama penyalinan, fungsi tersebut akan dicoba ulang pada pekerja lain yang tersedia. Oleh karena itu, jika Anda ingin mengoptimalkan kinerja, Anda dapat menjadwalkan fungsi tanpa nilai kembalian.

Pelaporan Kesalahan

Setelah koordinator melihat kesalahan seperti UnavailableError dari server parameter atau kesalahan aplikasi lain seperti InvalidArgument dari tf.debugging.check_numerics , itu akan membatalkan semua fungsi yang tertunda dan antri sebelum meningkatkan kesalahan. Mengambil RemoteValue sesuai akan memunculkan CancelledError .

Setelah kesalahan muncul, koordinator tidak akan memunculkan kesalahan yang sama atau kesalahan apa pun dari fungsi yang dibatalkan.

Peningkatan performa

Ada beberapa kemungkinan alasan jika Anda melihat masalah performa saat berlatih dengan ParameterServerStrategy dan ClusterResolver .

Salah satu alasan umum adalah server parameter memiliki beban yang tidak seimbang dan beberapa server parameter yang terisi penuh telah mencapai kapasitasnya. Bisa juga ada beberapa akar penyebab. Beberapa metode sederhana untuk mengurangi masalah ini adalah dengan

  1. pisahkan variabel model besar Anda dengan menetapkan variable_partitioner saat membuat ParameterServerStrategy .
  2. hindari membuat variabel hotspot yang diperlukan oleh semua server parameter dalam satu langkah jika memungkinkan. Misalnya, gunakan kecepatan pembelajaran konstan atau subclass tf.keras.optimizers.schedules.LearningRateSchedule di pengoptimal karena perilaku defaultnya adalah kecepatan pembelajaran akan menjadi variabel yang ditempatkan di server parameter tertentu dan diminta oleh semua server parameter lainnya di setiap langkah .
  3. acak kosakata Anda yang banyak sebelum meneruskannya ke lapisan pra-pemrosesan Keras.

Alasan lain yang mungkin untuk masalah kinerja adalah koordinatornya. Implementasi pertama kami dari schedule / join adalah berbasis Python dan karenanya mungkin memiliki overhead threading. Juga latensi antara koordinator dan pekerja bisa jadi besar. Jika ini masalahnya,

  • Untuk Model.fit , Anda dapat menyetel argumen steps_per_execution disediakan di Model.compile ke nilai yang lebih besar dari 1.

  • Untuk CTL, Anda dapat mengemas beberapa langkah menjadi satu tf.function .:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Saat kami terus mengoptimalkan pustaka, kami berharap sebagian besar pengguna tidak perlu mengemas langkah-langkah secara manual di masa mendatang.

Selain itu, trik kecil untuk peningkatan kinerja adalah menjadwalkan fungsi tanpa nilai pengembalian seperti yang dijelaskan di bagian kegagalan tugas penanganan di atas.

Batasan yang Diketahui

Sebagian besar batasan yang diketahui tercakup dalam bagian di atas. Bagian ini memberikan ringkasan.

ParameterServerStrategy umum

  • os.environment["grpc_fail_fast"]="use_caller" dibutuhkan pada setiap tugas, termasuk koordinator, untuk membuat toleransi kesalahan berfungsi dengan baik.
  • Pelatihan server parameter sinkron tidak didukung.
  • Biasanya perlu untuk mengemas beberapa langkah ke dalam satu fungsi untuk mencapai kinerja yang optimal.
  • Tidak didukung untuk memuat stored_model melalui tf.saved_model.load berisi variabel yang tf.saved_model.load . Catatan memuat save_model seperti itu menggunakan TensorFlow Serving diharapkan bisa bekerja.
  • Tidak didukung untuk memuat checkpoint yang berisi variabel slot pengoptimal sharded ke dalam jumlah shard yang berbeda.
  • Tidak didukung untuk memulihkan dari kegagalan server parameter tanpa memulai ulang tugas koordinator.
  • Penggunaan tf.lookup.StaticHashTable (yang umumnya digunakan oleh beberapa lapisan tf.keras.layers.experimental.preprocessing , seperti IntegerLookup , StringLookup , dan TextVectorization ) menghasilkan sumber daya yang ditempatkan pada koordinator saat ini dengan pelatihan PS. Ini memiliki implikasi kinerja pencarian RPC dari pekerja ke koordinator. Ini adalah prioritas tinggi saat ini untuk ditangani.

Model.fit spesifik

  • Argumen steps_per_epoch diperlukan di Model.fit . Anda dapat memilih nilai yang memberikan interval yang sesuai dalam satu zaman.
  • ParameterServerStrategy tidak memiliki dukungan untuk callback kustom yang memiliki panggilan tingkat batch karena alasan performa. Anda harus mengonversi panggilan tersebut menjadi panggilan tingkat zaman dengan langkah yang dipilih sesuai steps_per_epoch , sehingga panggilan tersebut dipanggil setiap langkah steps_per_epoch . Callback bawaan tidak terpengaruh: panggilan tingkat batch-nya telah dimodifikasi agar berkinerja baik. Mendukung panggilan tingkat batch untuk ParameterServerStrategy sedang direncanakan.
  • Untuk alasan yang sama, tidak seperti strategi lain, bilah kemajuan dan metrik dicatat hanya pada batas waktu.
  • Input untuk Model.fit hanya menggunakan tipe DatasetCreator .
  • run_eagerly tidak didukung.
  • Evaluasi di Model.fit belum didukung. Ini salah satu prioritasnya.
  • Model.evaluate dan Model.predict belum didukung.

Spesifikasi Loop Pelatihan Kustom