本頁面由 Cloud Translation API 翻譯而成。
Switch to English

使用TensorFlow進行分佈式培訓

在TensorFlow.org上查看 在Google Colab中運行 在GitHub上查看源代碼 下載筆記本

總覽

tf.distribute.Strategy是一個TensorFlow API,用於在多個GPU,多個計算機或TPU之間分配培訓。使用此API,您可以以最小的代碼更改來分發現有模型和培訓代碼。

設計tf.distribute.Strategy時要牢記以下關鍵目標:

  • 易於使用並支持多個用戶細分,包括研究人員,ML工程師等。
  • 開箱即用地提供良好的性能。
  • 輕鬆切換策略。

tf.distribute.Strategy可以用高級API等中使用Keras ,並且也可以用於分發定制訓練環(和,一般地,使用TensorFlow任何計算)。

在TensorFlow 2.x中,您可以急切地執行程序,也可以使用tf.function在圖形中執行程序。 tf.distribute.Strategy打算支持這兩種執行模式,但與tf.function一起使用效果最佳。建議僅將Eager模式用於調試目的,而TPUStrategy不支持此TPUStrategy 。儘管我們在本指南中大部分時間都在討論培訓,但該API也可以用於在不同平台上分發評估和預測。

您可以使用tf.distribute.Strategy更改代碼,因為我們已將TensorFlow的基礎組件更改為可感知策略。這包括變量,層,模型,優化器,指標,摘要和檢查點。

在本指南中,我們說明了各種類型的策略以及如何在不同情況下使用它們。

 # Import TensorFlow
import tensorflow as tf
 

策略類型

tf.distribute.Strategy打算涵蓋不同方面的許多用例。目前支持其中的某些組合,將來還會添加其他組合。其中一些軸是:

  • 同步訓練與異步訓練:這是通過數據並行性分佈訓練的兩種常用方法。在同步培訓中,所有工作人員都同步地對輸入數據的不同片段進行培訓,並在每個步驟中匯總梯度。在異步培訓中,所有工作人員都在獨立訓練輸入數據並異步更新變量。通常情況下,同步訓練通過全約簡和參數服務器架構的異步支持。
  • 硬件平台:您可能希望將培訓擴展到一台計算機上的多個GPU或網絡中的多台計算機(每個具有0個或多個GPU)或Cloud TPU上。

為了支持這些用例,提供了六種策略。在下一節中,我們將說明當前在TF 2.2中的哪些情況下支持這些功能。快速概述:

培訓API 鏡像策略 TPU策略多工鏡像策略中央存儲策略 ParameterServerStrategy
Keras API 支持的支持的實驗支持實驗支持計劃支持的職位2.3
自定義訓練循環 支持的支持的實驗支持實驗支持計劃支持的職位2.3
估算器API 有限的支持不支持有限的支持有限的支持有限的支持

鏡像策略

tf.distribute.MirroredStrategy支持在一台機器上的多個GPU上的同步分佈式訓練。它為每個GPU設備創建一個副本。模型中的每個變量都在所有副本之間進行鏡像。這些變量一起形成一個稱為MirroredVariable概念變量。通過應用相同的更新,這些變量彼此保持同步。

高效的全縮減算法用於在設備之間傳遞變量更新。通過將所有張量相加,All-reduce可以在所有設備上聚合它們,並使它們在每個設備上可用。這是一種非常有效的融合算法,可以大大減少同步的開銷。根據設備之間可用的通信類型,有許多全縮減算法和實現可用。默認情況下,它使用NVIDIA NCCL作為全縮減實現。您可以從我們提供的其他選項中進行選擇,也可以自己編寫。

這是創建MirroredStrategy的最簡單方法:

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

這將創建一個MirroredStrategy實例,該實例將使用TensorFlow可見的所有GPU,並將NCCL用作跨設備通信。

如果您只想使用計算機上的某些GPU,可以這樣做:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

如果您希望覆蓋跨設備通信,則可以通過提供tf.distribute.CrossDeviceOps的實例,使用cross_device_ops參數來tf.distribute.CrossDeviceOps 。目前, tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDevice比其他兩個選項tf.distribute.NcclAllReduce這是默認的。

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPU策略

tf.distribute.TPUStrategy ,您可以在Tensor處理單元(TPU)上運行TensorFlow培訓。 TPU是Google的專用ASIC,旨在極大地加快機器學習的工作量。可在Google Colab, TensorFlow Research CloudCloud TPU上使用它們

