Yardım Kaggle üzerinde TensorFlow ile Büyük Bariyer Resifi korumak Meydan Üyelik

ParameterServerStrategy ile parametre sunucusu eğitimi

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Parametre sunucu eğitimi birden makinelerde modeli eğitim büyütmek için ortak bir veri paralel bir yöntemdir.

Bir parametre sunucu eğitim küme işçi ve parametre sunucudan oluşur. Değişkenler parametre sunucularında oluşturulur ve her adımda çalışanlar tarafından okunur ve güncellenir. Varsayılan olarak, çalışanlar bu değişkenleri birbirleriyle senkronize etmeden bağımsız olarak okur ve günceller. Bu asenkron eğitim denir neden bazen sunucu tarzı eğitim parametresidir.

TensorFlow 2, parametre sunucu eğitim tarafından desteklenmektedir tf.distribute.experimental.ParameterServerStrategy binlerce işçi kadar ölçekler bir kümeye eğitim adımlarını dağıtan sınıf, (parametre sunucularının eşliğinde).

Desteklenen eğitim yöntemleri

Desteklenen iki ana eğitim yöntemi vardır:

İşler ve görevler içeren bir küme

Ne olursa olsun seçim API (arasında Model.fit a: veya özel bir eğitim döngüsü), TensorFlow 2'de dağıtılan eğitim gerektirir 'cluster' birkaç ile 'jobs' ve işlerin her bir veya daha fazla olabilir 'tasks' .

Parametre sunucusu eğitimini kullanırken şunlara sahip olunması önerilir:

  • Bir koordinatör iş (işin adı olan chief )
  • Çoklu işçi işler (iş adı worker ); ve
  • Çoklu parametre sunucu işler (iş adı ps )

Koordinatör kaynakları, görevleri eğitim telgraflari yaratırken, kontrol noktaları yazar ve görev arızaları, işçi ve parametre sunucuları ile fırsatlar çalıştırmak tf.distribute.Server koordinatörü gelen istekleri dinlemek.

İle Parametre sunucu eğitim Model.fit API

İle Parametre sunucu eğitim Model.fit API bir kullanımı koordinatör gerektirir tf.distribute.experimental.ParameterServerStrategy nesnesi ve bir tf.keras.utils.experimental.DatasetCreator girişi olarak. Benzer Model.fit veya başka stratejilerle, iş akışı oluşturma ve modelini derleme, bir takiben geri aramalar, hazırlanmasını içerir hiçbir strateji ile kullanım Model.fit çağrısı.

Özel bir eğitim döngüsü ile parametre sunucusu eğitimi

Özel eğitim döngüler ile, tf.distribute.experimental.coordinator.ClusterCoordinator sınıf koordinatörü için kullanılan anahtar bileşenidir.

Tarafından sağlanan en önemli API ClusterCoordinator nesnesi olan schedule :

  • schedule API enqueues tf.function ve geleceği benzeri döner RemoteValue hemen.
  • Sıraya fonksiyonlar arka plan iş parçacığı uzak işçilere sevk edilecek ve bunların RemoteValue ler uyumsuz doldurulacaktır.
  • Yana schedule işçi ataması gerektirmez, tf.function kullanılabilir herhangi bir işçi üzerinde çalıştırılabilir içinde geçti.
  • Üzerinde yürütüldüğü çalışan tamamlanmadan önce kullanılamaz hale gelirse, işlev uygun başka bir çalışan üzerinde yeniden denenir.
  • Bu gerçek ve işlevin yürütülmesinin atomik olmadığı gerçeği nedeniyle, bir işlev birden fazla kez yürütülebilir.

Uzak fonksiyonları göndermeni yanı sıra, ClusterCoordinator ayrıca veri setlerini tüm işçilere veri setlerini oluşturmak ve yeniden inşa etmek için yardımcı olur zaman yetmezliğinden bir işçi kurtarır.

Eğitim kurulumu

Öğretici yöneleceğini Model.fit ve özel eğitim döngü yolları ve size uygun olanı seçebilirsiniz. "X ile Eğitim" dışındaki bölümler her iki yol için de geçerlidir.

