Yardım Kaggle üzerinde TensorFlow ile Büyük Bariyer Resifi korumak Meydan Üyelik

Keras ve MultiWorkerMirrorredStrategy ile özel eğitim döngüsü

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Bir Keras modeli üzerinde çalışacak şekilde tasarlanan bu yüzden Bu eğitimde, MultiWorkerMirroredStrategy aracılığıyla dağıtılan özel eğitim döngü API, çoklu işçi eğitimi gösteren tek işçinin asgari kod değişikliği ile birden işçilere can sorunsuz çalışması.

Modelimizi eğitmek için özel eğitim döngüleri kullanıyoruz çünkü bunlar bize eğitim üzerinde esneklik ve daha fazla kontrol sağlıyor. Ayrıca, modelde ve eğitim döngüsünde hata ayıklamak daha kolaydır. Daha detaylı bilgi mevcuttur sıfırdan bir eğitim döngü yazma .

Eğer nasıl kullanılacağı arıyorsanız MultiWorkerMirroredStrategy keras ile model.fit , bu başvurun öğretici yerine.

TensorFlow Eğitim Dağıtılmış dağıtım stratejileri genel bir bakış için rehber mevcuttur TensorFlow daha derin bir anlayışa ilgilenenler için destekler tf.distribute.Strategy API'leri.

Kurmak

İlk olarak, bazı gerekli ithalatlar.

import json
import os
import sys

TensorFlow'u içe aktarmadan önce ortamda birkaç değişiklik yapın.

Tüm GPU'ları devre dışı bırakın. Bu, aynı GPU'yu kullanmaya çalışan çalışanların neden olduğu hataları önler. Gerçek bir uygulama için her işçi farklı bir makinede olacaktır.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Reset TF_CONFIG ortam değişkeni, bu konuda daha sonra göreceksiniz.

os.environ.pop('TF_CONFIG', None)

Geçerli dizinin python yolunda olduğundan emin olun. Bu tarafından yazılmış dosyaları almak için dizüstü verir %%writefile sonra.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Şimdi TensorFlow'u içe aktarın.

import tensorflow as tf

Veri kümesi ve model tanımı

Sonraki bir oluşturmak mnist.py basit bir model ve veri kümesi kurulum ile dosyayı. Bu python dosyası, bu öğreticide çalışan işlemler tarafından kullanılacaktır:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Çok Çalışan Yapılandırması

Şimdi çok işçili eğitim dünyasına girelim. TensorFlow yılında TF_CONFIG ortam değişkeni muhtemelen farklı bir rolü vardır, her biri birden fazla makine üzerinde eğitim için gereklidir. TF_CONFIG Aşağıda kullanılan, kümenin parçası olan her işçi üzerindeki küme yapılandırması belirtmek için kullanılan bir JSON dizedir. Bu, bir küme belirterek kullanmak için varsayılan yöntemdir cluster_resolver.TFConfigClusterResolver ancak mevcut diğer seçenek vardır distribute.cluster_resolver modülü.

Kümenizi tanımlayın

İşte örnek bir yapılandırma:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

İşte aynıdır TF_CONFIG bir JSON dizesi olarak tefrika:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

İki bileşeni vardır TF_CONFIG : cluster ve task .

  • cluster bütün işçiler için aynıdır ve bu şekilde işler farklı türde oluşan bir dict olduğunu antrenman küme, hakkında bilgi veren worker . Çoklu işçi eğitimde MultiWorkerMirroredStrategy , genellikle bir tane worker kontrol noktasını kaydetme ve düzenli olanlara ilaveten TensorBoard için özet dosyası yazmak gibi biraz daha sorumluluk alır worker yapar. Böyle bir işçinin olarak adlandırılır chief işçisi, ve gelenektir worker ile index 0 baş olarak atandı worker (aslında bu nasıl tf.distribute.Strategy uygulanmaktadır).

  • task güncel görevin bilgi sağlar ve her işçi üzerinde farklıdır. Bu belirtir type ve index o işçinin.