在分佈式培訓體系結構方面, TPUStrategy是相同的MirroredStrategy它實現了同步的分佈式培訓。 TPU跨多個TPU內核提供自己的高效全縮減和其他集合操作的實現,這些內核在TPUStrategy中使用。

這是實例化TPUStrategy

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

TPUClusterResolver實例可幫助定位TPU。在Colab中,您無需為其指定任何參數。

如果要將其用於Cloud TPU:

  • 您必須在tpu參數中指定TPU資源的名稱。
  • 您必須在程序開始時顯式初始化tpu系統。在將TPU用於計算之前,這是必需的。初始化tpu系統還會擦除TPU內存,因此,請務必首先完成此步驟,以避免丟失狀態。

多工鏡像策略

tf.distribute.experimental.MultiWorkerMirroredStrategyMirroredStrategy非常相似。它實現了跨多個工作人員的同步分佈式培訓,每個工作人員都可能具有多個GPU。與MirroredStrategy相似,它在所有工作人員的每台設備上的模型中創建所有變量的副本。

它使用CollectiveOps作為多變量全減少通信方法,用於使變量保持同步。集合運算是TensorFlow圖中的單個運算,可以根據硬件,網絡拓撲和張量大小在TensorFlow運行時中自動選擇全約算法。

它還實現了其他性能優化。例如,它包括靜態優化,該優化將小張量上的多個全約轉換為大張量上的較少全約。此外,我們正在設計它具有插件架構-以便將來您將能夠插入針對您的硬件進行更好調整的算法。請注意,集體行動還實施其他集體行動,例如廣播和全會。

這是創建MultiWorkerMirroredStrategy的最簡單方法:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

當前, MultiWorkerMirroredStrategy允許您在集體操作的兩種不同實現之間進行選擇。 CollectiveCommunication.RING使用gRPC作為通信層實現基於環的集合。 CollectiveCommunication.NCCL使用Nvidia的NCCL來實現集合。 CollectiveCommunication.AUTO將選擇推遲到運行時。集體實施的最佳選擇取決於GPU的數量和種類以及集群中的網絡互連。您可以通過以下方式指定它們:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

與多GPU培訓相比,進行多工作者培訓的主要差異之一是多工作者設置。 TF_CONFIG環境變量是TensorFlow中為群集的每個工作程序指定群集配置的標準方法。了解有關設置TF_CONFIG的更多信息。

中央存儲策略

tf.distribute.experimental.CentralStorageStrategy也執行同步訓練。變量不會被鏡像,而是將它們放置在CPU上,並且操作會在所有本地GPU之間複製。如果只有一個GPU,則所有變量和操作都將放置在該GPU上。

通過以下方式創建CentralStorageStrategy的實例:

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

這將創建一個CentralStorageStrategy實例,該實例將使用所有可見的GPU和CPU。在副本上對變量的更新將在應用於變量之前進行匯總。

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy支持在多台計算機上進行參數服務器培訓。在此設置中,某些計算機被指定為工作服務器,而某些計算機被指定為參數服務器。模型的每個變量都放在一個參數服務器上。計算在所有工作程序的所有GPU之間複製。

在代碼方面,它看起來與其他策略類似:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

對於多工作者訓練, TF_CONFIG需要指定集群中參數服務器和工作者的配置,您可以在下面的TF_CONFIG中了解更多信息。

其他策略

除了上述策略外,還有其他兩種策略可能在使用tf.distribute API時對原型設計和調試有用。

默認策略

默認策略是一種分發策略,當沒有明確的分發策略在範圍內時,就會存在。它實現了tf.distribute.Strategy接口,但是是傳遞功能,不提供實際的分配。例如, strategy.run(fn)將僅調用fn 。使用此策略編寫的代碼應與沒有任何策略編寫的代碼完全一樣。您可以將其視為“無操作”策略。

默認策略是單例-不能創建它的更多實例。可以使用tf.distribute.get_strategy()在任何顯式策略範圍之外(與可用於在顯式策略範圍內獲取當前策略的API相同)獲得它。

 default_strategy = tf.distribute.get_strategy()
 

