Keras ile çok çalışanlı eğitim

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Bu öğretici, tf.distribute.Strategy API'sini (özellikle tf.distribute.MultiWorkerMirroredStrategy sınıfını) kullanarak bir Keras modeli ve Model.fit API'si ile çok çalışanlı dağıtılmış eğitimin nasıl gerçekleştirileceğini gösterir. Bu stratejinin yardımıyla, tek bir çalışan üzerinde çalışacak şekilde tasarlanmış bir Keras modeli, minimum kod değişikliği ile birden çok çalışan üzerinde sorunsuz bir şekilde çalışabilir.

tf.distribute.Strategy API'lerini daha derinlemesine anlamak isteyenler için, TensorFlow'un desteklediği dağıtım stratejilerine genel bir bakış için TensorFlow kılavuzundaki Dağıtılmış eğitim mevcuttur.

MultiWorkerMirroredStrategy ile MultiWorkerMirroredStrategy ve özel eğitim döngüsünün nasıl kullanılacağını öğrenmek için Keras ve MultiWorkerMirrorredStrategy ile Özel eğitim döngüsüne bakın.

Bu öğreticinin amacının, iki işçi ile minimal çok işçili bir örnek göstermek olduğunu unutmayın.

Kurmak

Bazı gerekli ithalatlarla başlayın:

import json
import os
import sys

TensorFlow'u içe aktarmadan önce ortamda birkaç değişiklik yapın:

  1. Tüm GPU'ları devre dışı bırakın. Bu, aynı GPU'yu kullanmaya çalışan çalışanların neden olduğu hataları önler. Gerçek dünyadaki bir uygulamada, her işçi farklı bir makinede olacaktır.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. TF_CONFIG ortam değişkenini sıfırlayın (bunun hakkında daha fazla bilgi edineceksiniz):
os.environ.pop('TF_CONFIG', None)
  1. Geçerli dizinin Python'un yolunda olduğundan emin olun; bu, not defterinin daha sonra %%writefile tarafından yazılan dosyaları içe aktarmasına olanak tanır:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Şimdi TensorFlow'u içe aktarın:

import tensorflow as tf

Veri kümesi ve model tanımı

Ardından, basit bir model ve veri kümesi kurulumuyla bir mnist_setup.py dosyası oluşturun. Bu Python dosyası, bu öğreticideki çalışan işlemler tarafından kullanılacaktır:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
tutucu6 l10n-yer
Writing mnist_setup.py

Tek bir işçi üzerinde model eğitimi

Modeli az sayıda dönem için eğitmeyi deneyin ve her şeyin doğru çalıştığından emin olmak için tek bir çalışanın sonuçlarını gözlemleyin. Eğitim ilerledikçe, kayıp düşmeli ve doğruluk artmalıdır.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
tutucu8 l10n-yer
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Çok çalışanlı yapılandırma

Şimdi çok işçili eğitim dünyasına girelim.

İşler ve görevler içeren bir küme

TensorFlow'da dağıtılmış eğitim şunları içerir: birkaç işi olan bir 'cluster' ve işlerin her birinin bir veya daha fazla 'task' olabilir.

Her biri muhtemelen farklı bir role sahip olan birden çok makinede eğitim için TF_CONFIG yapılandırma ortamı değişkenine ihtiyacınız olacak. TF_CONFIG , kümenin parçası olan her çalışan için küme yapılandırmasını belirtmek için kullanılan bir JSON dizesidir.

Bir TF_CONFIG değişkeninin iki bileşeni vardır: 'cluster' ve 'task' .

  • Bir 'cluster' tüm işçiler için aynıdır ve 'worker' veya 'chief' gibi farklı iş türlerinden oluşan bir dikte olan eğitim kümesi hakkında bilgi sağlar.

    • tf.distribute.MultiWorkerMirroredStrategy ile çok çalışanlı eğitimde, normal bir 'worker' işçi'nin yaptıklarına ek olarak, bir kontrol noktası kaydetme ve TensorBoard için bir özet dosyası yazma gibi sorumlulukları üstlenen genellikle bir 'worker' vardır. Bu tür 'worker' , baş işçi olarak adlandırılır ( 'chief' bir iş adıyla).
    • 'chief' için 'index' 0 atanması alışılmış bir durumdur (aslında, tf.distribute.Strategy bu şekilde uygulanır).
  • Bir 'task' mevcut görev hakkında bilgi sağlar ve her çalışan için farklıdır. Bu çalışanın 'type' ve 'index' belirtir.