Bu örnekte, görev set type için "worker" ve görev index için 0 . Bu makine ilk işçidir ve baş işçi olarak atanacak ve diğerlerinden daha fazla iş yapacaktır. Diğer makineler olması gerektiğini Not TF_CONFIG yanı ortam değişkeni seti ve aynı olmalıdır cluster dicti, ancak farklı görev type veya görev index o makinelerin rolleri ne bağlı.

Gösterim amacıyla, bu öğretici şov biri belirleyebilir nasıl TF_CONFIG 2 işçi ile localhost . Uygulamada, kullanıcılar dış IP adresleri / bağlantı noktaları ve set üzerinde birden fazla işçi yaratacak TF_CONFIG uygun her işçi üzerinde.

Bu örnekte ilk işçinin, 2 işçi kullanacak TF_CONFIG yukarıda gösterilmiştir. İkinci işçi için, kuracak tf_config['task']['index']=1

Yukarıda, tf_config python sadece yerel bir değişkendir. Aslında yapılandırmak eğitime kullanmak için, bu sözlükte ihtiyaçlar JSON olarak tefrika ve yerleştirilecek TF_CONFIG ortam değişkeni.

Not defterlerinde ortam değişkenleri ve alt süreçler

Alt işlemler, ortam değişkenlerini üstlerinden devralır. Yani eğer bu bir ortam değişkeni ayarlamak jupyter notebook süreci:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Ortam değişkenine bir alt işlemden erişebilirsiniz:

echo ${GREETINGS}
Hello TensorFlow!

Bir sonraki bölümde, geçmek için kullanacağız TF_CONFIG işçi subprocesses için. İşlerinizi asla bu şekilde başlatmazsınız, ancak bu öğreticinin amaçları için yeterlidir: Minimal bir çok işçili örnek göstermek.

Çoklu ÇalışanYansıtmalıStrateji

Modelini eğitmek için bir örneğini kullanmak tf.distribute.MultiWorkerMirroredStrategy bütün işçilerin genelinde her cihazda modelin katmanlarında tüm değişkenlerin kopyalarını oluşturur. tf.distribute.Strategy kılavuzu Bu stratejiyle ilgili daha fazla ayrıntı vardır.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2021-11-23 02:29:16.957442: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:16.957748: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Kullanım tf.distribute.Strategy.scope modelinizi oluştururken bir strateji kullanılması gerektiğini belirtmek için. "Bu koyar Eğer çapraz çoğaltma bağlamında strateji değişken yerleştirme gibi şeyler kontrolü koymak demektir bu strateji için".

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Verilerinizi çalışanlar arasında otomatik olarak parçalayın

Çok çalışanlı eğitimde, veri kümesi parçalama mutlaka gerekli değildir, ancak size tam olarak bir kez daha fazla eğitimi daha tekrarlanabilir hale getiren anlambilim sağlar, yani birden fazla çalışanın eğitimi, bir çalışanın eğitimiyle aynı olmalıdır. Not: performans bazı durumlarda etkilenebilir.

Bkz: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step

Özel Eğitim Döngüsünü Tanımlayın ve Modeli Eğitin

Bir optimize edici belirtin

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

İle bir eğitim adımı tanımlayın tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Kontrol noktası kaydetme ve geri yükleme

Özel Eğitim Döngüsündeki denetim noktası uygulaması, kullanıcının bir keras geri araması kullanmak yerine bunu işlemesini gerektirir. Modelin ağırlıklarını kaydetmenize ve tüm modeli kaydetmenize gerek kalmadan geri yüklemenize olanak tanır.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Burada, bir yaratacağız tf.train.Checkpoint parçaları bir tarafından yönetilen model tf.train.CheckpointManager yüzden sadece son kontrol noktası korunduğunu.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Geri yüklemek gerektiğinde Şimdi, elverişli kullanılarak kaydedilen son kontrol noktasını bulabilirsiniz tf.train.latest_checkpoint fonksiyonunu.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Kontrol noktasını geri yükledikten sonra özel eğitim döngünüzü eğitmeye devam edebilirsiniz.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2021-11-23 02:29:18.214294: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.826228, train_loss: 0.540131.
Epoch: 1, accuracy: 0.937946, train_loss: 0.207413.
Epoch: 2, accuracy: 0.960603, train_loss: 0.137420.

