Yardım Kaggle üzerinde TensorFlow ile Büyük Bariyer Resifi korumak Meydan Üyelik

Keras ile çok çalışanlı eğitim

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Bu öğretici bir Keras modeli ve çoklu işçi dağıtılan eğitim gerçekleştirmek gösterilmiştir Model.fit kullanarak API tf.distribute.Strategy API özellikle tf.distribute.MultiWorkerMirroredStrategy sınıfını. Bu stratejinin yardımıyla, tek bir çalışan üzerinde çalışacak şekilde tasarlanmış bir Keras modeli, minimum kod değişikliği ile birden fazla çalışan üzerinde sorunsuz bir şekilde çalışabilir.

Daha derin bir anlayış ilgilenenler için tf.distribute.Strategy API'ler, TensorFlow içinde Dağıtılmış eğitim rehberi TensorFlow dağıtım stratejileri destekler genel bir bakış için kullanılabilir.

Nasıl kullanılacağını öğrenmek için MultiWorkerMirroredStrategy keras ve özel bir eğitim döngü ile, bakın keras ve MultiWorkerMirroredStrategy ile özel eğitim döngü .

Bu öğreticinin amacının, iki işçi ile minimal çok işçili bir örnek göstermek olduğunu unutmayın.

Kurmak

Bazı gerekli ithalatlarla başlayın:

import json
import os
import sys

TensorFlow'u içe aktarmadan önce ortamda birkaç değişiklik yapın:

  1. Tüm GPU'ları devre dışı bırakın. Bu, aynı GPU'yu kullanmaya çalışan çalışanların neden olduğu hataları önler. Gerçek dünyadaki bir uygulamada, her işçi farklı bir makinede olacaktır.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Reset TF_CONFIG (daha sonra bu konuda daha fazla bilgi edineceksiniz) ortam değişkeni:
os.environ.pop('TF_CONFIG', None)
  1. Emin geçerli dizin Python'un açık olduğundan emin olun yolu-bu tarafından yazılmış dosyaları almak için dizüstü verir %%writefile sonra:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Şimdi TensorFlow'u içe aktarın:

import tensorflow as tf

Veri kümesi ve model tanımı

Sonra, bir oluşturmak mnist.py basit bir model ve veri kümesi kurulum ile dosyayı. Bu Python dosyası, bu öğreticide çalışan süreçler tarafından kullanılacaktır:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Tek bir işçi üzerinde model eğitimi

Çağlardakinden az sayıda modeli eğitim deneyin ve doğru olarak her şeyin olup olmadığını saptamak için tek bir işçinin sonuçlarını gözlemleyin. Eğitim ilerledikçe, kayıp düşmeli ve doğruluk artmalıdır.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Çok çalışanlı yapılandırma

Şimdi çok işçili eğitim dünyasına girelim.

İşler ve görevler içeren bir küme

Bir: TensorFlow olarak, dağıtılan eğitim gerektirir 'cluster' birkaç işlerle ve işlerin her bir veya daha fazla olabilir 'task' s.

Sen gerekecektir TF_CONFIG muhtemelen farklı bir rolü vardır, her biri birden makinelerde eğitim için yapılandırma ortam değişkeni. TF_CONFIG kümesinin parçası olan her işçi için küme yapılandırmasını belirtmek için kullanılan bir JSON dizedir.

Bir iki bileşeni vardır TF_CONFIG : değişken 'cluster' ve 'task' .

  • A 'cluster' tüm işçiler için aynıdır ve bu şekilde işler farklı türleri, oluşan dict olan eğitim küme, hakkında bilgi veren 'worker' veya 'chief' .

    • Çoklu işçi eğitimde tf.distribute.MultiWorkerMirroredStrategy genelde kendilerinin orada 'worker' böyle düzenli olanlara ilaveten, bir kontrol noktası tasarrufu ve TensorBoard için özet dosyası yazma gibi sorumluluk üstlendiği 'worker' yapar. Böyle 'worker' (Bir iş adı ile baş işçisi olarak adlandırılır 'chief' ).
    • İçin JavaScript kodunu 'chief' olması 'index' 0 atanacak (aslında, bu nasıl tf.distribute.Strategy uygulanmaktadır).
  • A 'task' şimdiki görevin bilgi sağlar ve her işçi için farklıdır. O belirten 'type' ve 'index' o işçinin.

Aşağıda örnek bir yapılandırma verilmiştir:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