Aşağıda örnek bir yapılandırma verilmiştir:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

İşte bir JSON dizesi olarak serileştirilmiş aynı TF_CONFIG :

json.dumps(tf_config)
tutucu11 l10n-yer
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

tf_config yalnızca yerel bir değişken olduğunu unutmayın. Bir eğitim yapılandırması için kullanabilmek için, bu dict'in bir JSON olarak serileştirilmesi ve bir TF_CONFIG ortam değişkenine yerleştirilmesi gerekir.

Yukarıdaki örnek yapılandırmada, 'type' görevini 'worker' olarak ve 'index' görevini 0 olarak ayarladınız. Bu nedenle, bu makine ilk işçidir. 'chief' işçi olarak atanacak ve diğerlerinden daha fazla iş yapacak.

Örnekleme amacıyla, bu öğretici, bir localhost üzerinde iki çalışanla bir TF_CONFIG değişkenini nasıl kurabileceğinizi gösterir.

Pratikte, harici IP adreslerinde/portlarında birden çok işçi oluşturacak ve buna göre her bir çalışan için bir TF_CONFIG değişkeni ayarlayacaksınız.

Bu öğreticide iki işçi kullanacaksınız:

  • İlk ( 'chief' ) işçinin TF_CONFIG yukarıda gösterilmiştir.
  • İkinci çalışan için tf_config['task']['index']=1 değerini ayarlayacaksınız

Not defterlerinde ortam değişkenleri ve alt süreçler

Alt işlemler, ortam değişkenlerini üstlerinden devralır.

Örneğin, bu Jupyter Notebook işleminde bir ortam değişkenini aşağıdaki gibi ayarlayabilirsiniz:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Ardından, ortam değişkenine bir alt işlemden erişebilirsiniz:

echo ${GREETINGS}
tutucu14 l10n-yer
Hello TensorFlow!

Sonraki bölümde, TF_CONFIG çalışan alt işlemlere geçirmek için benzer bir yöntem kullanacaksınız. Gerçek dünya senaryosunda, işlerinizi bu şekilde başlatmazsınız, ancak bu örnekte bu yeterlidir.

Doğru stratejiyi seçin

TensorFlow'da dağıtılmış eğitimin iki ana biçimi vardır:

  • Eğitim adımlarının çalışanlar ve kopyalar arasında eşitlendiği eşzamanlı eğitim ve
  • Eğitim adımlarının kesin olarak eşitlenmediği zaman uyumsuz eğitim (örneğin, parametre sunucusu eğitimi ).

Bu öğretici, tf.distribute.MultiWorkerMirroredStrategy örneğini kullanarak eşzamanlı çok çalışan eğitiminin nasıl gerçekleştirileceğini gösterir.

MultiWorkerMirroredStrategy , tüm çalışanlarda her cihazda modelin katmanlarındaki tüm değişkenlerin kopyalarını oluşturur. Degradeleri toplamak ve değişkenleri senkronize tutmak için toplu iletişim için bir TensorFlow op olan CollectiveOps kullanır. tf.distribute.Strategy kılavuzunda bu strateji hakkında daha fazla ayrıntı bulunmaktadır.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
tutucu16 l10n-yer
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy , tf.distribute.experimental.CommunicationOptions parametresi aracılığıyla birden çok uygulama sağlar: 1) RING , ana bilgisayarlar arası iletişim katmanı olarak gRPC kullanarak halka tabanlı kolektifleri uygular; 2) NCCL , kolektifleri uygulamak için NVIDIA Toplu İletişim Kitaplığını kullanır; ve 3) AUTO , seçimi çalışma zamanına erteler. Toplu uygulamanın en iyi seçimi, GPU'ların sayısına ve türüne ve kümedeki ağ ara bağlantısına bağlıdır.

