ParameterServerStrategy ile parametre sunucusu eğitimi

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Parametre sunucusu eğitimi , birden çok makinede model eğitimini büyütmek için yaygın bir veri paralel yöntemidir.

Bir parametre sunucusu eğitim kümesi, çalışanlardan ve parametre sunucularından oluşur. Değişkenler parametre sunucularında oluşturulur ve her adımda çalışanlar tarafından okunur ve güncellenir. Varsayılan olarak, çalışanlar bu değişkenleri birbirleriyle senkronize etmeden bağımsız olarak okur ve günceller. Bu nedenle bazen parametre sunucusu tarzı eğitime eşzamansız eğitim adı verilir.

TensorFlow 2'de parametre sunucusu eğitimi, eğitim adımlarını (parametre sunucuları eşliğinde) binlerce çalışana kadar ölçeklenen bir kümeye dağıtan tf.distribute.experimental.ParameterServerStrategy sınıfı tarafından desteklenir.

Desteklenen eğitim yöntemleri

Desteklenen iki ana eğitim yöntemi vardır:

İşler ve görevler içeren bir küme

Tercih edilen API'den ( Model.fit veya özel eğitim döngüsü) bağımsız olarak, TensorFlow 2'deki dağıtılmış eğitim şunları içerir: birkaç 'jobs' içeren bir 'cluster' ve işlerin her birinin bir veya daha fazla 'tasks' olabilir.

Parametre sunucusu eğitimini kullanırken şunlara sahip olunması önerilir:

  • Bir koordinatör işi (iş adı chief olan)
  • Birden çok işçi işi (iş adı worker ); ve
  • Birden çok parametre sunucusu işi (iş adı ps )

Koordinatör kaynakları oluştururken, eğitim görevlerini gönderirken, kontrol noktaları yazarken ve görev başarısızlıklarıyla ilgilenirken, çalışanlar ve parametre sunucuları koordinatörden gelen istekleri dinleyen tf.distribute.Server çalıştırır.

Model.fit API ile parametre sunucusu eğitimi

Model.fit API ile parametre sunucusu eğitimi, koordinatörün girdi olarak bir tf.distribute.experimental.ParameterServerStrategy nesnesi ve bir tf.keras.utils.experimental.DatasetCreator kullanmasını gerektirir. Stratejisiz veya diğer stratejilerle Model.fit kullanımına benzer şekilde, iş akışı modelin oluşturulmasını ve derlenmesini, geri aramaların hazırlanmasını ve ardından bir Model.fit aramasını içerir.

Özel bir eğitim döngüsü ile parametre sunucusu eğitimi

Özel eğitim döngüleriyle, tf.distribute.experimental.coordinator.ClusterCoordinator sınıfı, koordinatör için kullanılan temel bileşendir.

ClusterCoordinator nesnesi tarafından sağlanan en önemli API, schedule :

  • schedule API'si bir tf.function ve hemen geleceğe benzer bir RemoteValue döndürür.
  • Kuyruğa alınan işlevler, arka plan iş parçacıklarında uzaktaki çalışanlara gönderilecek ve RemoteValue eşzamansız olarak doldurulacak.
  • schedule , çalışan ataması gerektirmediğinden, aktarılan tf.function . işlevi, mevcut herhangi bir çalışan üzerinde yürütülebilir.
  • Üzerinde yürütüldüğü çalışan tamamlanmadan önce kullanılamaz hale gelirse, işlev uygun başka bir çalışan üzerinde yeniden denenir.
  • Bu gerçek ve işlevin yürütülmesinin atomik olmadığı gerçeği nedeniyle, bir işlev birden fazla kez yürütülebilir.

ClusterCoordinator , uzak işlevleri göndermeye ek olarak, tüm çalışanlar üzerinde veri kümeleri oluşturmaya ve bir çalışan hatadan kurtulduğunda bu veri kümelerini yeniden oluşturmaya da yardımcı olur.

Eğitim kurulumu

Eğitim, Model.fit ve özel eğitim döngüsü yollarına ayrılacak ve ihtiyaçlarınıza uygun olanı seçebilirsiniz. "X ile Eğitim" dışındaki bölümler her iki yol için de geçerlidir.

pip install portpicker