İşte aynıdır TF_CONFIG bir JSON dizesi olarak tefrika:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Not tf_config Python sadece yerel bir değişkendir. Bir eğitim yapılandırması için kullanmak mümkün olabilmesi için, bu dict bir JSON olarak seri hale getirilmiş ve bir de yerleştirilmesi gerekir TF_CONFIG ortam değişkeni.

Yukarıdaki örnek yapılandırmasında, görev set 'type' için 'worker' ve görev 'index' için 0 . Bu nedenle, bu makine ilk işçi. Bu olarak atanacak 'chief' işçi ve diğerlerine göre daha fazla iş yapmak.

Gösterim amacıyla, bu öğretici şov Eğer bir nasıl kurabileceği TF_CONFIG bir iki işçilerle değişkeni localhost .

Uygulamada, harici IP adresleri / limanlarda üzerinde birden işçileri oluşturmak ve bir kurarsınız TF_CONFIG buna göre her işçi üzerindeki değişken.

Bu öğreticide iki işçi kullanacaksınız:

  • İlk ( 'chief' ) işçinin TF_CONFIG yukarıda gösterilmiştir.
  • İkinci işçi için, ayarlayacaktır tf_config['task']['index']=1

Not defterlerinde ortam değişkenleri ve alt süreçler

Alt işlemler, ortam değişkenlerini üstlerinden devralır.

Örneğin, bu Jupyter Notebook işleminde bir ortam değişkenini aşağıdaki gibi ayarlayabilirsiniz:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Ardından, ortam değişkenine bir alt işlemden erişebilirsiniz:

echo ${GREETINGS}
Hello TensorFlow!

Bir sonraki bölümde, geçmek için benzer bir yöntem kullanacağız TF_CONFIG işçi subprocesses için. Gerçek dünya senaryosunda, işlerinizi bu şekilde başlatmazsınız, ancak bu örnekte bu yeterlidir.

Doğru stratejiyi seçin

TensorFlow'da dağıtılmış eğitimin iki ana biçimi vardır:

  • Eğitimin adımlar işçi ve kopyaları arasında senkronize edilir Senkron eğitim, ve
  • Eğitim adımlar kesinlikle senkronize edilmez Asenkron eğitim, (örneğin, parametre sunucu eğitim ).

Bu eğitimde bir örneğini kullanarak senkron çoklu işçi eğitimi gerçekleştirmek gösterilmiştir tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy bütün işçilerin genelinde her cihazda modelin katmanlarında tüm değişkenlerin kopyalarını yaratır. Bu kullanır CollectiveOps agrega degradelere, kolektif iletişim için TensorFlow op ve senkronize değişkenleri tutun. tf.distribute.Strategy kılavuzu Bu stratejiyle ilgili daha fazla ayrıntı vardır.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy üzerinden birden fazla uygulamaları sağlar CommunicationOptions parametresi: 1) RING çapraz ana iletişim tabakası olarak gRPC kullanarak halka-bazlı kolektiflerinin uygular; 2) NCCL kullanan NVIDIA Toplu Haberleşme Kütüphane kolektifleri uygulamak; ve 3) AUTO çalışma zamanı seçeneği ertelemektedir. Toplu uygulamanın en iyi seçimi, GPU'ların sayısına ve türüne ve kümedeki ağ ara bağlantısına bağlıdır.

Modeli eğit

Entegrasyonu ile tf.distribute.Strategy içine API tf.keras , sadece birden-işçilere eğitim modeli bina ve çevreleyen dağıtmak için yapacaktır değiştirmek model.compile() çağrısı iç strategy.scope() . Nasıl ve değişkenler oluşturdu ve söz konusu olan dağıtım stratejisinin kapsamı dikte MultiWorkerMirroredStrategy , yaratılan değişkenler MirroredVariable ler ve bunlar işçilerin her biri üzerinde çoğaltılır.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Aslında ile çalıştırmak için MultiWorkerMirroredStrategy Eğer alt işlemleri çalıştırmak ve geçmeniz gerekir TF_CONFIG onlara.

Gibi mnist.py önce yazılı dosya burada main.py işçilerin her aday olacağını:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

O notta Yukarıdaki kod snippet'inde global_batch_size geçirilen alır, Dataset.batch , ayarlandığında per_worker_batch_size * num_workers . Bu olmasını sağlar, her işçinin toplu işler o per_worker_batch_size bakılmaksızın işçilerin sayısının örnekler.

Geçerli dizin artık her iki Python dosyasını da içeriyor:

ls *.py
main.py
mnist.py