Modeli eğit

tf.distribute.Strategy API'sinin tf.keras entegrasyonuyla, eğitimi birden çok çalışana dağıtmak için yapacağınız tek değişiklik, model oluşturma ve model.compile() çağrısını strategy.scope() içine dahil etmektir. Dağıtım stratejisinin kapsamı, değişkenlerin nasıl ve nerede oluşturulacağını belirler ve MultiWorkerMirroredStrategy durumunda, oluşturulan değişkenler MirroredVariable 'lardır ve her bir çalışan üzerinde çoğaltılırlar.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Gerçekten MultiWorkerMirroredStrategy ile çalışmak için çalışan süreçleri çalıştırmanız ve onlara bir TF_CONFIG gerekir.

Daha önce yazılan mnist_setup.py dosyası gibi, her bir çalışanın çalıştıracağı main.py dosyası şu şekildedir:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
tutucu19 l10n-yer
Writing main.py

Yukarıdaki kod parçacığında, global_batch_size geçirilen Dataset.batch per_worker_batch_size * num_workers olarak ayarlandığını unutmayın. Bu, çalışan sayısından bağımsız olarak her çalışanın per_worker_batch_size örneklerini toplu olarak işlemesini sağlar.

Geçerli dizin artık her iki Python dosyasını da içeriyor:

ls *.py
tutucu21 l10n-yer
main.py
mnist_setup.py

Böylece, TF_CONFIG json-seri hale getirin ve onu ortam değişkenlerine ekleyin:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi, TF_CONFIG main.py bir çalışan işlemi başlatabilirsiniz:

# first kill any previous runs
%killbgscripts
All background processes were killed.
-yer tutucu25 l10n-yer
python main.py &> job_0.log

Yukarıdaki komutla ilgili dikkat edilmesi gereken birkaç nokta vardır:

  1. Bazı bash komutlarını çalıştırmak için bir dizüstü bilgisayar "sihri" olan %%bash kullanır.
  2. Bu çalışan sonlandırılmayacağından, bash işlemini arka planda çalıştırmak için --bg bayrağını kullanır. Başlamadan önce tüm çalışanları bekler.

Arka planda çalışan işlem, çıktıyı bu not defterine yazdırmaz, bu nedenle &> çıktısını bir dosyaya yönlendirir, böylece daha sonra bir günlük dosyasında ne olduğunu inceleyebilirsiniz.

Bu nedenle, işlemin başlaması için birkaç saniye bekleyin:

import time
time.sleep(10)

Şimdi, işçinin günlük dosyasına şu ana kadar ne çıktı olduğunu inceleyin:

cat job_0.log
tutucu28 l10n-yer
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Günlük dosyasının son satırı şunları söylemelidir Started server with target: grpc://localhost:12345 . İlk işçi şimdi hazırdır ve diğer tüm işçi(ler)in ilerlemeye hazır olmasını beklemektedir.

Bu nedenle, ikinci çalışanın alması için tf_config güncelleyin:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

İkinci işçiyi çalıştırın. Bu, tüm çalışanlar aktif olduğu için eğitimi başlatacaktır (bu nedenle bu süreci arka plana atmaya gerek yoktur):

python main.py
tutucu31 l10n-yer
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

İlk çalışan tarafından yazılan günlükleri yeniden kontrol ederseniz, o modelin eğitimine katıldığını öğreneceksiniz:

cat job_0.log
tutucu33 l10n-yer
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Şaşırtıcı olmayan bir şekilde, bu, bu öğreticinin başındaki test çalışmasından daha yavaş çalıştı.

Birden fazla işçiyi tek bir makinede çalıştırmak yalnızca ek yük ekler.

Buradaki amaç, eğitim süresini iyileştirmek değil, sadece çok işçili eğitime bir örnek vermekti.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
tutucu35 l10n-yer
All background processes were killed.