pip install portpicker

küme kurulumu

Yukarıda belirtildiği gibi, bir parametre sunucu eğitim küme çalıştırmak TensorFlow Sunucuları olmak antrenman programı, bir veya birkaç işçi ve parametre sunucu görevleri çalıştırır koordinatör görevi gerektirir tf.distribute.Server -ve muhtemelen ek bir değerlendirme görevi olduğunu çalışır yan araç değerlendirme (aşağıdaki sepet değerlendirme bölümüne bakın). Bunları ayarlamak için gereksinimler şunlardır:

  • Koordinatör görevinin, değerlendirici dışındaki tüm diğer TensorFlow sunucularının adreslerini ve bağlantı noktalarını bilmesi gerekir.
  • Çalışanların ve parametre sunucularının hangi bağlantı noktasını dinlemeleri gerektiğini bilmeleri gerekir. Basitlik adına, bu görevlerde TensorFlow sunucuları oluştururken genellikle tam küme bilgilerini iletebilirsiniz.
  • Değerlendirici görevinin eğitim kümesinin kurulumunu bilmesi gerekmez. Varsa, eğitim kümesine bağlanmaya çalışmamalıdır.
  • İşçi ve parametre sunucuları olarak görev türlerine sahip olmalıdır "worker" ve "ps" sırasıyla. Koordinatör kullanmalıdır "chief" eski nedenlerle görev türü olarak.

Bu öğreticide, tüm parametre sunucusu eğitiminin Colab'da çalıştırılabilmesi için bir süreç içi küme oluşturacaksınız. Sen nasıl ayarlanacağını öğreneceksiniz gerçek kümeleri daha sonraki bir bölümde.

İşlem içi küme

Önceden birkaç TensorFlow sunucusu oluşturarak başlayacak ve bunlara daha sonra bağlanacaksınız. Not Burada yalnızca bu tutorial en gösteri amaçlı olduğunu ve gerçek eğitim sunucular üzerinde başlatılan olacağını "worker" ve "ps" makinelerin.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

İn-proses küme kurulumu sıkça gibi, birim test kullanılır burada .

Yerel test için bir başka seçenek dışarı yerel makine-çeke süreçleri başlatmak için olduğunu keras ile Çoklu işçi eğitimi bu yaklaşımın bir örneği için.

Bir ParameterServerStrategy örneği oluşturun

Eğer eğitim koduna geçmeden önce, bir örneğini izin ParameterServerStrategy nesnesi. Bu olursa olsun devam etmeden olup olmadıklarına gerekli olduğunu Not Model.fit veya özel bir eğitim döngü. variable_partitioner bağımsız değişken olarak açıklanacaktır Değişken sharding bölümü .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

GPU'ları eğitim için kullanmak için, her çalışanın görebileceği GPU'ları tahsis edin. ParameterServerStrategy bütün işçiler mevcut GPU'ları aynı sayıda olması gerektiğini kısıtlama, her işçi üzerinde mevcut tüm GPU'ları kullanacaktır.

Değişken parçalama

Değişken Kırma işlemi bölme için kırıklarını denir birden fazla küçük değişken, bir değişken ifade eder. Değişken parçalama, bu parçalara erişirken ağ yükünü dağıtmak için faydalı olabilir. Normal bir değişkenin hesaplanmasını ve depolanmasını birden çok parametre sunucusu arasında dağıtmak da yararlıdır.

Değişken Sharding etkinleştirmek için, bir de geçebilir variable_partitioner bir oluşturarak zaman ParameterServerStrategy nesnesi. variable_partitioner değişken oluşturulduğunda her zaman çağrılır ve değişkenin her boyutun boyunca kırık sayısını dönmesi bekleniyor. Bazı dışı kutu variable_partitioner ler gibi verilmiştir tf.distribute.experimental.partitioners.MinSizePartitioner . Gibi boyut tabanlı bölme duvarları kullanılması önerilir tf.distribute.experimental.partitioners.MinSizePartitioner modeli eğitim hızına olumsuz etki yaratabilir küçük değişkenleri bölünmesine önlemek için.