Json-serialize Yani TF_CONFIG ve ortam değişkenleri ekleyin:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi, çalışacak bir alt işlemi başlatabilir main.py ve kullanımı TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Yukarıdaki komutla ilgili dikkat edilmesi gereken birkaç nokta vardır:

  1. Bu kullanır %%bash bir olan dizüstü "sihirli" bazı kabuk komutları çalıştırmak için.
  2. Bu kullanır --bg çalıştırmak için bayrak bash bu işçi sona çünkü arka planda süreci. Başlamadan önce tüm çalışanları bekler.

Böylece arka plana çalışan işlemi, bu defterin çıktı yazdırmaz &> Daha sonra bir günlük dosyasında ne olduğunu incelemek, böylece bir dosyaya onun çıkış yönlendirir.

Bu nedenle, işlemin başlaması için birkaç saniye bekleyin:

import time
time.sleep(10)

Şimdi, işçinin günlük dosyasına şu ana kadar ne çıktı olduğunu inceleyin:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

Günlük dosyasının son satırı demeliyim: Started server with target: grpc://localhost:12345 . İlk işçi şimdi hazırdır ve diğer tüm işçi(ler)in ilerlemeye hazır olmasını beklemektedir.

Yani güncelleme tf_config ikinci işçinin süreç almak için:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

İkinci işçiyi çalıştırın. Bu, tüm çalışanlar aktif olduğu için eğitimi başlatacaktır (bu nedenle bu süreci arka plana atmaya gerek yoktur):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

İlk çalışan tarafından yazılan günlükleri yeniden kontrol ederseniz, o modelin eğitimine katıldığını öğreneceksiniz:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Şaşırtıcı değil, bu koştu yavaş testte daha bu yazının başında çalıştırın.

Birden çok işçiyi tek bir makinede çalıştırmak yalnızca ek yük ekler.

Buradaki amaç, eğitim süresini iyileştirmek değil, sadece çok işçili eğitime bir örnek vermekti.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Çok çalışanlı derinlemesine eğitim

Şimdiye kadar, temel bir çok çalışanlı kurulumun nasıl gerçekleştirileceğini öğrendiniz.

Eğitimin geri kalanında, gerçek kullanım durumları için yararlı veya önemli olabilecek diğer faktörleri ayrıntılı olarak öğreneceksiniz.

Veri kümesi parçalama

Çok işçi eğitimde, veri kümesi Kırma işlemi yakınsaması ve performans sağlamak için gereklidir.

Önceki bölümde örnek tarafından sağlanan varsayılan autosharding dayanmaktadır tf.distribute.Strategy API. Sen ayarlayarak Sharding kontrol edebilirsiniz tf.data.experimental.AutoShardPolicy ait tf.data.experimental.DistributeOptions .

Otomatik Kırma işleminde hakkında daha fazla bilgi edinmek için bkz Dağıtılmış giriş kılavuz .

İşte oto böylece her yineleme her örnek (önerilmez) işler o kapalı Sharding açmak için nasıl hızlı bir örneğidir:

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Değerlendirme

Eğer geçerseniz validation_data içine Model.fit , her dönem için eğitim ve değerlendirme arasında değişir. Alarak değerlendirme validation_data tüm işçiler için işçilerin aynı sette dağıldığını ve değerlendirme sonuçları toplanır ve kullanılabilir.

Eğitime benzer şekilde, doğrulama veri kümesi otomatik olarak dosya düzeyinde paylaşılır. Sen onaylama verisinin küresel toplu boyutunu ayarlamak ve ayarlamanız gerekir validation_steps .

Değerlendirme için tekrarlanan bir veri kümesi de önerilir.

Alternatif olarak, kontrol noktalarını düzenli olarak okuyan ve değerlendirmeyi çalıştıran başka bir görev de oluşturabilirsiniz. Tahmincinin yaptığı budur. Ancak bu, değerlendirme yapmak için önerilen bir yol değildir ve bu nedenle ayrıntıları atlanmıştır.

Verim

Artık hepsi ile birden fazla işçi çalıştırmak için ayarlanmış bir Keras modele sahip MultiWorkerMirroredStrategy .