İşçiler üzerinde tam kod kurulumu

Aslında ile çalıştırmak için MultiWorkerMirroredStrategy Eğer alt işlemleri çalıştırmak ve geçmeniz gerekir TF_CONFIG onlara.

Gibi mnist.py önce yazılı dosya burada main.py işçilerin her çalıştırmak olacaktır bu nedenle bu CoLab daha önce adım adım yürüdü aynı kod içerir, sadece bir dosyaya yazıyoruz:

Dosya: main.py

Writing main.py

Eğitin ve Değerlendirin

Geçerli dizin artık her iki Python dosyasını da içeriyor:

ls *.py
main.py
mnist.py

Json-serialize Yani TF_CONFIG ve ortam değişkenleri ekleyin:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi, çalışacak bir alt işlemi başlatabilir main.py ve kullanımı TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Yukarıdaki komutla ilgili dikkat edilmesi gereken birkaç nokta vardır:

  1. Bu kullanır %%bash bir olan dizüstü "sihirli" bazı kabuk komutları çalıştırmak için.
  2. Bu kullanır --bg çalıştırmak için bayrak bash bu işçi sona çünkü arka planda süreci. Başlamadan önce tüm çalışanları bekler.

Böylece arka plana çalışan işlemi, bu defterin çıktı yazdırmaz &> bir dosyaya onun çıkış yönlendirir ne olduğunu görebilmek için,.

Bu nedenle, işlemin başlaması için birkaç saniye bekleyin:

import time
time.sleep(20)

Şimdi, işçinin günlük dosyasına şu ana kadar ne çıktı olduğuna bakın:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration

Günlük dosyasının son satırı demeliyim: Started server with target: grpc://localhost:12345 . İlk işçi şimdi hazırdır ve diğer tüm işçi(ler)in ilerlemeye hazır olmasını beklemektedir.

Yani güncelleme tf_config ikinci işçinin süreç almak için:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi ikinci işçiyi çalıştırın. Bu, tüm çalışanlar aktif olduğu için eğitimi başlatacaktır (bu nedenle bu süreci arka plana atmaya gerek yoktur):

python main.py > /dev/null 2>&1

Şimdi, ilk işçi tarafından yazılan günlükleri tekrar kontrol ederseniz, o modelin eğitimine katıldığını göreceksiniz:

cat job_0.log
2021-11-23 02:29:29.831748: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-11-23 02:29:29.832003: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.82.0 -- cannot find working devices in this configuration
2021-11-23 02:29:50.709898: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.820424, train_loss: 0.575663.
Epoch: 1, accuracy: 0.927344, train_loss: 0.241324.
Epoch: 2, accuracy: 0.953237, train_loss: 0.154762.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Çok çalışanlı derinlemesine eğitim

Bu öğretici bir göstermiştir Custom Training Loop çok işçi kurulum iş akışı. Diğer konular ayrıntılı bir açıklaması mevcuttur model.fit's guide CTLlere çok işçi kurulum ve uygulanabilir.

Ayrıca bakınız

  1. TensorFlow içinde Dağıtılmış Eğitim kılavuzunda mevcut dağıtım stratejilerinin bir bakış sağlar.
  2. Resmi modeller çoklu dağıtım stratejilerini çalıştırmak için yapılandırılabilir birçoğu.
  3. Performansı bölümünde kılavuzunda diğer strateji ve hakkında bilgi veren araçlar size TensorFlow modellerinin performansını optimize etmek için kullanabilirsiniz.