küme kurulumu

Yukarıda bahsedildiği gibi, bir parametre sunucusu eğitim kümesi, eğitim programınızı çalıştıran bir koordinatör görevi, TensorFlow sunucularını çalıştıran bir veya birkaç işçi ve parametre sunucusu görevi ( tf.distribute.Server ve muhtemelen sepet değerlendirmesini çalıştıran ek bir değerlendirme görevi gerektirir. (aşağıdaki sepet değerlendirme bölümüne bakın). Bunları ayarlamak için gereksinimler şunlardır:

  • Koordinatör görevinin, değerlendirici dışındaki tüm diğer TensorFlow sunucularının adreslerini ve bağlantı noktalarını bilmesi gerekir.
  • Çalışanların ve parametre sunucularının hangi bağlantı noktasını dinlemeleri gerektiğini bilmeleri gerekir. Basitlik adına, bu görevlerde TensorFlow sunucuları oluştururken genellikle tam küme bilgilerini iletebilirsiniz.
  • Değerlendirici görevinin eğitim kümesinin kurulumunu bilmesi gerekmez. Varsa, eğitim kümesine bağlanmaya çalışmamalıdır.
  • İşçiler ve parametre sunucuları sırasıyla "worker" ve "ps" görev türlerine sahip olmalıdır. Koordinatör, eski nedenlerle görev türü olarak "chief" kullanmalıdır.

Bu öğreticide, tüm parametre sunucusu eğitiminin Colab'da çalıştırılabilmesi için bir süreç içi küme oluşturacaksınız. Daha sonraki bir bölümde gerçek kümeleri nasıl kuracağınızı öğreneceksiniz.

İşlem içi küme

Önceden birkaç TensorFlow sunucusu oluşturarak başlayacak ve bunlara daha sonra bağlanacaksınız. Bunun yalnızca bu öğreticinin gösterimi amacıyla yapıldığını ve gerçek eğitimde sunucuların "worker" ve "ps" makinelerinde başlatılacağını unutmayın.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

İşlem içi küme kurulumu, burada olduğu gibi birim testlerinde sıklıkla kullanılır.

Yerel test için başka bir seçenek de süreçleri yerel makinede başlatmaktır; bu yaklaşımın bir örneği için Keras ile çok çalışanlı eğitime bakın.

Bir ParameterServerStrategy örneğini oluşturun

Eğitim koduna dalmadan önce, bir ParameterServerStrategy nesnesinin örneğini oluşturalım. Model.fit ile mi yoksa özel bir eğitim döngüsüyle mi ilerlediğinize bakılmaksızın bunun gerekli olduğunu unutmayın. variable_partitioner bağımsız değişkeni, Değişken parçalama bölümünde açıklanacaktır.

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
tutucu4 l10n-yer
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

GPU'ları eğitim için kullanmak için, her çalışanın görebileceği GPU'ları tahsis edin. ParameterServerStrategy , tüm çalışanların aynı sayıda GPU'ya sahip olması gerektiği kısıtlamasıyla, her çalışanda mevcut tüm GPU'ları kullanır.

Değişken parçalama

Değişken parçalama, bir değişkeni parça adı verilen daha küçük birden çok değişkene bölmeyi ifade eder. Değişken parçalama, bu parçalara erişirken ağ yükünü dağıtmak için faydalı olabilir. Normal bir değişkenin hesaplanmasını ve depolanmasını birden çok parametre sunucusu arasında dağıtmak da yararlıdır.

Değişken parçalamayı etkinleştirmek için, bir ParameterServerStrategy nesnesi oluştururken bir variable_partitioner iletebilirsiniz. Değişken_bölümü, bir variable_partitioner her oluşturulduğunda çağrılır ve değişkenin her boyutu boyunca parça sayısını döndürmesi beklenir. tf.distribute.experimental.partitioners.MinSizePartitioner gibi bazı kullanıma hazır variable_partitioner sağlanır. Model eğitim hızı üzerinde olumsuz etkisi olabilecek küçük değişkenleri bölümlemekten kaçınmak için tf.distribute.experimental.partitioners.MinSizePartitioner gibi boyut tabanlı bölümleyicilerin kullanılması önerilir.

Bir variable_partitioner geçirildiğinde ve doğrudan strategy.scope() altında bir değişken oluşturursanız, bu, parça listesine erişim sağlayan variables özelliğine sahip bir kapsayıcı türü haline gelecektir. Çoğu durumda, bu kapsayıcı, tüm parçaları birleştirerek otomatik olarak bir Tensöre dönüştürülür. Sonuç olarak, normal bir değişken olarak kullanılabilir. Öte yandan, tf.nn.embedding_lookup gibi bazı tf.nn.embedding_lookup yöntemleri bu kapsayıcı türü için verimli uygulama sağlar ve bu yöntemlerde otomatik birleştirme önlenir.

Daha fazla ayrıntı için lütfen tf.distribute.experimental.ParameterServerStrategy API belgelerine bakın.

Model.fit ile eğitim

Keras, kaputun altındaki eğitim döngüsünü, geçersiz train_step esnekliği ve TensorBoard için kontrol noktası kaydetme veya özet kaydetme gibi işlevler sağlayan geri aramalarla yöneten Model.fit aracılığıyla kullanımı kolay bir eğitim API'si sağlar. Model.fit ile aynı eğitim kodu, strateji nesnesinin basit bir takasıyla diğer stratejiler için kullanılabilir.

Giriş verileri

Parametre sunucusu eğitimi ile Model.fit , giriş verilerinin tf.distribute.InputContext türünde tek bir argüman alan ve bir tf.data.Dataset döndüren bir çağrılabilir içinde sağlanmasını gerektirir. Ardından, bu tür callable alan bir tf.keras.utils.experimental.DatasetCreator nesnesi ve input_options bağımsız değişkeni aracılığıyla isteğe bağlı bir tf.distribute.InputOptions nesnesi oluşturun.

Verileri parametre sunucusu eğitimi ile karıştırmanın ve tekrarlamanın ve kitaplığın çağ sınırlarını bilmesi için fit steps_per_epoch belirtmenin tavsiye edildiğini unutmayın.

InputContext bağımsız değişkeni hakkında daha fazla bilgi için lütfen Dağıtılmış girdi öğreticisine bakın.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn içindeki kod, çalışan makinelerin her birinde genellikle CPU olan giriş aygıtında çağrılır.

Model oluşturma ve derleme

Şimdi, bir tf.keras.Model bir tf.keras.models.Gösterim amaçlı tf.keras.models.Sequential model— ve ardından, optimize edici, ölçümler veya steps_per_execution gibi parametreleri dahil etmek için bir Model.compile çağrısı oluşturacaksınız:

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Geri aramalar ve eğitim

model.fit asıl eğitim için çağırmadan önce, aşağıdakiler gibi genel görevler için gerekli geri aramaları hazırlayalım:

  • ModelCheckpoint : model ağırlıklarını kaydetmek için.
  • BackupAndRestore : eğitim ilerlemesinin otomatik olarak yedeklendiğinden ve kümenin kullanılamaması durumunda (iptal etme veya önceden alma gibi) kurtarıldığından emin olmak için; veya
  • TensorBoard : ilerleme raporlarını TensorBoard aracında görselleştirilen özet dosyalara kaydetmek için.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
tutucu8 l10n-yer
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

ClusterCoordinator ile doğrudan kullanım (isteğe bağlı)

Model.fit eğitim yolunu seçseniz bile, işçiler üzerinde yürütülmesini istediğiniz diğer işlevleri planlamak için isteğe bağlı olarak bir tf.distribute.experimental.coordinator.ClusterCoordinator nesnesinin örneğini oluşturabilirsiniz. Daha fazla ayrıntı ve örnek için Özel eğitim döngüsüyle eğitim bölümüne bakın.

Özel bir eğitim döngüsüyle eğitim

tf.distribute.Strategy ile özel eğitim döngülerini kullanmak, eğitim döngülerini tanımlamak için büyük esneklik sağlar. Yukarıda tanımlanan ParameterServerStrategy ile ( strategy olarak), eğitim adımlarının yürütülmesini uzaktan çalışanlara göndermek için bir tf.distribute.experimental.coordinator.ClusterCoordinator kullanacaksınız.

Ardından, diğer tf.distribute.Strategy s ile eğitim döngüsünde yaptığınız gibi bir model oluşturacak, bir veri kümesi ve bir adım işlevi tanımlayacaksınız. Daha fazla ayrıntıyı tf.distribute.Strategy öğreticisiyle Özel eğitimde bulabilirsiniz.

Verimli veri kümesi önceden getirmeyi sağlamak için aşağıdaki Eğitim adımlarını uzaktan çalışanlara gönderme bölümünde belirtilen önerilen dağıtılmış veri kümesi oluşturma API'lerini kullanın. Ayrıca, çalışanlara tahsis edilen GPU'lardan tam olarak yararlanmak için worker_fn içinden Strategy.run çağırdığınızdan emin olun. Adımların geri kalanı, GPU'lu veya GPU'suz eğitim için aynıdır.

Bu bileşenleri aşağıdaki adımlarda oluşturalım:

verileri ayarla

İlk olarak, Keras ön işleme katmanları tarafından uygulanan ön işleme mantığını içeren bir veri kümesi oluşturan bir işlev yazın.

Bu katmanları dataset_fn dışında yaratacaksınız ama dönüşümü dataset_fn içinde uygulayacaksınız, çünkü dataset_fn içinde değişkenlerin oluşturulmasına izin vermeyen bir dataset_fn içine tf.function .

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
tutucu10 l10n-yer
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Bir veri kümesinde oyuncak örnekleri oluşturun:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Ardından, dataset_fn içine sarılmış eğitim veri kümesini oluşturun:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Modeli oluşturun

Ardından, modeli ve diğer nesneleri oluşturun. Tüm değişkenleri strategy.scope altında oluşturduğunuzdan emin olun.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

FixedShardsPartitioner kullanımının tüm değişkenleri iki parçaya böldüğünü ve her parçanın farklı parametre sunucularına atandığını doğrulayalım:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Eğitim adımını tanımlayın

Üçüncüsü, bir tf.function içine sarılmış eğitim adımını oluşturun:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Yukarıdaki eğitim adımı işlevinde, step_fn Strategy.run ve Strategy.reduce çağrıları, çalışan başına birden çok GPU'yu destekleyebilir. Çalışanlara ayrılmış GPU'lar varsa, Strategy.run veri kümelerini birden çok kopyaya dağıtır.

Eğitim adımlarını uzaktan çalışanlara gönderin

Tüm hesaplamalar ParameterServerStrategy tarafından tanımlandıktan sonra, kaynaklar oluşturmak ve eğitim adımlarını uzak çalışanlara dağıtmak için tf.distribute.experimental.coordinator.ClusterCoordinator sınıfını kullanacaksınız.

Önce bir ClusterCoordinator nesnesi oluşturalım ve strateji nesnesini iletelim:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Ardından, çalışan başına bir veri kümesi ve bir yineleyici oluşturun. Aşağıdaki per_worker_dataset_fn , GPU'lara sorunsuz bir şekilde verimli bir şekilde önceden getirmeyi sağlamak için dataset_fn strategy.distribute_datasets_from_function içine sarmanız önerilir.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
tutucu18 l10n-yer
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Son adım, hesaplamayı ClusterCoordinator.schedule kullanarak uzak çalışanlara dağıtmaktır:

  • schedule yöntemi bir tf.function ve hemen geleceğe benzer bir RemoteValue döndürür. Kuyruğa alınan işlevler, arka plan iş parçacıklarında uzaktaki çalışanlara gönderilir ve RemoteValue eşzamansız olarak doldurulur.
  • ClusterCoordinator.join join , tüm zamanlanmış işlevler yürütülene kadar beklemek için kullanılabilir.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
tutucu20 l10n-yer
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

RemoteValue sonucunu şu şekilde alabilirsiniz:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
tutucu22 l10n-yer
Final loss is 0.000000

Alternatif olarak, tüm adımları başlatabilir ve tamamlanmasını beklerken bir şeyler yapabilirsiniz:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Bu özel örneğe yönelik eksiksiz eğitim ve sunum iş akışı için lütfen bu testi inceleyin .

Veri kümesi oluşturma hakkında daha fazla bilgi

Yukarıdaki koddaki veri kümesi, ClusterCoordinator.create_per_worker_dataset API'si kullanılarak oluşturulur). Çalışan başına bir veri kümesi oluşturur ve bir kapsayıcı nesnesi döndürür. Çalışan başına bir yineleyici oluşturmak için iter yöntemini çağırabilirsiniz. Çalışan başına yineleyici, çalışan başına bir yineleyici içerir ve bir çalışanın karşılık gelen dilimi, işlev belirli bir çalışan üzerinde yürütülmeden önce ClusterCoordinator.schedule yöntemine geçirilen işlevin giriş bağımsız değişkeninde değiştirilir.

Şu anda, ClusterCoordinator.schedule yöntemi, çalışanların eşdeğer olduğunu varsayar ve bu nedenle, bir Dataset.shuffle işlemi içermeleri durumunda farklı şekilde karıştırılabilmeleri dışında, farklı çalışanlar üzerindeki veri kümelerinin aynı olduğunu varsayar. Bu nedenle, veri kümelerinin süresiz olarak tekrarlanması ve bir veri kümesinden OutOfRangeError güvenmek yerine sınırlı sayıda adım planlamanız da önerilir.

Diğer bir önemli not da, tf.data veri kümelerinin, görev sınırları arasında örtük serileştirmeyi ve seriyi kaldırmayı desteklememesidir. Bu nedenle, tüm veri kümesini ClusterCoordinator.create_per_worker_dataset öğesine iletilen işlevin içinde oluşturmak önemlidir.

Değerlendirme

Dağıtılmış eğitimde bir değerlendirme döngüsü tanımlamanın ve çalıştırmanın birden fazla yolu vardır. Her birinin aşağıda açıklandığı gibi kendi artıları ve eksileri vardır. Bir tercihiniz yoksa satır içi değerlendirme yöntemi önerilir.

satır içi değerlendirme

Bu yöntemde koordinatör, eğitim ve değerlendirme arasında geçiş yapar ve bu nedenle buna satır içi değerlendirme denir.

Satır içi değerlendirmenin çeşitli faydaları vardır. Örneğin:

  • Tek bir görevin tutamayacağı büyük değerlendirme modellerini ve değerlendirme veri kümelerini destekleyebilir.
  • Değerlendirme sonuçları, bir sonraki çağın eğitimi için kararlar almak için kullanılabilir.

Satır içi değerlendirmeyi uygulamanın iki yolu vardır: doğrudan değerlendirme ve dağıtılmış değerlendirme.

  • Doğrudan değerlendirme : Küçük modeller ve değerlendirme veri kümeleri için, koordinatör, değerlendirme veri kümesini koordinatörde kullanarak doğrudan dağıtılmış model üzerinde değerlendirme çalıştırabilir:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
tutucu25 l10n-yer
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Dağıtılmış değerlendirme : Doğrudan koordinatör üzerinde çalıştırılması mümkün olmayan büyük modeller veya veri kümeleri için, koordinatör görevi, değerlendirme görevlerini ClusterCoordinator.schedule / ClusterCoordinator.join yöntemleri aracılığıyla çalışanlara dağıtabilir:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
tutucu27 l10n-yer
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Yan araç değerlendirmesi

Diğer bir yöntem, kontrol noktalarını tekrar tekrar okuyan ve en son kontrol noktasında değerlendirmeyi çalıştıran özel bir değerlendirici görevi oluşturduğunuz sepet değerlendirmesi olarak adlandırılır. Egzersiz döngünüzü değerlendirme sonuçlarına göre değiştirmeniz gerekmiyorsa, egzersiz programınızın erken bitmesine olanak tanır. Ancak, değerlendirmeyi tetiklemek için ek bir değerlendirici görevi ve periyodik kontrol noktası gerektirir. Olası bir sepet değerlendirme döngüsü aşağıdadır:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Gerçek dünyadaki kümeler

Gerçek bir üretim ortamında, tüm görevleri farklı makinelerde farklı süreçlerde çalıştıracaksınız. Her görevde küme bilgilerini yapılandırmanın en basit yolu, "TF_CONFIG" ortam değişkenlerini ayarlamak ve "TF_CONFIG" ayrıştırmak için bir tf.distribute.cluster_resolver.TFConfigClusterResolver kullanmaktır.

"TF_CONFIG" ortam değişkenleri hakkında genel bir açıklama için, Dağıtılmış eğitim kılavuzuna bakın.

Eğitim görevlerinizi Kubernetes veya diğer yapılandırma şablonlarını kullanarak başlatırsanız, bu şablonların sizin için zaten “TF_CONFIG" olması çok olasıdır.

"TF_CONFIG" ortam değişkenini ayarlayın

3 çalışanınız ve 2 parametre sunucunuz olduğunu varsayalım, işçi 1'in "TF_CONFIG" olabilir:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

Değerlendiricinin "TF_CONFIG" değeri şunlar olabilir:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Değerlendirici için yukarıdaki "TF_CONFIG" dizisindeki "cluster" kısmı isteğe bağlıdır.

Tüm görevler için aynı ikili dosyayı kullanırsanız

Tüm bu görevleri tek bir ikili dosya kullanarak çalıştırmayı tercih ederseniz, programınızın en başında farklı rollere ayrılmasına izin vermeniz gerekir:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Aşağıdaki kod bir TensorFlow sunucusunu başlatır ve bekler:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

İşlem hatası

işçi hatası

tf.distribute.experimental.coordinator.ClusterCoordinator veya Model.fit , çalışan hatası için yerleşik hata toleransı sağlar. Çalışan kurtarıldıktan sonra, veri kümelerini yeniden oluşturmak için çalışanlar üzerinde önceden sağlanan veri kümesi işlevi (özel eğitim döngüsü için ClusterCoordinator.create_per_worker_dataset veya tf.keras.utils.experimental.DatasetCreator for Model.fit için) çağrılır.

Parametre sunucusu veya koordinatör hatası

Ancak, koordinatör bir parametre sunucusu hatası gördüğünde, hemen bir UnavailableError veya AbortedError . Bu durumda koordinatörü yeniden başlatabilirsiniz. Koordinatörün kendisi de kullanılamaz hale gelebilir. Bu nedenle, eğitim ilerlemesini kaybetmemek için belirli araçlar önerilir:

  • Model.fit için, ilerleme kaydetme ve geri yüklemeyi otomatik olarak gerçekleştiren bir BackupAndRestore geri araması kullanmalısınız. Örnek için yukarıdaki Geri Aramalar ve eğitim bölümüne bakın.

  • Özel bir eğitim döngüsü için, model değişkenlerini periyodik olarak kontrol etmeli ve eğitim başlamadan önce varsa bir kontrol noktasından model değişkenlerini yüklemelisiniz. Bir optimize edici kontrol noktasına sahipse, eğitim ilerlemesi yaklaşık olarak optimizer.iterations öğesinden çıkarılabilir:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue Alma

Bir işlev başarıyla yürütülürse RemoteValue başarılı olacağı garanti edilir. Bunun nedeni, şu anda bir işlev yürütüldükten sonra dönüş değerinin koordinatöre hemen kopyalanmasıdır. Kopyalama sırasında herhangi bir çalışan hatası varsa, işlev uygun başka bir çalışan üzerinde yeniden denenecektir. Bu nedenle, performans için optimize etmek istiyorsanız, dönüş değeri olmayan işlevleri planlayabilirsiniz.

Hata raporlama

Koordinatör, parametre sunucularından UnavailableError gibi bir hata veya InvalidArgument tf.debugging.check_numerics gibi diğer uygulama hataları gördüğünde, hatayı yükseltmeden önce bekleyen ve kuyruğa alınan tüm işlevleri iptal eder. Karşılık gelen RemoteValue , CancelledError yükseltir.

Bir hata oluştuktan sonra, koordinatör aynı hatayı veya iptal edilen fonksiyonlardan herhangi bir hatayı ortaya çıkarmayacaktır.

Performans iyileştirme

ParameterServerStrategy ve ClusterResolver ile antrenman yaparken performans sorunları görmenizin birkaç olası nedeni vardır.

Yaygın bir neden, parametre sunucularının dengesiz yüke sahip olması ve bazı ağır yüklü parametre sunucularının kapasiteye ulaşmış olmasıdır. Ayrıca birden fazla kök neden olabilir. Bu sorunu hafifletmek için bazı basit yöntemler şunlardır:

  1. Bir ParameterServerStrategy oluştururken bir variable_partitioner belirterek büyük model değişkenlerinizi parçalayın.
  2. Mümkünse, tüm parametre sunucularının tek adımda ihtiyaç duyduğu bir etkin nokta değişkeni oluşturmaktan kaçının. Örneğin, varsayılan davranış, öğrenme hızının belirli bir parametre sunucusuna yerleştirilen ve her adımda diğer tüm parametre sunucuları tarafından istenen bir değişken haline gelmesi olduğundan, optimize edicilerde sabit bir öğrenme oranı veya alt sınıf tf.keras.optimizers.schedules.LearningRateSchedule . .
  3. Büyük kelime dağarcığınızı Keras ön işleme katmanlarına geçirmeden önce karıştırın.

Performans sorunlarının bir başka olası nedeni de koordinatördür. İlk schedule / join uygulamanız Python tabanlıdır ve bu nedenle iş parçacığı ek yüküne sahip olabilir. Ayrıca koordinatör ve işçiler arasındaki gecikme büyük olabilir. Eğer durum buysa,

  • Model.fit için, steps_per_execution sağlananstep_per_execution bağımsız değişkenini Model.compile büyük bir değere ayarlayabilirsiniz.

  • Özel bir eğitim döngüsü için, birden çok adımı tek bir tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Kitaplık daha da optimize edildiğinden, umarım çoğu kullanıcı gelecekte adımları manuel olarak paketlemek zorunda kalmaz.

Ek olarak, performans iyileştirme için küçük bir numara, yukarıdaki görev hatası işleme bölümünde açıklandığı gibi, işlevleri bir dönüş değeri olmadan programlamaktır.

Bilinen sınırlamalar

Bilinen sınırlamaların çoğu zaten yukarıdaki bölümlerde ele alınmıştır. Bu bölüm bir özet sunar.

ParameterServerStrategy genel

  • Hata toleransının düzgün çalışması için koordinatör dahil her görevde os.environment["grpc_fail_fast"]="use_caller" gereklidir.
  • Senkronize parametre sunucusu eğitimi desteklenmez.
  • Optimum performansı elde etmek için genellikle birden fazla adımı tek bir işlevde toplamak gerekir.
  • Parçalanmış değişkenler içeren bir tf.saved_model.load tf.saved_model.load yoluyla yüklenmesi desteklenmez. Böyle bir save_modelinin TensorFlow Serving kullanılarak yüklenmesinin işe yarayacağını unutmayın.
  • Parçalanmış optimize edici yuva değişkenlerini içeren bir kontrol noktasının farklı sayıda parçaya yüklenmesi desteklenmez.
  • Koordinatör görevini yeniden başlatmadan parametre sunucusu hatasından kurtarma işlemi desteklenmez.
  • tf.lookup.StaticHashTable (genellikle tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup ve tf.keras.layers.TextVectorization gibi bazı Keras ön işleme katmanları tarafından kullanılır) kullanımı, kaynakların koordinatör şu anda parametre sunucusu eğitimi ile. Bunun, çalışanlardan koordinatöre kadar arama RPC'leri için performans etkileri vardır. Bu, ele alınması gereken mevcut bir yüksek önceliktir.

Model.fit özellikleri

  • steps_per_epoch Model.fit değişkeni gereklidir. Bir çağda uygun aralıkları sağlayan bir değer seçebilirsiniz.
  • ParameterServerStrategy , performans nedenleriyle toplu düzeyde çağrılara sahip özel geri aramaları desteklemez. Bu çağrıları, uygun şekilde seçilmiş steps_per_epoch ile çağ düzeyinde çağrılara dönüştürmelisiniz, böylece her steps_per_epoch adım sayısı olarak adlandırılırlar. Yerleşik geri aramalar etkilenmez: toplu düzeydeki aramaları, performans gösterecek şekilde değiştirildi. ParameterServerStrategy için toplu düzeyde çağrıların desteklenmesi planlanmaktadır.
  • Aynı nedenle, diğer stratejilerden farklı olarak, ilerleme çubuğu ve metrikler yalnızca dönem sınırlarında günlüğe kaydedilir.
  • run_eagerly desteklenmiyor.

Özel eğitim döngüsü özellikleri