Bir zaman variable_partitioner içinde geçirilir ve hemen altında bir değişken oluşturmak eğer strategy.scope() , bir olan bir konteyner tipi olacak variables kırıkların listesine erişim sağlar özelliği. Çoğu durumda, bu kapsayıcı, tüm parçaları birleştirerek otomatik olarak bir Tensöre dönüştürülür. Sonuç olarak, normal bir değişken olarak kullanılabilir. Öte yandan, gibi bazı TensorFlow yöntemleri tf.nn.embedding_lookup bu konteyner türü için ve otomatik birleştirme önlenecektir bu yöntemlerin etkili uygulanmasını sağlamak.

API docs bakınız tf.distribute.experimental.ParameterServerStrategy fazla ayrıntı için.

İle Eğitim Model.fit

Keras aracılığıyla kolay kullanımlı eğitim API sağlar Model.fit o kolları eğitimi geçersiz kılınabilir esnekliğiyle başlık altında döngü, train_step böyle TensorBoard için tasarruf kontrol noktası tasarrufu veya özet olarak işlevsellikleri sağlamak ve geri aramalar. İle Model.fit , aynı eğitim kod strateji nesnenin basit bir takas ile diğer stratejileri için de kullanılabilir.

Giriş verileri

Model.fit parametre sunucu eğitimi ile Giriş veri tipi tek argüman alır bir çağrılabilir temin gerektirir tf.distribute.InputContext ve döndürür tf.data.Dataset . Daha sonra, bir oluşturma tf.keras.utils.experimental.DatasetCreator örneğin alır nesne callable ve isteğe bağlı bir tf.distribute.InputOptions ile nesne input_options argüman.

O karıştırmak ve parametre sunucu eğitimi ile veri tekrarlamak ve belirtmek için tavsiye edilir Not steps_per_epoch içinde fit kütüphane dönemi sınırlarını bilmesi çağrı.

Bakınız Dağıtılmış giriş hakkında daha fazla bilgi için öğretici InputContext argüman.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kod dataset_fn işçi makineleri her birinde, çoğunlukla CPU giriş cihazı, üzerinde çağrılır.

Model oluşturma ve derleme

Şimdi, bir yaratacak tf.keras.Model -a önemsiz tf.keras.models.Sequential gösteri amaçlı-takiben tarafından modeli Model.compile gibi gibi bir iyileştirici, metrikler veya parametreler gibi bileşenler, dahil etmek çağrı steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Geri aramalar ve eğitim

Aramadan önce model.fit fiili eğitim için, en gibi ortak görevler için gerekli geri aramalar hazırlayalım:

  • ModelCheckpoint : Model ağırlıkları kaydedin.
  • BackupAndRestore : Emin eğitim ilerleme otomatik olarak yedeklenir ve eğer geri kazanılır yapmak için (örneğin iptal veya önalım gibi) küme deneyimleri kullanılamazlık; veya
  • TensorBoard : TensorBoard aracında görüntülenir almak özeti dosyalarının içine ilerleme raporlarını kaydedin.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-12-02 02:22:17.429288: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 6s - loss: 0.6550 - 6s/epoch - 286ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.5718 - 546ms/epoch - 27ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b38365dd0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b4a806c20> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 1s - loss: 0.4267 - 502ms/epoch - 25ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3612 - 394ms/epoch - 20ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3184 - 385ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f4b4b93c510>

Doğrudan kullanım ClusterCoordinator (isteğe bağlı)

Seçtiğiniz bile Model.fit eğitim yolunu, isteğe bağlı olarak bir örneğini tf.distribute.experimental.coordinator.ClusterCoordinator Eğer işçilere yürütülecek istiyorum diğer işlevleri zamanlama nesneyi. Bkz özel eğitim döngü ile Eğitim Daha fazla bilgi ve örnekler için bölümüne.

Özel bir eğitim döngüsüyle eğitim

İle özel eğitim döngüler kullanarak tf.distribute.Strategy eğitim döngüler tanımlamak için büyük esneklik sağlar. İle ParameterServerStrategy (yukarıda tanımlandığı üzere strategy ), bir kullanacak tf.distribute.experimental.coordinator.ClusterCoordinator uzak işçilere eğitim adımlarının yürütülmesini merkeze.

