本頁面由 Cloud Translation API 翻譯而成。
Switch to English

Keras的多工人培訓

在TensorFlow.org上查看 在Google Colab中運行 在GitHub上查看源代碼 下載筆記本

總覽

本教程演示了使用tf.distribute.Strategy API(特別是tf.distribute.experimental.MultiWorkerMirroredStrategy使用tf.distribute.Strategy模型進行tf.distribute.Strategy分佈式培訓。借助該策略,設計為在單個工作人員上運行的Keras模型可以以最少的代碼更改無縫地在多個工作人員上工作。

TensorFlow中的分佈式培訓指南可用於概述TensorFlow為那些對tf.distribute.Strategy API的深入了解感興趣的人提供的分發策略。

建立

首先,設置TensorFlow和必要的導入。

 import os
import tensorflow as tf
import numpy as np
 

準備數據集

現在,讓我們準備MNIST數據集。 MNIST數據集包含60,000個訓練示例和10,000個手寫數字0-9的測試示例,格式為28x28像素的單色圖像。在此示例中,我們將使用數據集的訓練部分進行演示。

 def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset
 

建立Keras模型

在這裡,我們使用tf.keras.Sequential API來構建和編譯簡單的捲積神經網絡Keras模型,以訓練我們的MNIST數據集。

 def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
 

首先,讓我們嘗試為少數時期訓練模型,並在單個工作程序中觀察結果,以確保一切正常。隨著時代的發展,您應該會看到損耗下降和精度接近1.0。

 per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 0s 2ms/step - loss: 2.2701 - accuracy: 0.2451
Epoch 2/3
70/70 [==============================] - 0s 2ms/step - loss: 2.1827 - accuracy: 0.4777
Epoch 3/3
70/70 [==============================] - 0s 2ms/step - loss: 2.0865 - accuracy: 0.5955

<tensorflow.python.keras.callbacks.History at 0x7fc59381ac50>

多工配置

現在,讓我們進入多工人培訓的世界。在TensorFlow中, TF_CONFIG環境變量是在多台機器上進行訓練所必需的,每台機器可能具有不同的作用。 TF_CONFIG是一個JSON字符串,用於在屬於集群的每個工作服務器上指定集群配置。

TF_CONFIG有兩個組件: clustertaskcluster提供有關培訓群集的信息,這是由不同類型的工作(例如worker 。在多工人培訓與MultiWorkerMirroredStrategy ,通常有一個worker ,負責就喜歡節省檢查站除了什麼是常規寫總結文件TensorBoard多一點責任worker一樣。此類工作人員稱為chief工作人員,通常將index 0的worker指定為主要worker (實際上這就是tf.distribute.Strategy的實現方式)。 task ,另一方面提供了當前任務的信息。第一個組件cluster對所有工作程序都相同,而第二個組件task在每個工作程序上均不同,並指定該工作程序的typeindex

在此示例中,我們將任務type設置為"worker"並將任務index0 。這意味著具有這種設置的機器是第一個工人,該工人將被任命為主要工人,並且比其他工人做更多的工作。請注意,其他計算機也需要設置TF_CONFIG環境變量,並且它應該具有相同的cluster字典,但是根據這些計算機的角色,可以使用不同的任務type或任務index

出於說明目的,本教程顯示瞭如何在localhost上使用2個worker設置TF_CONFIG 。實際上,用戶將在外部IP地址/端口上創建多個工作TF_CONFIG ,並在每個工作TF_CONFIG上適當地設置TF_CONFIG

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

注意,雖然在此示例中學習率是固定的,但通常可能需要基於全局批處理大小來調整學習率。

選擇正確的策略

在TensorFlow中,分佈式培訓包括同步培訓和異步培訓,在同步培訓中,培訓步驟跨工作人員和副本進行同步,異步培訓中的培訓步驟未嚴格同步。

MultiWorkerMirroredStrategy ,這是同步多工培訓的推薦策略。要訓練模型,請使用tf.distribute.experimental.MultiWorkerMirroredStrategy的實例。 MultiWorkerMirroredStrategy在所有工作人員的每個設備上的模型層中創建所有變量的副本。它使用CollectiveOps (一個TensorFlow op進行集體通信)來聚合梯度並使變量保持同步。 tf.distribute.Strategy指南提供了有關此策略的更多詳細信息。

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy通過CollectiveCommunication參數提供了多種實現。 RING使用gRPC作為跨主機通信層實現基於環的集合。 NCCL使用Nvidia的NCCL實施集體。 AUTO將選擇推遲到運行時。集體實施的最佳選擇取決於GPU的數量和種類以及集群中的網絡互連。

使用MultiWorkerMirroredStrategy訓練模型

通過將tf.distribute.Strategy API集成到tf.keras ,將培訓分發給多人的唯一更改就是將模型構建和對model.compile()調用封裝在strategy.scope() 。分發策略的範圍規定瞭如何創建變量以及在何處創建變量,對於MultiWorkerMirroredStrategy ,創建的變量是MirroredVariable ,並將其複製到每個工作MultiWorkerMirroredStrategy上。

 num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
 
Epoch 1/3
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
70/70 [==============================] - 0s 3ms/step - loss: 2.2682 - accuracy: 0.2265
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1714 - accuracy: 0.4954
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.0638 - accuracy: 0.6232

<tensorflow.python.keras.callbacks.History at 0x7fc5f4f062e8>

數據集分片和批量大小

在使用MultiWorkerMirroredStrategy多工人培訓時,需要對數據集進行分片以確保收斂和性能。但是,請注意,在上面的代碼片段中,無需分片即可將數據集直接傳遞到model.fit() 。這是因為tf.distribute.Strategy API自動處理數據集分片。它在文件級別對數據集進行分片,這可能會創建傾斜的分片。在只有一個文件的極端情況下,只有第一個分片(即工作程序)將獲得培訓或評估數據,結果所有工作程序都將獲得錯誤。

如果您更喜歡手動分片進行培訓,則可以通過tf.data.experimental.DistributeOptions api關閉自動分片。具體來說,

 options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
 

要注意的另一件事是datasets的批處理大小。在上面的代碼段中,我們使用global_batch_size = per_worker_batch_size * num_workers ,這是num_workers大小,是單個工人的情況的兩倍,因為每個工人的有效批處理大小是全局批處理大小(在tf.data.Dataset.batch()傳遞的參數tf.data.Dataset.batch() )除以工作人員的數量,並且通過此更改,我們使每個工作人員的批處理大小與以前相同。

評價

如果將validation_data傳遞給model.fit ,則它將在每個時期的訓練和評估之間交替。接受validation_data的評估分佈在同一組工作人員中,並且評估結果匯總並可供所有工作人員使用。與訓練類似,驗證數據集會在文件級別自動分片。您需要在驗證數據集中設置全局批處理大小並設置validation_steps 。還建議使用重複的數據集進行評估。

另外,您還可以創建另一個任務,該任務定期讀取檢查點並運行評估。這就是Estimator所做的。但這不是推薦的評估方法,因此省略了其詳細信息。

預測

當前, model.predict不適用於MultiWorkerMirroredStrategy.

性能

現在,您已經擁有一個Keras模型,該模型全部通過MultiWorkerMirroredStrategy設置為可在多個工作程序中運行。您可以嘗試以下技術,以通過MultiWorkerMirroredStrategy調整多工人培訓的MultiWorkerMirroredStrategy

  • MultiWorkerMirroredStrategy提供了多個集體的通信實現RING使用gRPC作為跨主機通信層實現基於環的集合。 NCCL使用Nvidia的NCCL實施集體。 AUTO將選擇推遲到運行時。集體實施的最佳選擇取決於GPU的數量和種類以及集群中的網絡互連。要覆蓋自動選擇,請為MultiWorkerMirroredStrategy的構造函數的communication參數指定一個有效值,例如communication=tf.distribute.experimental.CollectiveCommunication.NCCL
  • 如果可能,將變量tf.floattf.float 。 ResNet的官方模型包括如何完成此操作的示例

容錯能力

在同步訓練中,如果其中一個工作程序出現故障並且不存在故障恢復機制,則群集將失敗。在工人死亡或不穩定的情況下,將tf.distribute.Strategytf.distribute.Strategy結合使用具有tf.distribute.Strategy的優勢。我們通過在您選擇的分佈式文件系統中保留訓練狀態來做到這一點,以便在重新啟動先前失敗或被搶占的實例後,將恢復訓練狀態。

由於所有工人在培訓時期和步驟方面保持同步,因此其他工人將需要等待失敗或被搶占的工人重新啟動才能繼續。

ModelCheckpoint回調

ModelCheckpoint回調不再提供容錯功能,請改用BackupAndRestore回調。

ModelCheckpoint回調仍可用於保存檢查點。但是,如果培訓被中斷或成功完成,則為了從檢查點繼續進行培訓,用戶有責任手動加載模型。用戶可以選擇在ModelCheckpoint回調之外選擇保存和恢復模型/權重。

模型保存和加載

要使用model.savetf.saved_model.save保存模型,每個工作者的保存目的地都必須不同。在非主要工作人員上,您需要將模型保存到一個臨時目錄,在主要人員上,您需要將模型保存到提供的模型目錄。工作程序上的臨時目錄必須唯一,以防止由於多個工作程序嘗試寫入同一位置而導致的錯誤。保存在所有目錄中的模型都是相同的,通常只有負責人保存的模型才可以引用來進行還原或服務。我們建議您使用一些清理邏輯,以在培訓完成後刪除工作人員創建的臨時目錄。

您需要同時節省首席和工人的原因,是因為您可能在檢查點期間匯總變量,這需要首席和工人都參與到allreduce通信協議中。另一方面,讓負責人和工作人員保存到同一模型目錄會由於爭用而導致錯誤。

使用MultiWorkerMirroredStrategy ,程序在每個工作程序上運行,並且為了知道當前工作程序是否為主要工作程序,我們利用了具有task_typetask_id屬性的集群解析器對象。 task_type告訴您當前工作是什麼(例如'worker'),而task_id告訴您工作人員的標識符。 ID為0的工人被指定為主要工人。

在下面的代碼段中, write_filepath提供要寫入的文件路徑,這取決於工作程序ID。對於酋長(ID為0的工人),它將寫入原始文件路徑;對於其他用戶,它將創建一個臨時目錄(目錄路徑中的ID)以寫入:

 model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works 
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
 

這樣,您現在就可以保存:

 multi_worker_model.save(write_model_path)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

如上所述,稍後應僅從首席負責人保存到的路徑中加載模型,因此讓我們刪除非首席工作人員保存的臨時工作:

 if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))
 