Çok işçili eğitimin performansını ayarlamak için aşağıdakileri deneyebilirsiniz:

  • tf.distribute.MultiWorkerMirroredStrategy birden içerir toplu iletişim uygulamaları :

    • RING çapraz ana iletişim tabakası olarak gRPC kullanarak uygular halka bazlı kollektifler.
    • NCCL kullanan NVIDIA Toplu Haberleşme Kütüphane kolektifleri uygulamaktır.
    • AUTO çalışma zamanı seçeneği ertelemektedir.

    Toplu uygulamanın en iyi seçimi, kümedeki GPU'ların sayısına, GPU'ların türüne ve ağ ara bağlantısına bağlıdır. Otomatik seçim geçersiz belirtmek için communication_options parametresini MultiWorkerMirroredStrategy 'ın yapıcısı. Örneğin:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Değişkenler Cast tf.float mümkünse:

    • Resmi ResNet modeli içeren bir örnek bu yapılabilir nasıl.

Hata toleransı

Eşzamanlı eğitimde, çalışanlardan biri başarısız olursa küme başarısız olur ve herhangi bir arıza giderme mekanizması mevcut değildir.

İle keras kullanma tf.distribute.Strategy işçileri ölür veya başka dengesiz olduğu durumlarda hataya dayanıklılık avantajı ile birlikte gelir. Bunu, seçtiğiniz dağıtılmış dosya sisteminde eğitim durumunu koruyarak yapabilirsiniz; öyle ki, önceden başarısız olan veya önceden alınan örneğin yeniden başlatılmasıyla eğitim durumu kurtarılır.

Bir işçi müsait olmadığında, diğer işçiler (muhtemelen bir zaman aşımından sonra) başarısız olur. Bu gibi durumlarda, uygun olmayan çalışanın ve başarısız olan diğer çalışanların yeniden başlatılması gerekir.

ModelKontrol noktası geri arama

ModelCheckpoint geri arama artık hata toleransı işlevsellik sağlar, kullanın BackupAndRestore yerine geri arama.

ModelCheckpoint geri arama hala kontrol noktaları kaydetmek için kullanılabilir. Ancak bununla eğitim yarıda kaldıysa veya başarıyla tamamlandıysa, eğitime kontrol noktasından devam etmek için modeli manuel olarak yüklemek kullanıcıya aittir.

İsteğe bağlı olarak kullanıcı dışında model / ağırlıkları kaydetmek ve geri yüklemek için tercih edebilirsiniz ModelCheckpoint callback'inde.

Model kaydetme ve yükleme

Kullanarak modelinizi kaydetmek için model.save veya tf.saved_model.save , tasarruf hedef ihtiyaçları her işçi için farklı olmak.

  • Şef olmayan çalışanlar için modeli geçici bir dizine kaydetmeniz gerekecektir.
  • Şef için sağlanan model dizinine kaydetmeniz gerekecek.

Birden çok çalışanın aynı konuma yazmaya çalışmasından kaynaklanan hataları önlemek için, çalışan üzerindeki geçici dizinlerin benzersiz olması gerekir.

Tüm dizinlerde kaydedilen model aynıdır ve tipik olarak, geri yükleme veya hizmet için yalnızca şef tarafından kaydedilen modele başvurulmalıdır.

Eğitiminiz tamamlandıktan sonra işçiler tarafından oluşturulan geçici dizinleri silen bir temizleme mantığına sahip olmalısınız.

Şef ve işçilerden aynı anda tasarruf etmenin nedeni, hem şefin hem de çalışanların allreduce iletişim protokolüne katılmasını gerektiren kontrol noktası sırasında değişkenleri topluyor olmanızdır. Öte yandan, şef ve işçilerin aynı model dizinine kaydetmesine izin vermek, çekişme nedeniyle hatalara neden olacaktır.

Kullanılması MultiWorkerMirroredStrategy , program her işçinin üzerinde çalışan ve mevcut işçi baş olup olmadığını bilmek için olduğu, bu nitelikleri vardır küme çözümleyici nesne yararlanır task_type ve task_id :

  • task_type mevcut iş (örneğin ne olduğunu size söyler 'worker' ).
  • task_id size işçinin tanımlayıcı söyler.
  • İle işçi task_id == 0 baş işçisi olarak belirlenmiştir.

Kodunda aşağıda pasajı, write_filepath fonksiyonu işçinin bağlıdır yazma için dosya yolunu sağlar task_id :

  • (İle baş işçi için task_id == 0 ), orijinal dosya yoluna yazar.
  • Diğer işçiler için bunun geçici dizinine-yaratır temp_dir -with task_id içinde yazma dizin yolundaki:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Bununla, artık kaydetmeye hazırsınız:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Yukarıda açıklandığı gibi, daha sonra model sadece şefin kaydedildiği yoldan yüklenmelidir, bu yüzden şef olmayan işçilerin kaydettiği geçici olanları kaldıralım:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Şimdi, bu yüke zaman geldiğinde, elverişli kullanmasına izin tf.keras.models.load_model API ve daha fazla çalışma ile devam etmektedir.