Ardından, bir model yaratacak diğer ile eğitim döngüsünde yaptıkları gibi, bir veri kümesi ve bir adım işlev tanımlamak tf.distribute.Strategy s. Sen daha fazla ayrıntı bulabilirsiniz tf.distribute.Strategy ile özel eğitim öğretici.

Verimli veri kümesi önceden getirilmesini sağlamak için, sözü edilen veri kümesi oluşturma API'leri dağıtılan tavsiye kullanmak uzak işçiler için Sevk eğitim adımlar aşağıdaki bölümde. Ayrıca, aramak emin olun Strategy.run içindeki worker_fn işçilere tahsis GPU'larının tam olarak yararlanmak için. Adımların geri kalanı, GPU'lu veya GPU'suz eğitim için aynıdır.

Bu bileşenleri aşağıdaki adımlarda oluşturalım:

verileri ayarla

İlk olarak, uyguladığı mantığı ön işlenmesi içerir bir veri kümesini oluşturan bir işlev yazmaktır Keras önişleme katmanları .

Dışarıdan Bu katmanları yaratacak dataset_fn içindeki dönüşümü ancak uygulamak dataset_fn sen sarmak olacağından, dataset_fn bir içine tf.function değişkenleri içine oluşturulmasına izin vermez.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Bir veri kümesinde oyuncak örnekleri oluşturun:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Daha sonra, sarılmış eğitim veri kümesi oluşturmak dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Modeli oluşturun

Ardından, modeli ve diğer nesneleri oluşturun. Altındaki tüm değişkenler oluşturmak için emin olun strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Kullanımı o edelim onayla FixedShardsPartitioner iki kırıkların içine bütün değişkenleri bölünmüş ve kırıkların her biri farklı parametre sunucularına atandı:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Eğitim adımını tanımlayın

Üçüncü olarak, bir içine sarılmış eğitim adımını oluşturmak tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Yukarıdaki eğitim aşaması fonksiyonda, çağıran Strategy.run ve Strategy.reduce içinde step_fn çalışan başına birden GPU'ları destekleyebilir. İşçiler GPU'lar tahsis varsa, Strategy.run birden kopyaları üzerinde veri setlerini dağıtacak.

Eğitim adımlarını uzaktan çalışanlara gönderin

Tüm hesaplamalar tanımlanır sonra ParameterServerStrategy , kullanacağınız tf.distribute.experimental.coordinator.ClusterCoordinator kaynakları oluşturma sınıfı ve uzak işçilere eğitim adımlarını dağıtın.

Önce bir yaratalım ClusterCoordinator nesne ve strateji nesnesi geçirin:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Ardından, çalışan başına bir veri kümesi ve bir yineleyici oluşturun. Gelen per_worker_dataset_fn aşağıda, sarma dataset_fn içine strategy.distribute_datasets_from_function sorunsuz GPU'ları için verimli önceden getirilmesini sağlamak için önerilir.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Son adım kullanarak uzak işçilere hesaplama dağıtılmasıdır ClusterCoordinator.schedule :

  • schedule yöntemi enqueues tf.function ve geleceği benzeri döner RemoteValue hemen. Sıraya fonksiyonlar arka plan iş parçacığı uzak işçilere sevk edilecek ve RemoteValue uyumsuz doldurulacaktır.
  • join yöntemi ( ClusterCoordinator.join ) tüm işlevleri otomatik yürütülür kadar beklemek kullanılabilir.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.637500.
Finished epoch 1, accuracy is 0.906250.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Burada bir sonucunu getirebilir nasıl RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternatif olarak, tüm adımları başlatabilir ve tamamlanmasını beklerken bir şeyler yapabilirsiniz:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Bu özel örnekte için tam eğitim ve hizmet veren iş akışı için, bu kontrol edin testi .

Veri kümesi oluşturma hakkında daha fazla bilgi