此策略有兩個主要目的:

  • 它允許無條件地編寫具有分發意識的庫代碼。例如,在優化程序中,我們可以執行tf.distribute.get_strategy()並使用該策略來減少梯度-它總是返回一個策略對象,我們可以在該對像上調用reduce API。
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • 與庫代碼相似,它可以用於編寫最終用戶的程序,以使用或不使用分配策略,而無需條件邏輯。一個示例代碼片段說明了這一點:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy是一種將所有變量和計算放在單個指定設備上的策略。

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

此策略在許多方面與默認策略不同。在默認策略中,與沒有任何分發策略的TensorFlow運行相比,變量放置邏輯保持不變。但是,當使用OneDeviceStrategy ,在其作用域中創建的所有變量都將顯式放置在指定的設備上。此外,通過OneDeviceStrategy.run調用的任何函數也將放置在指定的設備上。

通過此策略分配的輸入將被預取到指定的設備。在默認策略中,沒有輸入分佈。

與默認策略類似,在切換到實際分發給多個設備/機器的其他策略之前,也可以使用此策略來測試代碼。這將比默認策略更多地使用分發策略機制,但不能像使用MirroredStrategyTPUStrategy等那樣充分發揮其作用。如果您希望代碼表現得像沒有策略,那麼請使用默認策略。

到目前為止,我們已經討論了可用的不同策略以及如何實例化它們。在接下來的幾節中,我們將討論使用它們分發培訓的不同方法。我們將在本指南中顯示簡短的代碼片段,並鏈接至可以端對端運行的完整教程。

tf.distribute.Strategytf.keras.Model.fit tf.distribute.Strategy使用

我們已將tf.distribute.Strategy集成到tf.keras ,這是TensorFlow對tf.keras API規範的實現。 tf.keras是用於構建和訓練模型的高級API。通過集成到tf.keras後端,我們使您可以無縫地使用model.fit分發在model.fit培訓框架中編寫的培訓。

您需要在代碼中進行以下更改:

  1. 創建適當的tf.distribute.Strategy的實例。
  2. 將Keras模型,優化器和指標的創建移到strategy.scope

我們支持所有類型的Keras模型-順序,功能和子類化。

這是一段代碼的示例,適用於具有一個密集層的非常簡單的Keras模型:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

在此示例中,我們使用了MirroredStrategy因此我們可以在具有多個GPU的計算機上運行它。 strategy.scope()向Keras指示使用哪種策略來分發培訓。在此範圍內創建模型/優化器/指標可以使我們創建分佈式變量而不是常規變量。設置完成後,您就可以像平時一樣適應模型。 MirroredStrategy負責在可用的GPU上複製模型的訓練,匯總漸變等。

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

在這裡,我們使用tf.data.Dataset提供訓練和評估輸入。您還可以使用numpy數組:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

在兩種情況下(數據集或numpy),給定輸入的每批均等地分配在多個副本中。例如,如果將MirroredStrategy與2個GPU結合使用,則每批大小為10的批處理將分配到2個GPU中,每個GPU在每個步驟中接收5個輸入示例。然後,當您添加更多GPU時,每個紀元將訓練得更快。通常,您希望隨著添加更多的加速器來增加批處理大小,以便有效利用額外的計算能力。您還需要根據模型重新調整學習率。您可以使用strategy.num_replicas_in_sync來獲取副本數。

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

現在支持什麼?

培訓API 鏡像策略 TPU策略多工鏡像策略中央存儲策略 ParameterServerStrategy
Keras API 支持的支持的實驗支持實驗支持計劃支持的職位2.3

示例和教程

這是一系列教程和示例,這些示例和示例說明了與Keras的上述集成端到端:

  1. MirroredStrategy訓練MNIST的教程
  2. 使用MultiWorkerMirroredStrategy訓練MNIST的教程
  3. 使用TPUStrategy培訓MNIST的指南
  4. TensorFlow Model Garden 存儲庫包含使用各種策略實施的最新模型的集合。

tf.distribute.Strategy與自定義訓練循環一起使用

如您所見,在tf.distribute.Strategy使用model.fit只需更改幾行代碼。稍加努力,您還可以將tf.distribute.Strategy與自定義訓練循環一起使用。

如果您需要比Estimator或Keras更大的靈活性和對訓練循環的控制,則可以編寫自定義訓練循環。例如,當使用GAN時,您可能希望每輪採取不同數量的生成器或鑑別器步驟。同樣,高級框架也不太適合強化學習培訓。