Çok çalışanlı derinlemesine eğitim

Şimdiye kadar, temel bir çok çalışanlı kurulumun nasıl gerçekleştirileceğini öğrendiniz.

Eğitimin geri kalanında, gerçek kullanım durumları için yararlı veya önemli olabilecek diğer faktörleri ayrıntılı olarak öğreneceksiniz.

Veri kümesi parçalama

Çok çalışanlı eğitimde, yakınsama ve performansı sağlamak için veri kümesi parçalamaya ihtiyaç vardır.

Önceki bölümdeki örnek, tf.distribute.Strategy API tarafından sağlanan varsayılan otomatik parçalamaya dayanmaktadır. tf.data.experimental.DistributeOptions öğesinin tf.data.experimental.AutoShardPolicy öğesini ayarlayarak parçalamayı kontrol edebilirsiniz.

Otomatik parçalama hakkında daha fazla bilgi edinmek için Dağıtılmış giriş kılavuzuna bakın.

Her bir kopyanın her örneği işlemesi için otomatik parçalamanın nasıl kapatılacağına dair hızlı bir örnek ( önerilmez ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Değerlendirme

validation_data Model.fit , her dönem için eğitim ve değerlendirme arasında geçiş yapar. validation_data alan değerlendirme, aynı çalışan grubuna dağıtılır ve değerlendirme sonuçları toplanır ve tüm çalışanlar için kullanılabilir.

Eğitime benzer şekilde, doğrulama veri kümesi dosya düzeyinde otomatik olarak paylaşılır. Doğrulama veri kümesinde genel bir toplu iş boyutu ayarlamanız ve validation_steps değerini ayarlamanız gerekir.

Değerlendirme için tekrarlanan bir veri kümesi de önerilir.

Alternatif olarak, düzenli aralıklarla kontrol noktalarını okuyan ve değerlendirmeyi çalıştıran başka bir görev de oluşturabilirsiniz. Tahmincinin yaptığı budur. Ancak bu, değerlendirme yapmak için önerilen bir yol değildir ve bu nedenle ayrıntıları atlanmıştır.

Verim

Artık MultiWorkerMirroredStrategy ile birden çok çalışanda çalışacak şekilde ayarlanmış bir MultiWorkerMirroredStrategy var.

Çok işçili eğitimin performansını ayarlamak için aşağıdakileri deneyebilirsiniz:

  • tf.distribute.MultiWorkerMirroredStrategy birden çok toplu iletişim uygulaması sağlar:

    • RING , ana bilgisayarlar arası iletişim katmanı olarak gRPC kullanan halka tabanlı kolektifleri uygular.
    • NCCL , kolektifleri uygulamak için NVIDIA Toplu İletişim Kitaplığını kullanır.
    • AUTO , seçimi çalışma zamanına erteler.

    Toplu uygulamanın en iyi seçimi, GPU'ların sayısına, GPU'ların türüne ve kümedeki ağ ara bağlantısına bağlıdır. Otomatik seçimi geçersiz kılmak için MultiWorkerMirroredStrategy communication_options parametresini belirtin. Örneğin:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Mümkünse değişkenleri tf.float :

    • Resmi ResNet modeli, bunun nasıl yapılabileceğinin bir örneğini içerir.

Hata toleransı

Eşzamanlı eğitimde, çalışanlardan biri başarısız olursa küme başarısız olur ve herhangi bir arıza giderme mekanizması mevcut değildir.

tf.distribute.Strategy ile kullanmak, çalışanların öldüğü veya başka bir şekilde istikrarsız olduğu durumlarda hata toleransı avantajıyla birlikte gelir. Bunu, seçtiğiniz dağıtılmış dosya sisteminde eğitim durumunu koruyarak yapabilirsiniz, böylece daha önce başarısız olan veya önceden alınan örneğin yeniden başlatılmasıyla eğitim durumu kurtarılır.

Bir işçi müsait olmadığında, diğer işçiler başarısız olur (muhtemelen bir zaman aşımından sonra). Bu gibi durumlarda, uygun olmayan çalışanın ve başarısız olan diğer çalışanların yeniden başlatılması gerekir.

ModelKontrol noktası geri arama

ModelCheckpoint geri araması artık hata toleransı işlevi sağlamamaktadır, lütfen bunun yerine BackupAndRestore geri aramasını kullanın.

ModelCheckpoint geri araması, kontrol noktalarını kaydetmek için hala kullanılabilir. Ancak bununla eğitim yarıda kaldıysa veya başarıyla tamamlandıysa, eğitime kontrol noktasından devam etmek için modeli manuel olarak yüklemek kullanıcıya aittir.

İsteğe bağlı olarak kullanıcı, ModelCheckpoint geri çağırma dışında modeli/ağırlıkları kaydetmeyi ve geri yüklemeyi seçebilir.

Model kaydetme ve yükleme

model.save veya tf.saved_model.save kullanarak modelinizi kaydetmek için, kaydetme hedefinin her çalışan için farklı olması gerekir.

  • Şef olmayan çalışanlar için modeli geçici bir dizine kaydetmeniz gerekecektir.
  • Şef için sağlanan model dizinine kaydetmeniz gerekecek.

Birden çok çalışanın aynı konuma yazmaya çalışmasından kaynaklanan hataları önlemek için, çalışan üzerindeki geçici dizinlerin benzersiz olması gerekir.

Tüm dizinlerde kaydedilen model aynıdır ve tipik olarak, geri yükleme veya hizmet için yalnızca şef tarafından kaydedilen modele başvurulmalıdır.

Eğitiminiz tamamlandıktan sonra işçiler tarafından oluşturulan geçici dizinleri silen bir temizleme mantığına sahip olmalısınız.

Şef ve işçilerden aynı anda tasarruf etmenin nedeni, hem şefin hem de çalışanların allreduce iletişim protokolüne katılmasını gerektiren kontrol noktası sırasında değişkenleri topluyor olmanızdır. Öte yandan, şef ve işçilerin aynı model dizinine kaydetmesine izin vermek, çekişme nedeniyle hatalara neden olacaktır.

MultiWorkerMirroredStrategy kullanarak, program her çalışan üzerinde çalıştırılır ve mevcut çalışanın şef olup olmadığını bilmek için, task_type ve task_id niteliklerine sahip küme çözümleyici nesnesinden yararlanır:

  • task_type size mevcut işin ne olduğunu söyler (örneğin 'worker' ).
  • task_id size çalışanın tanımlayıcısını söyler.
  • task_id == 0 olan çalışan, baş çalışan olarak atanır.

Aşağıdaki kod parçacığında, write_filepath işlevi, çalışanın task_id bağlı olarak yazılacak dosya yolunu sağlar:

  • Baş çalışan için ( task_id == 0 ile), orijinal dosya yoluna yazar.
  • Diğer çalışanlar için, dizin yolundaki task_id ile yazılacak geçici bir dizin ( temp_dir ) oluşturur:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Bununla, artık kaydetmeye hazırsınız:

multi_worker_model.save(write_model_path)
tutucu40 l10n-yer
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Yukarıda açıklandığı gibi, daha sonra model sadece şefin kaydedildiği yoldan yüklenmelidir, bu yüzden şef olmayan işçilerin kaydettiği geçici olanları kaldıralım:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Şimdi yükleme zamanı geldiğinde, uygun tf.keras.models.load_model API'sini kullanalım ve daha fazla çalışmaya devam edelim.

Burada, eğitimi yüklemek ve devam ettirmek için yalnızca tek bir çalışanın kullanıldığını varsayın; bu durumda başka bir strategy.scope() içinde tf.keras.models.load_model öğesini çağırmazsınız (daha önce tanımlandığı gibi strategy = tf.distribute.MultiWorkerMirroredStrategy() olduğunu unutmayın) ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
tutucu43 l10n-yer
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Kontrol noktası kaydetme ve geri yükleme

Öte yandan, kontrol noktası, modelinizin ağırlıklarını kaydetmenize ve tüm modeli kaydetmenize gerek kalmadan onları geri yüklemenize olanak tanır.

Burada, modeli izleyen ve tf.train.CheckpointManager tf.train.Checkpoint böylece yalnızca en son kontrol noktası korunur:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

CheckpointManager kurulduktan sonra, şef olmayan çalışanların kaydettiği kontrol noktalarını kaydetmeye ve kaldırmaya hazırsınız:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Şimdi, modeli geri yüklemeniz gerektiğinde, kullanışlı tf.train.latest_checkpoint işlevini kullanarak kaydedilen en son kontrol noktasını bulabilirsiniz. Kontrol noktasını geri yükledikten sonra eğitime devam edebilirsiniz.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
tutucu47 l10n-yer
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

BackupAndRestore geri arama

tf.keras.callbacks.BackupAndRestore geri araması, modeli ve geçerli dönem numarasını backup_dir bağımsız değişkeni altında backup_dir geçici bir kontrol noktası dosyasına yedekleyerek hataya dayanıklılık işlevselliği BackupAndRestore . Bu, her dönemin sonunda yapılır.

İşler kesintiye uğrayıp yeniden başladığında, geri arama son kontrol noktasını geri yükler ve eğitim kesintiye uğrayan dönemin başından itibaren devam eder. Kesintiden önce bitmemiş çağda halihazırda yapılmış olan herhangi bir kısmi eğitim, nihai model durumunu etkilememesi için atılacaktır.

Kullanmak için Model.fit çağrısında bir tf.keras.callbacks.BackupAndRestore örneği sağlayın.

MultiWorkerMirroredStrategy ile, bir çalışan kesintiye uğrarsa, kesintiye uğrayan çalışan yeniden başlatılana kadar tüm küme duraklar. Diğer çalışanlar da yeniden başlatılır ve kesintiye uğrayan çalışan kümeye yeniden katılır. Ardından, her çalışan önceden kaydedilmiş olan kontrol noktası dosyasını okur ve önceki durumunu alır, böylece kümenin tekrar eşitlenmesine izin verir. Ardından eğitim devam ediyor.

BackupAndRestore geri çağrısı, eğitim durumunu kaydetmek ve geri yüklemek için CheckpointManager kullanır; bu, mevcut kontrol noktalarını en sonuncusuyla birlikte izleyen kontrol noktası adlı bir dosya oluşturur. Bu nedenle, ad çakışmasını önlemek için backup_dir , diğer kontrol noktalarını depolamak için yeniden kullanılmamalıdır.

Şu anda BackupAndRestore geri çağrısı, stratejisi olmayan tek çalışanlı eğitimi— MirroredStrategy — ve MultiWorkerMirroredStrategy ile çok çalışanlı eğitimi destekler.

Aşağıda hem çok işçili eğitim hem de tek işçili eğitim için iki örnek verilmiştir:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
tutucu49 l10n-yer
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

BackupAndRestore içinde belirttiğiniz backup_dir dizinini BackupAndRestore , geçici olarak oluşturulmuş bazı kontrol noktası dosyaları fark edebilirsiniz. Bu dosyalar daha önce kaybolan örnekleri kurtarmak için gereklidir ve eğitiminizden başarılı bir şekilde çıkıldığında Model.fit sonunda kitaplık tarafından kaldırılacaktır.

Ek kaynaklar

  1. TensorFlow kılavuzundaki Dağıtılmış eğitim , mevcut dağıtım stratejilerine genel bir bakış sağlar.
  2. Keras ve MultiWorkerMirrorredStrategy ile Özel eğitim döngüsü , MultiWorkerMirroredStrategy ile nasıl kullanılacağını ve özel bir eğitim döngüsünü gösterir.
  3. Birçoğu birden fazla dağıtım stratejisi yürütmek üzere yapılandırılabilen resmi modellere göz atın.
  4. tf.fonksiyon kılavuzu ile daha iyi performans, TensorFlow modellerinizin performansını optimize etmek için kullanabileceğiniz TensorFlow Profiler gibi diğer stratejiler ve araçlar hakkında bilgi sağlar.