Yukarıdaki kodda veri kümesi kullanılarak oluşturulan ClusterCoordinator.create_per_worker_dataset ) API. Çalışan başına bir veri kümesi oluşturur ve bir kapsayıcı nesnesi döndürür. Sen diyebilirsin iter başına işçi yineleyici oluşturmak için üzerine yöntemini. Başına çalışan yineleyici işçi için bir yineleyici içerir ve bir işçinin uygun dilim geçirilen fonksiyon parametreli olarak ikame edilecek ClusterCoordinator.schedule işlevi, belirli bir işçi ilgili yürütülmeden önce yöntem.

Şu anda, ClusterCoordinator.schedule yöntemi işçi eşdeğer kabul ve bu nedenle bir içeren farklı ise karıştırılmış olabilir dışında farklı işçiler üzerinde veri setleri aynıdır kabul Dataset.shuffle işlemi. Bu nedenle, aynı zamanda veri kümeleri süresiz tekrarlanması önerilir ve yerine güvenmek sonlu adımda sayıda program OutOfRangeError bir veri kümesinden.

Bir diğer önemli not olmasıdır tf.data veri kümeleri görev sınırları ötesinde örtülü seri ve deserialization desteklemez. Bu yüzden geçirilen işlevi içinde bütün veri kümesini oluşturmak için önemlidir ClusterCoordinator.create_per_worker_dataset .

Değerlendirme

Dağıtılmış eğitimde bir değerlendirme döngüsü tanımlamanın ve çalıştırmanın birden fazla yolu vardır. Her birinin aşağıda açıklandığı gibi kendi artıları ve eksileri vardır. Bir tercihiniz yoksa satır içi değerlendirme yöntemi önerilir.

satır içi değerlendirme

Bu yöntemde, eğitim ve değerlendirme ve böylece onu denir satır içi değerlendirme arasındaki koordinatör alternatiflerini.

Satır içi değerlendirmenin çeşitli faydaları vardır. Örneğin:

  • Tek bir görevin tutamayacağı büyük değerlendirme modellerini ve değerlendirme veri kümelerini destekleyebilir.
  • Değerlendirme sonuçları, bir sonraki çağın eğitimi için kararlar almak için kullanılabilir.

Satır içi değerlendirmeyi uygulamanın iki yolu vardır: doğrudan değerlendirme ve dağıtılmış değerlendirme.

  • Doğrudan değerlendirme: Küçük modelleri ve değerlendirme veri setleri için, koordinatör koordinatörü üzerinde değerlendirme veri kümesi ile dağıtılan model üzerinde doğrudan değerlendirmeyi çalıştırabilirsiniz:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Dağıtılmış değerlendirme: büyük modelleri veya koordinatör doğrudan çalıştırmak için olanaksız olan veri setleri için aracılığıyla işçilere değerlendirme görevleri dağıtabilir koordinatör görev ClusterCoordinator.schedule / ClusterCoordinator.join yöntemlerine:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Yan araç değerlendirmesi

Diğer bir yöntem art arda kontrol noktaları okur ve bir son kontrol noktasında üzerinde değerlendirme çalıştığı özel bir değerlendirici görev oluşturmak yan araç değerlendirme denir. Egzersiz döngünüzü değerlendirme sonuçlarına göre değiştirmeniz gerekmiyorsa, egzersiz programınızın erken bitmesine olanak tanır. Ancak, değerlendirmeyi tetiklemek için ek bir değerlendirici görevi ve periyodik kontrol noktası gerektirir. Olası bir sepet değerlendirme döngüsü aşağıdadır:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Gerçek dünyadaki kümeler

Gerçek bir üretim ortamında, tüm görevleri farklı makinelerde farklı süreçlerde çalıştıracaksınız. Her görev yapılandırmak küme bilgilere basit yolu ayarlamaktır "TF_CONFIG" ortam değişkenleri ve bir kullanma tf.distribute.cluster_resolver.TFConfigClusterResolver ayrıştırmak için "TF_CONFIG" .

Hakkında genel bir açıklama için "TF_CONFIG" ortam değişkenleri, bakın Dağıtılmış eğitim rehberi.

Eğer Kubernetes veya diğer yapılandırma şablonları kullanarak antrenman görevleri başlarsak, çok büyük olasılıkla bu şablonlar zaten ayarlanmış olmasıdır “TF_CONFIG" senin için.