為了支持自定義訓練循環,我們通過tf.distribute.Strategy類提供了一組核心方法。使用這些代碼可能最初需要對代碼進行較小的重組,但是一旦完成,您只需更改策略實例就應該能夠在GPU,TPU和多台計算機之間切換。

在這裡,我們將顯示一個簡短的代碼片段,說明使用與以前相同的Keras模型的簡單培訓示例的此用例。

首先,我們在策略範圍內創建模型和優化器。這樣可以確保使用模型和優化器創建的任何變量都是鏡像變量。

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

接下來,我們創建輸入數據集並調用tf.distribute.Strategy.experimental_distribute_dataset以根據策略分配數據集。

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

然後,我們定義了培訓的一個步驟。我們將使用tf.GradientTape計算梯度,並使用優化器應用這些梯度來更新模型變量。為了分發此訓練步驟,我們放入一個函數train_step ,並將其與從dist_dataset創建的dist_dataset獲取的數據集輸入一起傳遞給tf.distrbute.Strategy.run

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

上面的代碼中還有一些其他注意事項:

  1. 我們使用tf.nn.compute_average_loss計算損失。 tf.nn.compute_average_loss將每個示例的損失相加,然後將其除以global_batch_size。這很重要,因為稍後在對每個副本計算梯度之後,通過它們求和將它們匯總到副本中。
  2. 我們使用了tf.distribute.Strategy.reduce API來匯總tf.distribute.Strategy.run返回的結果。 tf.distribute.Strategy.run從策略中的每個本地副本返回結果,並且有多種方法可以使用此結果。您可以reduce它們以獲得匯總值。您也可以執行tf.distribute.Strategy.experimental_local_results來獲取結果中包含的值的列表,每個本地副本一個。
  3. 在分發策略範圍內調用apply_gradients ,將修改其行為。具體而言,在同步訓練期間將梯度應用於每個並行實例之前,它會執行所有梯度的總和。

最後,一旦定義了訓練步驟,就可以遍歷dist_dataset並循環運行訓練:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

在上面的示例中,我們遍歷了dist_dataset您的培訓提供輸入。我們還提供了tf.distribute.Strategy.make_experimental_numpy_dataset以支持numpy輸入。您可以在調用tf.distribute.Strategy.experimental_distribute_dataset之前使用此API創建數據集。

迭代數據的另一種方法是顯式使用迭代器。當您要運行給定數量的步驟而不是遍歷整個數據集時,可能需要執行此操作。上述迭代現在將修改首先創建一個迭代器,然後顯式調用next就可以得到的輸入數據。

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

這涵蓋了使用tf.distribute.Strategy API分發自定義訓練循環的最簡單情況。我們正在改進這些API。由於此用例需要更多工作來適應您的代碼,因此我們將來會發佈單獨的詳細指南。

現在支持什麼?

培訓API 鏡像策略 TPU策略多工鏡像策略中央存儲策略 ParameterServerStrategy
自定義培訓循環支持的支持的實驗支持實驗支持計劃支持的職位2.3

示例和教程

以下是將分發策略與自定義訓練循環一起使用的一些示例:

  1. 使用MirroredStrategy訓練MNIST的教程
  2. 使用TPUStrategy培訓MNIST的指南
  3. TensorFlow Model Garden 存儲庫包含使用各種策略實施的最新模型的集合。

tf.distribute.Strategy與Estimator tf.distribute.Strategy使用(有限支持)

tf.estimator是一個分佈式培訓TensorFlow API,最初支持異步參數服務器方法。與tf.distribute.Strategy一樣,我們將tf.distribute.Strategy集成到tf.Estimator 。如果您使用Estimator進行培訓,則只需更改很少的代碼即可輕鬆更改為分佈式培訓。這樣,Estimator用戶現在可以在多個GPU和多個工作程序上進行同步分佈式培訓,以及使用TPU。但是,Estimator中的這種支持是有限的。有關更多詳細信息,請參見下面的“現在支持什麼”部分。

Estimator中tf.distribute.Strategy的用法與tf.distribute.Strategy情況稍有不同。現在我們RunConfig使用strategy.scope ,而是將策略對RunConfig傳遞給RunConfig

這是一段代碼片段,其中使用預製的Estimator LinearRegressorMirroredStrategy展示了這一點:

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

