Yerel TensorFlow Everywhere etkinliğiniz için bugün LCV!
Bu sayfa, Cloud Translation API ile çevrilmiştir.
Switch to English

Estimator ile çok çalışanlı eğitim

TensorFlow.org'da görüntüleyin Google Colab'de çalıştırın Kaynağı GitHub'da görüntüleyin Defteri indirin

Genel Bakış

Bu eğitimde nasıl kullanılacağını gösterir tf.distribute.Strategy ile dağıtılan çoklu işçi eğitimi için kullanılabilir tf.estimator . Kodunuzu tf.estimator kullanarak tf.estimator ve yüksek performanslı tek bir makinenin ötesinde ölçeklendirmeyle ilgileniyorsanız, bu eğitim tam size göre.

Başlamadan önce lütfen dağıtım stratejisi kılavuzunu okuyun. Çoklu GPU eğitim öğreticisi de konuyla ilgilidir çünkü bu eğitimde aynı modeli kullanır.

Kurmak

İlk olarak, TensorFlow'u ve gerekli içe aktarmaları kurun.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json

Giriş işlevi

Bu öğreticide, TensorFlow Veri Kümelerinden MNIST veri kümesini kullanır. Buradaki kod, tek bir temel farkla çoklu GPU eğitim öğreticisine benzer: Çok çalışanlı eğitim için Estimator kullanılırken, model yakınsamasını sağlamak için veri kümesini çalışan sayısına göre parçalamak gerekir. Girdi verileri, çalışan dizini tarafından 1/num_workers , böylece her işçi, veri kümesinin 1/num_workers farklı bölümlerini işler.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Yakınsamayı başarmak için bir başka makul yaklaşım, veri kümesini her çalışanda farklı tohumlarla karıştırmaktır.

Çok çalışanlı yapılandırma

Bu öğreticideki temel farklardan biri ( çoklu GPU eğitim öğreticisine kıyasla), çok çalışanlı kurulumdur. TF_CONFIG ortam değişkeni, kümenin parçası olan her çalışana küme yapılandırmasını belirtmenin standart yoludur.

TF_CONFIG iki bileşeni vardır: cluster ve task . cluster , cluster tamamı, yani kümedeki çalışanlar ve parametre sunucuları hakkında bilgi sağlar. task , geçerli task hakkında bilgi sağlar. İlk bileşen cluster , cluster tüm çalışanlar ve parametre sunucuları için aynıdır ve ikinci bileşen task , her çalışan ve parametre sunucusunda farklıdır ve kendi type ve index belirtir. Bu örnekte, görev type worker ve görev index 0 .

Örnekleme amacıyla, bu öğretici localhost 2 çalışanlı bir TF_CONFIG nasıl ayarlanacağını gösterir. Uygulamada, harici bir IP adresi ve bağlantı noktası üzerinde birden fazla işçi oluşturursunuz ve her bir işçi için uygun şekilde TF_CONFIG ayarlarsınız, yani görev index değiştirirsiniz.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Modeli tanımlayın

Eğitim için katmanları, optimize ediciyi ve kayıp işlevini yazın. Bu eğitici, modeli çoklu GPU eğitim dersine benzer şekilde Keras katmanlarıyla tanımlar.

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrateji

Modeli eğitmek için tf.distribute.experimental.MultiWorkerMirroredStrategy örneğini kullanın. MultiWorkerMirroredStrategy , tüm çalışanlar genelinde her cihazda modelin katmanlarında bulunan tüm değişkenlerin kopyalarını oluşturur. Gradyanları toplamak ve değişkenleri senkronize tutmak için toplu iletişim için bir TensorFlow op olan CollectiveOps kullanır. tf.distribute.Strategy kılavuzu bu strateji hakkında daha fazla ayrıntı içerir.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

Modeli eğitin ve değerlendirin

Daha sonra, tahminci için RunConfig dağıtım stratejisini belirtin ve RunConfig çağırarak tf.estimator.train_and_evaluate ve değerlendirin. Bu öğretici, stratejiyi train_distribute aracılığıyla belirterek yalnızca eğitimi train_distribute . Değerlendirmeyi eval_distribute ile dağıtmak da mümkündür.

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4c6c18af98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.

INFO:tensorflow:Create CheckpointSaverHook.

WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:loss = 2.3278575, step = 0

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:global_step/sec: 201.897

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec)

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:global_step/sec: 215.773

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec)

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:global_step/sec: 213.717

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec)

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:global_step/sec: 215.652

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec)

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:global_step/sec: 215.686

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec)

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:global_step/sec: 217.858

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec)

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:global_step/sec: 216.886

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec)

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:global_step/sec: 242.69

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec)

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:global_step/sec: 621.93

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec)

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Done calling model_fn.

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Graph was finalized.

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Done running local_init_op.

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [10/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [20/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [30/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [40/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [50/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [60/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [70/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [80/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [90/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Evaluation [100/100]

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Inference Time : 1.01975s

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938

INFO:tensorflow:Loss for final step: 1.1389045.

INFO:tensorflow:Loss for final step: 1.1389045.

({'loss': 2.276255, 'global_step': 938}, [])

Eğitim performansını optimize edin

Artık bir modeliniz ve tf.distribute.Strategy tarafından desteklenen çok çalışanı olan bir Tahminciniz var. Çok çalışanı olan eğitimin performansını optimize etmek için aşağıdaki teknikleri deneyebilirsiniz:

  • Toplu iş boyutunu artırın: Burada belirtilen toplu iş boyutu GPU başınadır. Genel olarak, GPU belleğine uyan en büyük toplu iş boyutu önerilir.
  • Değişkenleri tf.float Mümkünse değişkenleri tf.float . Resmi ResNet modeli, bunun nasıl yapılabileceğine dair bir örnek içerir.
  • Toplu iletişimi kullanın: MultiWorkerMirroredStrategy , birden çok toplu iletişim uygulaması sağlar .

    • RING , ana bilgisayarlar arası iletişim katmanı olarak gRPC'yi kullanarak halka tabanlı kolektifleri uygular.
    • NCCL , kolektifleri uygulamak için Nvidia'nın NCCL kullanır.
    • AUTO , seçimi çalışma zamanına erteler.

    En iyi toplu uygulama seçimi, GPU'ların sayısına ve türüne ve kümedeki ağ bağlantısına bağlıdır. Otomatik seçimi geçersiz kılmak için MultiWorkerMirroredStrategy communication parametresine geçerli bir değer belirtin, örneğin communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

TensorFlow modellerinizin performansını optimize etmek için kullanabileceğiniz diğer stratejiler ve araçlar hakkında daha fazla bilgi edinmek için kılavuzdaki Performans bölümünü ziyaret edin.

Diğer kod örnekleri

  1. Kubernetes şablonlarını kullanarak tensorflow / ekosistemde çoklu çalışan eğitimi için uçtan uca örnek . Bu örnek bir Keras modeliyle başlar ve tf.keras.estimator.model_to_estimator API'sini kullanarak bunu bir Tahminciye dönüştürür.
  2. Birçoğu birden fazla dağıtım stratejisi yürütmek üzere yapılandırılabilen resmi modeller .