Set "TF_CONFIG" ortam değişkeni

Eğer 3 işçi ve 2 parametre sunucularını olduğunu varsayalım, "TF_CONFIG" işçinin 1 olabilir:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" değerlendirici olabilir:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" Yukarıdaki bölüm "TF_CONFIG" değerlendirici için dize isteğe bağlıdır.

Tüm görevler için aynı ikili dosyayı kullanırsanız

Tüm bu görevleri tek bir ikili dosya kullanarak çalıştırmayı tercih ederseniz, programınızın en başında farklı rollere ayrılmasına izin vermeniz gerekir:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Aşağıdaki kod bir TensorFlow sunucusunu başlatır ve bekler:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

İşlem hatası

işçi hatası

tf.distribute.experimental.coordinator.ClusterCoordinator veya Model.fit yerleşik sağlamak işçi başarısızlığın hata toleransı. İşçi kurtarma üzerine, daha önce sağlanan veri kümesi (birine işlevi ClusterCoordinator.create_per_worker_dataset özel eğitim döngü veya tf.keras.utils.experimental.DatasetCreator için Model.fit ) işçilerin üzerine çağrılır veri setlerini yeniden oluşturun.

Parametre sunucusu veya koordinatör hatası

Koordinatör bir parametre sunucu hatasını gördüğünde Ancak, bir yükseltecektir UnavailableError veya AbortedError hemen. Bu durumda koordinatörü yeniden başlatabilirsiniz. Koordinatörün kendisi de kullanılamaz hale gelebilir. Bu nedenle, eğitim ilerlemesini kaybetmemek için belirli araçlar önerilir:

  • İçin Model.fit , bir kullanmalısınız BackupAndRestore otomatik ilerleme tasarrufu ve restorasyon kolları geri arama. Bkz Callbacks ve eğitim bir örnek için yukarıdaki bölüme.

  • Özel bir eğitim döngüsü için, model değişkenlerini periyodik olarak kontrol etmeli ve eğitim başlamadan önce varsa bir kontrol noktasından model değişkenlerini yüklemelisiniz. Eğitim ilerleme yaklaşık anlaşılabilir optimizer.iterations bir iyileştirici checkpointed ise:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Bir getiriliyor RemoteValue

Bir getiriliyor RemoteValue bir işlev başarıyla yürütüldü ise başarılı olmak için garanti edilir. Bunun nedeni, şu anda bir işlev yürütüldükten sonra dönüş değerinin koordinatöre hemen kopyalanmasıdır. Kopyalama sırasında herhangi bir çalışan hatası varsa, işlev uygun başka bir çalışan üzerinde yeniden denenecektir. Bu nedenle, performans için optimize etmek istiyorsanız, dönüş değeri olmayan işlevleri planlayabilirsiniz.

Hata raporlama

Koordinatör gibi bir hata görür sonra UnavailableError gibi en parametre sunucuları veya diğer uygulama hatalarını InvalidArgument gelen tf.debugging.check_numerics , bu hatayı kaldırmadan önce tüm bekleyen ve kuyruğa fonksiyonlarını iptal eder. Bunlara karşılık gelen getiriliyor RemoteValue s yükseltecektir CancelledError .

Bir hata oluştuktan sonra, koordinatör aynı hatayı veya iptal edilen fonksiyonlardan herhangi bir hatayı ortaya çıkarmayacaktır.

Performans iyileştirme

Birlikte tren ne zaman performans sorunları görürseniz birkaç olası nedeni vardır ParameterServerStrategy ve ClusterResolver .