Burada, sadece yüke tek işçiyi kullanarak ve sen çağrı yapmak ve bu durumda eğitim devam edeceğinin varsayılması tf.keras.models.load_model başka dahilinde strategy.scope() o (not strategy = tf.distribute.MultiWorkerMirroredStrategy() , tanımlanan erken ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Kontrol noktası kaydetme ve geri yükleme

Öte yandan, kontrol noktası, modelinizin ağırlıklarını kaydetmenize ve tüm modeli kaydetmenize gerek kalmadan bunları geri yüklemenize olanak tanır.

Burada, bir yaratacağız tf.train.Checkpoint tarafından yönetilen modelini izler tf.train.CheckpointManager sadece son kontrol noktası korunur böylece,:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Bir kez CheckpointManager kurulur, artık kaydedip olmayan baş işçiler kurtarmıştı kontrol noktalarının kaldırılması için hazırsınız:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Eğer modelini geri gerektiğinde Şimdi, elverişli kullanılarak kaydedilen son kontrol noktasını bulabilirsiniz tf.train.latest_checkpoint fonksiyonunu. Kontrol noktasını geri yükledikten sonra eğitime devam edebilirsiniz.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

BackupAndRestore geri arama

tf.keras.callbacks.experimental.BackupAndRestore geri arama altında geçici bir denetim noktası dosyası model ve cari dönem numarasını yedekleyerek hata toleransı işlevsellik sağlar backup_dir için argüman BackupAndRestore . Bu, her dönemin sonunda yapılır.

İşler kesintiye uğrayıp yeniden başladığında, geri arama son kontrol noktasını geri yükler ve eğitim kesintiye uğrayan dönemin başından itibaren devam eder. Kesintiden önce bitmemiş çağda halihazırda yapılmış olan herhangi bir kısmi eğitim, nihai model durumunu etkilememesi için atılacaktır.

Kullanmak için, bir örneğini temin tf.keras.callbacks.experimental.BackupAndRestore de Model.fit çağrısı.

İle MultiWorkerMirroredStrategy bir işçi kesintiye uğradıktan sonra da kesintiye işçinin yeniden dek, tüm küme duraklar. Diğer çalışanlar da yeniden başlatılır ve kesintiye uğrayan çalışan kümeye yeniden katılır. Ardından, her çalışan önceden kaydedilmiş olan kontrol noktası dosyasını okur ve önceki durumunu alır, böylece kümenin tekrar eşitlenmesine izin verir. Ardından eğitim devam ediyor.

BackupAndRestore geri arama kullanan CheckpointManager kaydetmek ve parçalar yeni biri ile birlikte kontrol noktaları mevcut olduğunu kontrol noktası adlı bir dosya oluşturur eğitim durumu, geri yükleyin. Bu nedenle, backup_dir önlemek adı çarpışması amacıyla diğer kontrol noktaları saklamak için kullanılan yeniden olmamalıdır.

Şu anda, BackupAndRestore geri arama MultiWorkerMirroredStrategy ile hiçbir strateji, MirroredStrategy ve çoklu işçi ile tek işçi destekler. Aşağıda hem çok işçili eğitim hem de tek işçili eğitim için iki örnek verilmiştir.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Eğer dizinini kontrol ederse backup_dir size belirtilen BackupAndRestore , bazı geçici oluşturulan denetim noktası dosyaları görebilirsiniz. O dosyalar daha önce kayıp örneklerini kurtarmak için gereklidir ve bunlar sonunda kütüphane tarafından silinecektir Model.fit eğitiminizin başarılı çıkarken.

Ek kaynaklar

  1. TensorFlow içinde Dağıtılmış eğitim rehberi mevcut dağıtım stratejilerinin bir bakış sağlar.
  2. Keras ve MultiWorkerMirroredStrategy ile özel eğitim döngü öğretici gösterileri nasıl kullanmak MultiWorkerMirroredStrategy keras ile ve özel bir eğitim döngü.
  3. Check out resmi modellerini çoklu dağıtım stratejilerini çalıştırmak için yapılandırılabilir birçoğu.
  4. Tf.function daha iyi performans rehber gibi diğer stratejiler ve araçlar hakkında bilgi sağlar TensorFlow Profiler size TensorFlow modellerinin performansını optimize etmek için kullanabilirsiniz.