現在,當需要加載時,讓我們使用方便的tf.keras.models.load_model API,然後繼續進行進一步的工作。在這裡,我們假設僅使用單個工作程序來加載並繼續訓練,在這種情況下,您不會在另一個strategy.scope()調用tf.keras.models.load_model

 loaded_model = tf.keras.models.load_model(model_path)

# Now that we have the model restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9825 - accuracy: 0.1102
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9367 - accuracy: 0.1117

<tensorflow.python.keras.callbacks.History at 0x7fc5f4b0d8d0>

檢查點保存和還原

另一方面,檢查點使您可以保存模型的權重並將其還原,而不必保存整個模型。在這裡,您將創建一個跟踪tf.train.Checkpoint的模型,該模型由tf.train.CheckpointManager管理,以便僅保留最新的檢查點。

 checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
  checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
 

設置CheckpointManager後,您現在可以保存,並刪除已保存的非首席工作人員的檢查點。

 checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)
 

現在,當您需要還原時,可以使用方便的tf.train.latest_checkpoint函數找到保存的最新檢查點。恢復檢查點後,您可以繼續進行培訓。

 latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9841 - accuracy: 0.6561
Epoch 2/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9445 - accuracy: 0.6805

<tensorflow.python.keras.callbacks.History at 0x7fc5f49d9d30>