Yaygın bir neden, parametre sunucularının dengesiz yüke sahip olması ve bazı ağır yüklü parametre sunucularının kapasiteye ulaşmış olmasıdır. Ayrıca birden fazla kök neden olabilir. Bu sorunu hafifletmek için bazı basit yöntemler şunlardır:

  1. Bir belirterek aracılığıyla büyük modeli değişkenleri Shard variable_partitioner bir oluşturarak zaman ParameterServerStrategy .
  2. Mümkünse, tüm parametre sunucularının tek adımda ihtiyaç duyduğu bir etkin nokta değişkeni oluşturmaktan kaçının. Örneğin, sürekli bir öğrenme hızı veya alt sınıf kullanmak tf.keras.optimizers.schedules.LearningRateSchedule varsayılan davranış öğrenme oranı her adımda tüm diğer parametre sunucular tarafından belirli bir parametre sunucusunda yerleştirilen ve istenen bir değişken olacağını olduğundan optimize içinde .
  3. Büyük kelime dağarcığınızı Keras ön işleme katmanlarına geçirmeden önce karıştırın.

Performans sorunlarının bir başka olası nedeni de koordinatördür. Sizin ilk uygulama schedule / join Python tabanlı ve dolayısıyla havai diş olabilir. Ayrıca koordinatör ve işçiler arasındaki gecikme büyük olabilir. Eğer durum buysa,

  • İçin Model.fit , ayarlayabileceğiniz steps_per_execution sağlanan argüman Model.compile 1'den büyük bir değere.

  • Özel bir eğitim döngüsü, bir tek içine birden çok adım paketi olabilir tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Kitaplık daha da optimize edildiğinden, umarım çoğu kullanıcı gelecekte adımları manuel olarak paketlemek zorunda kalmaz.

Ek olarak, performans iyileştirme için küçük bir numara, yukarıdaki görev hatası işleme bölümünde açıklandığı gibi, işlevleri bir dönüş değeri olmadan programlamaktır.

Bilinen sınırlamalar

Bilinen sınırlamaların çoğu zaten yukarıdaki bölümlerde ele alınmıştır. Bu bölüm bir özet sağlar.

ParameterServerStrategy genel

  • os.environment["grpc_fail_fast"]="use_caller" düzgün hata toleransı çalışması için, koordinatör dahil her görevi ihtiyaç vardır.
  • Senkronize parametre sunucusu eğitimi desteklenmez.
  • Optimum performansı elde etmek için genellikle birden fazla adımı tek bir işlevde toplamak gerekir.
  • Üzeri saved_model yüklenmesi desteklenmez tf.saved_model.load kanatlı bir değişkenleri ihtiva etmektedir. Böyle bir save_modelinin TensorFlow Serving kullanılarak yüklenmesinin işe yarayacağını unutmayın.
  • Parçalanmış optimize edici yuva değişkenlerini içeren bir kontrol noktasının farklı sayıda parçaya yüklenmesi desteklenmez.
  • Koordinatör görevini yeniden başlatmadan parametre sunucusu hatasından kurtarma işlemi desteklenmez.
  • Kullanım tf.lookup.StaticHashTable (yaygın gibi bazı Keras ön işleme tabakaları tarafından kullanılır tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup ve tf.keras.layers.TextVectorization kaynakları ile sonuçlanır yerleştirilen) koordinatör şu anda parametre sunucusu eğitimi ile. Bunun, çalışanlardan koordinatöre kadar arama RPC'leri için performans etkileri vardır. Bu, ele alınması gereken mevcut bir yüksek önceliktir.

Model.fit özelliklerini

  • steps_per_epoch argüman gereklidir Model.fit . Bir çağda uygun aralıkları sağlayan bir değer seçebilirsiniz.
  • ParameterServerStrategy performans nedenleriyle toplu düzey görüşmeleri yapmak özel geri aramaları için destek yoktur. Uygun yerlerde aldı ile çağ düzey görüşmeleri içine bu aramaları dönüştürmek gerekir steps_per_epoch onlar her denir böylece, steps_per_epoch adımların sayısını. Yerleşik geri aramalar etkilenmez: toplu düzeydeki aramaları, performans gösterecek şekilde değiştirildi. İçin toplu düzey görüşmeleri Destekleyici ParameterServerStrategy planlanıyor.
  • Aynı nedenle, diğer stratejilerden farklı olarak, ilerleme çubuğu ve metrikler yalnızca dönem sınırlarında günlüğe kaydedilir.
  • run_eagerly desteklenmemektedir.

Özel eğitim döngüsü özellikleri