我們在這裡使用預製的Estimator,但是相同的代碼也可以與自定義Estimator一起使用。 train_distribute確定培訓的分配方式,而eval_distribute確定評估的分配方式。這與Keras的另一個不同之處在於Keras在訓練和評估時都使用相同的策略。

現在,我們可以使用輸入函數來訓練和評估此估算器:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Estimator和Keras之間要強調的另一個區別是輸入處理。在Keras中,我們提到數據集的每一批都自動拆分為多個副本。但是,在Estimator中,我們不會自動拆分批次,也不會自動將數據分片給不同的工作程序。您可以完全控制希望如何在工作人員和設備之間分配數據,並且必須提供input_fn來指定如何分配數據。

每個worker調用一次input_fn ,因此每個worker給出一個數據集。然後,該數據集中的一批將被饋送到該工作程序上的一個副本,從而消耗1個工作程序上的N個副本的N個批次。換句話說, input_fn返回的數據集應提供PER_REPLICA_BATCH_SIZE大小的PER_REPLICA_BATCH_SIZE 。並且可以通過PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync獲得步驟的全局批處理大小。

在進行多員工培訓時,您應該將數據分散到各個員工之間,或者在每個員工上隨機散佈種子。您可以在使用Estimator進行的多員工培訓中看到有關如何執行此操作的示例。

同樣,您也可以使用多工作者和參數服務器策略。代碼保持不變,但是您需要使用tf.estimator.train_and_evaluate ,並為集群中運行的每個二進製文件設置TF_CONFIG環境變量。

現在支持什麼?

TPUStrategy之外,使用所有策略進行Estimator培訓的支持有限。基本的培訓和評估應該有效,但是諸如腳手架之類的許多高級功能尚不起作用。此集成中可能還存在許多錯誤。目前,我們不打算積極改善這種支持,而是專注於Keras和自定義訓練循環支持。如果有可能,您應該更喜歡將tf.distribute與這些API結合使用。

培訓API 鏡像策略 TPU策略多工鏡像策略中央存儲策略 ParameterServerStrategy
估算器API 有限的支持不支持有限的支持有限的支持有限的支持

示例和教程

以下是一些示例,展示了Estimator各種策略的端到端用法:

  1. 使用估計器進行多工人培訓,以使用MultiWorkerMirroredStrategy培訓多名工人的MultiWorkerMirroredStrategy
  2. 使用Kubernetes模板在tensorflow /生態系統中進行多工作者培訓的端到端示例 。本示例從Keras模型開始,然後使用tf.keras.estimator.model_to_estimator API將其轉換為Estimator。
  3. 官方ResNet50模型,可以使用MirroredStrategyMultiWorkerMirroredStrategy進行訓練。

其他話題

在本節中,我們將涵蓋與多個用例相關的一些主題。

設置TF_CONFIG環境變量

如前所述,對於多工人培訓,您需要為集群中運行的每個二進製文件設置TF_CONFIG環境變量。 TF_CONFIG環境變量是一個JSON字符串,它指定組成集群的任務,它們的地址以及集群中每個任務的角色。我們在tensorflow /生態系統倉庫中提供了一個Kubernetes模板,該模板為您的訓練任務設置了TF_CONFIG

TF_CONFIG有兩個組件:集群和任務。群集提供有關培訓群集的信息,這是由不同類型的工作(例如工人)組成的命令。在多工人培訓中,除了一名普通工人外,通常還有一名工人承擔更多責任,例如保存檢查點和為TensorBoard編寫摘要文件。這種工人稱為“首席”工人,習慣上將索引為0的工人指定為主要工人(實際上這就是tf.distribute.Strategy的實現方式)。另一方面,task提供當前任務的信息。第一個組件集群對所有工作程序都相同,而第二個組件任務在每個工作程序上均不同,並指定該工作程序的類型和索引。

TF_CONFIG一個示例是:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

TF_CONFIG指定集群中包含三個worker和兩個ps任務以及它們的主機和端口。 “任務”部分指定集群中當前任務的角色,即工作程序1(第二個工作程序)。群集中的有效角色是“首席”,“工人”,“ ps”和“評估者”。除了使用tf.distribute.experimental.ParameterServerStrategy之外, tf.distribute.experimental.ParameterServerStrategy應有“ ps”作業。

下一步是什麼?

tf.distribute.Strategy正在積極開發中。我們歡迎您嘗試一下,並使用GitHub問題提供您的反饋。