![]() | ![]() | ![]() | ![]() |
Genel Bakış
Bu eğitimde nasıl kullanılacağını gösterir tf.distribute.Strategy
ile dağıtılan çoklu işçi eğitimi için kullanılabilir tf.estimator
. Kodunuzu tf.estimator
kullanarak tf.estimator
ve yüksek performanslı tek bir makinenin ötesinde ölçeklendirmeyle ilgileniyorsanız, bu eğitim tam size göre.
Başlamadan önce lütfen dağıtım stratejisi kılavuzunu okuyun. Çoklu GPU eğitim öğreticisi de konuyla ilgilidir çünkü bu eğitimde aynı modeli kullanır.
Kurmak
İlk olarak, TensorFlow'u ve gerekli içe aktarmaları kurun.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
Giriş işlevi
Bu öğreticide, TensorFlow Veri Kümelerinden MNIST veri kümesini kullanır. Buradaki kod, tek bir temel farkla çoklu GPU eğitim öğreticisine benzer: Çok çalışanlı eğitim için Estimator kullanılırken, model yakınsamasını sağlamak için veri kümesini çalışan sayısına göre parçalamak gerekir. Girdi verileri, çalışan dizini tarafından 1/num_workers
, böylece her işçi, veri kümesinin 1/num_workers
farklı bölümlerini işler.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
Yakınsamayı başarmak için bir başka makul yaklaşım, veri kümesini her çalışanda farklı tohumlarla karıştırmaktır.
Çok çalışanlı yapılandırma
Bu öğreticideki temel farklardan biri ( çoklu GPU eğitim öğreticisine kıyasla), çok çalışanlı kurulumdur. TF_CONFIG
ortam değişkeni, kümenin parçası olan her çalışana küme yapılandırmasını belirtmenin standart yoludur.
TF_CONFIG
iki bileşeni vardır: cluster
ve task
. cluster
, cluster
tamamı, yani kümedeki çalışanlar ve parametre sunucuları hakkında bilgi sağlar. task
, geçerli task
hakkında bilgi sağlar. İlk bileşen cluster
, cluster
tüm çalışanlar ve parametre sunucuları için aynıdır ve ikinci bileşen task
, her çalışan ve parametre sunucusunda farklıdır ve kendi type
ve index
belirtir. Bu örnekte, görev type
worker
ve görev index
0
.
Örnekleme amacıyla, bu öğretici localhost
2 çalışanlı bir TF_CONFIG
nasıl ayarlanacağını gösterir. Uygulamada, harici bir IP adresi ve bağlantı noktası üzerinde birden fazla işçi oluşturursunuz ve her bir işçi için uygun şekilde TF_CONFIG
ayarlarsınız, yani görev index
değiştirirsiniz.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
Modeli tanımlayın
Eğitim için katmanları, optimize ediciyi ve kayıp işlevini yazın. Bu eğitici, modeli çoklu GPU eğitim dersine benzer şekilde Keras katmanlarıyla tanımlar.
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrateji
Modeli eğitmek için tf.distribute.experimental.MultiWorkerMirroredStrategy
örneğini kullanın. MultiWorkerMirroredStrategy
, tüm çalışanlar genelinde her cihazda modelin katmanlarında bulunan tüm değişkenlerin kopyalarını oluşturur. Gradyanları toplamak ve değişkenleri senkronize tutmak için toplu iletişim için bir TensorFlow op olan CollectiveOps
kullanır. tf.distribute.Strategy
kılavuzu bu strateji hakkında daha fazla ayrıntı içerir.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',) INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO
Modeli eğitin ve değerlendirin
Daha sonra, tahminci için RunConfig
dağıtım stratejisini belirtin ve RunConfig
çağırarak tf.estimator.train_and_evaluate
ve değerlendirin. Bu öğretici, stratejiyi train_distribute
aracılığıyla belirterek yalnızca eğitimi train_distribute
. Değerlendirmeyi eval_distribute
ile dağıtmak da mümkündür.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4c6c18af98>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Iterator.get_next_as_optional()` instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:339: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Iterator.get_next_as_optional()` instead. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is. Cause: could not parse the source code: lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,)) This error may be avoided by creating the lambda in a standalone statement. To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is. Cause: could not parse the source code: lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,)) This error may be avoided by creating the lambda in a standalone statement. To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7f4c8dcaf730> and will run it as-is. Cause: could not parse the source code: lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,)) This error may be avoided by creating the lambda in a standalone statement. To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:loss = 2.3278575, step = 0 INFO:tensorflow:loss = 2.3278575, step = 0 INFO:tensorflow:global_step/sec: 201.897 INFO:tensorflow:global_step/sec: 201.897 INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec) INFO:tensorflow:loss = 2.3006024, step = 100 (0.498 sec) INFO:tensorflow:global_step/sec: 215.773 INFO:tensorflow:global_step/sec: 215.773 INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec) INFO:tensorflow:loss = 2.2919793, step = 200 (0.463 sec) INFO:tensorflow:global_step/sec: 213.717 INFO:tensorflow:global_step/sec: 213.717 INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec) INFO:tensorflow:loss = 2.286222, step = 300 (0.468 sec) INFO:tensorflow:global_step/sec: 215.652 INFO:tensorflow:global_step/sec: 215.652 INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec) INFO:tensorflow:loss = 2.2875795, step = 400 (0.464 sec) INFO:tensorflow:global_step/sec: 215.686 INFO:tensorflow:global_step/sec: 215.686 INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec) INFO:tensorflow:loss = 2.3000607, step = 500 (0.466 sec) INFO:tensorflow:global_step/sec: 217.858 INFO:tensorflow:global_step/sec: 217.858 INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec) INFO:tensorflow:loss = 2.2862964, step = 600 (0.457 sec) INFO:tensorflow:global_step/sec: 216.886 INFO:tensorflow:global_step/sec: 216.886 INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec) INFO:tensorflow:loss = 2.2848775, step = 700 (0.463 sec) INFO:tensorflow:global_step/sec: 242.69 INFO:tensorflow:global_step/sec: 242.69 INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec) INFO:tensorflow:loss = 2.2776775, step = 800 (0.409 sec) INFO:tensorflow:global_step/sec: 621.93 INFO:tensorflow:global_step/sec: 621.93 INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec) INFO:tensorflow:loss = 2.283049, step = 900 (0.161 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z INFO:tensorflow:Starting evaluation at 2020-09-11T01:27:54Z INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 1.01975s INFO:tensorflow:Inference Time : 1.01975s INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55 INFO:tensorflow:Finished evaluation at 2020-09-11-01:27:55 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.276255 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.1389045. INFO:tensorflow:Loss for final step: 1.1389045. ({'loss': 2.276255, 'global_step': 938}, [])
Eğitim performansını optimize edin
Artık bir modeliniz ve tf.distribute.Strategy
tarafından desteklenen çok çalışanı olan bir Tahminciniz var. Çok çalışanı olan eğitimin performansını optimize etmek için aşağıdaki teknikleri deneyebilirsiniz:
- Toplu iş boyutunu artırın: Burada belirtilen toplu iş boyutu GPU başınadır. Genel olarak, GPU belleğine uyan en büyük toplu iş boyutu önerilir.
- Değişkenleri
tf.float
Mümkünse değişkenleritf.float
. Resmi ResNet modeli, bunun nasıl yapılabileceğine dair bir örnek içerir. Toplu iletişimi kullanın:
MultiWorkerMirroredStrategy
, birden çok toplu iletişim uygulaması sağlar .-
RING
, ana bilgisayarlar arası iletişim katmanı olarak gRPC'yi kullanarak halka tabanlı kolektifleri uygular. -
NCCL
, kolektifleri uygulamak için Nvidia'nınNCCL
kullanır. -
AUTO
, seçimi çalışma zamanına erteler.
En iyi toplu uygulama seçimi, GPU'ların sayısına ve türüne ve kümedeki ağ bağlantısına bağlıdır. Otomatik seçimi geçersiz kılmak için
MultiWorkerMirroredStrategy
communication
parametresine geçerli bir değer belirtin, örneğincommunication=tf.distribute.experimental.CollectiveCommunication.NCCL
.-
TensorFlow modellerinizin performansını optimize etmek için kullanabileceğiniz diğer stratejiler ve araçlar hakkında daha fazla bilgi edinmek için kılavuzdaki Performans bölümünü ziyaret edin.
Diğer kod örnekleri
- Kubernetes şablonlarını kullanarak tensorflow / ekosistemde çoklu çalışan eğitimi için uçtan uca örnek . Bu örnek bir Keras modeliyle başlar ve
tf.keras.estimator.model_to_estimator
API'sini kullanarak bunu bir Tahminciye dönüştürür. - Birçoğu birden fazla dağıtım stratejisi yürütmek üzere yapılandırılabilen resmi modeller .