BackupAndRestore回調

BackupAndRestore回調通過將模型和當前紀元數備份到BackupAndRestore backup_dir參數下的臨時檢查點文件中,提供了容錯功能。這是在每個時期結束時完成的。

一旦作業被中斷並重新啟動,回調將恢復最後一個檢查點,並且訓練將從中斷的紀元開始繼續。在中斷之前在未完成的紀元中已經進行的任何局部訓練都將被丟棄,從而不會影響最終的模型狀態。

要使用它, tf.keras.callbacks.experimental.BackupAndRestoretf.keras.Model.fit()調用中提供一個tf.keras.callbacks.experimental.BackupAndRestore實例。

使用MultiWorkerMirroredStrategy,如果某個工作線程被中斷,則整個群集將暫停,直到重新啟動被中斷的工作線程為止。其他工作程序也將重新啟動,並且被中斷的工作程序將重新加入群集。然後,每個工作人員都讀取先前保存的檢查點文件,並選擇其以前的狀態,從而使群集能夠恢復同步。然後訓練繼續。

BackupAndRestore回調使用CheckpointManager來保存和恢復訓練狀態,該訓練狀態會生成一個名為checkpoint的文件,該文件將跟踪現有的檢查點以及最新的檢查點。因此,不應重複使用backup_dir來存儲其他檢查點,以避免名稱衝突。

當前, BackupAndRestore回調支持無策略的單工作程序,MirroredStrategy和具有MultiWorkerMirroredStrategy的多工作人員。以下是多工人培訓和單工人培訓的兩個示例。

 # Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
 
Epoch 1/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2837 - accuracy: 0.1836
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2131 - accuracy: 0.4091
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1310 - accuracy: 0.5485

<tensorflow.python.keras.callbacks.History at 0x7fc5f49a3080>

如果檢查在BackupAndRestore指定的backup_dir目錄,則可能會注意到一些臨時生成的檢查點文件。這些文件是恢復以前丟失的實例所必需的,並且在成功退出培訓後,這些文件將被tf.keras.Model.fit()結尾的庫刪除。

也可以看看

  1. TensorFlow中的分佈式培訓指南提供了可用的分發策略的概述。
  2. 官方模型 ,其中許多模型可以配置為運行多種分銷策略。
  3. 指南中的`` 性能''部分提供了有關可用於優化TensorFlow模型性能的其他策略和工具的信息。